The VendorX API
Moving to a single-page Angular application also meant moving away from the standard Rails application VendorX had been using. Ruby, however, was working very well for us as a development language, so we instead built a backing API with Grape, a Rack-based API framework. We also needed to move away from MySQL for performance reasons: our datastore had grown too large to manage well in a relational system, so we migrated the data to Elasticsearch and (somewhat later) Neo4j.
This section of the repository includes four files:
- metric.rb, a virtual model class that pulls data from Elasticsearch and returns it as JSON to the front end
- esquire.rb, a small library for constructing the Elasticsearch queries we used
- definition.rb, another virtual model class that controls and updates Datasift
- csdl_helpers.rb, a utility that converts definitions into Datasift CSDL
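These models were served to the Angular front end through Grape. As a rough illustration of how the pieces fit together, a Grape endpoint wrapping one of the Metric queries from Sample 1 might have looked like the block below. This is a hypothetical sketch, not an excerpt from the repository; the MetricsAPI class name, route, and parameter names are assumptions.

require 'grape'

# Hypothetical sketch: a Grape endpoint that runs one of the Metric queries
# shown in Sample 1 and returns the result as JSON to the Angular client.
# Class name, route, and parameter names are illustrative, not taken from
# the actual codebase.
class MetricsAPI < Grape::API
  format :json

  resource :metrics do
    # e.g. GET /metrics/top_authors?project=acme&track=launch&start_date=...&end_date=...
    get :top_authors do
      metric = Metric::TopAuthors.new(params[:project], params[:track])
      metric.by_volume(params.to_hash)
    end
  end
end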
Sample 1: Generating Queries
Thor presented data through aggregated Elasticsearch queries, along several pre-defined axes of analysis: Top Authors (by Volume, by Mention, by Top Hashtags, by Mentions by Top Authors) and Top Hashtags (by Volume, by Top Authors). All of these queries could be filtered by date range and could exclude unwanted data. To support all of this, I created a query library called Esquire and a virtual “Model” class for metrics:
# This is an excerpt from models/metric.rb
module Metric
  class TopAuthors
    def by_volume(opts={})
      opts.to_options! # to_options! turned String hash keys into Symbols
      # Top Authors By Volume searched the data and returned a list of the top n
      # authors, ordered by the number of results found for each.
      q = Esquire.new
      q.configure(opts)
      q.source_filter(@project, @track)
      q.date_filter(opts[:start_date], opts[:end_date])
      q.aggregations[:top_authors] = {terms: {field: 'interaction.author.username', size: 50}}
      # MeasuredQuery was a basic query that included instrumentation to give us an idea of how
      # quickly our queries were running. Since they were usually pretty fast, this was mostly
      # to alert us to someone sending in very, very large date ranges, or to a problem with the
      # infrastructure that needed addressing.
      result = self.MeasuredQuery(q.build, "authors.#{__method__}", opts)
      unless result.nil? || result.empty?
        list = result['aggregations']['top_authors']['buckets']
        list = self.normalize_keys(list, ['key', 'doc_count'], ['screen_name', 'count'])
        return list
      end
    end # end #by_volume
  end # end ::TopAuthors

  class TopHashtags
    def by_top_authors(opts={})
      opts.to_options!
      # For Top Hashtags By Top Authors, we actually had to first perform a Top Authors By Volume
      # query, then use that to extract the screen names of those top authors and pass them to
      # the Top Hashtags query as a filter.
      author_search = Metric::TopAuthors.new(@project, @track)
      # From that query, we bundle up all the author names as an array of strings.
      top_authors = author_search.by_volume(opts).map{|bucket| bucket['screen_name']}
      q = Esquire.new
      q.configure(opts)
      q.source_filter(@project, @track)
      q.date_filter(opts[:start_date], opts[:end_date])
      q.qfilters << {exists: {field: 'interaction.hashtags'}}
      q.qfilters << {terms: {'interaction.author.username' => top_authors}} # here's where we used the Top Authors
      q.facets[:top_hashtags] = {terms: {field: 'interaction.hashtags', size: 50, shard_size: 50}}
      result = self.MeasuredQuery(q.build, "hashtags.#{__method__}", opts)
      unless result.nil? || result.empty?
        list = result['facets']['top_hashtags']['terms']
        list = self.normalize_keys(list, ['term'], ['hashtag'])
        list
      end
    end # end #by_top_authors
  end # end ::TopHashtags
end # end ::Metric
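esquire.rb itself isn't excerpted here; as the comments above note, MeasuredQuery simply wrapped the search call with timing instrumentation, and Esquire accumulated filters and aggregations before building the request body. For context, a query builder with the interface used above might look roughly like the following. This is a simplified sketch of the general shape, not the actual contents of esquire.rb; the field names and the Elasticsearch 1.x-style filtered query are assumptions.

# Simplified, illustrative sketch of a query builder with the interface used in
# metric.rb. The filter field names here are assumptions, and the real esquire.rb
# handled many more options.
class Esquire
  attr_accessor :qfilters, :aggregations, :facets

  def initialize
    @qfilters     = []   # filters ANDed into the query
    @aggregations = {}   # named Elasticsearch aggregations
    @facets       = {}   # legacy facets, still used for a few metrics
    @size         = 0    # we only wanted the aggregated buckets, not the raw hits
  end

  def configure(opts = {})
    @size = opts[:size] if opts[:size]
  end

  def source_filter(project, track)
    @qfilters << { term: { 'thor.project' => project } }  # assumed field names
    @qfilters << { term: { 'thor.track'   => track } }
  end

  def date_filter(start_date, end_date)
    return unless start_date && end_date
    @qfilters << { range: { 'interaction.created_at' => { gte: start_date, lte: end_date } } }
  end

  def build
    body = {
      size: @size,
      query: { filtered: { query:  { match_all: {} },
                           filter: { and: @qfilters } } }
    }
    body[:aggregations] = @aggregations unless @aggregations.empty?
    body[:facets]       = @facets       unless @facets.empty?
    body
  end
end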
Sample 2: Controlling Data Intake
Our source for all of this data was Datasift, which at the time was a Twitter firehose partner. From time to time, our customers needed to change the data we were ingesting: adding or removing tracked keywords, hashtags, or authors. Datasift provided a public API for all of this, but we didn't want to build that functionality directly into the frontend client app, so we added a control surface to our API to manage and shape the data stream.
# This is an excerpt from models/definition.rb
# There was one Definition per customer, which combined all of the separate projects and topics into
# a single structure. This model object kept a copy of the most recent CSDL (Curated Stream
# Definition Language) statement used to configure the data stream, and also a copy of the hash ID
# of the job that was running in Datasift.
class Definition
  include Mongoid::Document
  include Mongoid::Timestamps

  field :csdl, type: String, default: ''
  field :last_updated_at, type: DateTime
  field :datasift_hash, type: String, default: ''
  field :subscription_id, type: String, default: ''
  field :valid, type: Boolean, default: false

  # A lifecycle hook; anytime we loaded a Definition object, we wanted to create
  # a fresh connection to the Datasift system.
  after_find :network_setup

  def network_setup
    datasift_config = YAML.load_file(File.expand_path('../../../config/', __FILE__) + '/datasift.yml')
    @datasift_client = DataSift::User.new(datasift_config['username'], datasift_config['api_key'], false)
  end

  # Rebuild the CSDL definition from the Project and Topic objects.
  # This gathered all of the keywords, hashtags, and authors from the various
  # topics in the project and converted them into a CSDL statement.
  def refresh(force=false)
    projects = Project.where(active: true)
    new_timestamp = projects.max_by(&:terms_updated_at).terms_updated_at.utc.to_i
    if force == true || new_timestamp > last_updated_at.utc.to_i
      # CSDLHelpers was a useful, but not very interesting, library to turn the data
      # in Project and Topic objects into Datasift's stream-curation language. It
      # is included in the repository because it was an interesting challenge
      # in marshalling a whole bunch of stuff into a very specific shape.
      csdl = Thor::CSDLHelpers.generate(projects).gsub(/\"\"/,'"')
      update_attributes!(last_updated_at: new_timestamp, csdl: csdl)
      definition = @datasift_client.createDefinition(csdl)
      if definition.validate
        update_attributes!(datasift_hash: definition.hash, valid: true)
      else
        update_attributes!(datasift_hash: '', valid: false)
      end
      return true
    end
    false
  end

  # This would rebuild the CSDL via #refresh, and then send it off to Datasift
  # as a Push Definition, which instructed Datasift to dump its data onto the
  # right side of a Redis queue.
  def start_collection(force_refresh=false)
    updated = refresh(force_refresh)
    if updated || subscription_id.empty?
      stop_collection unless subscription_id.empty?
      # Create the new stream. (@redis_config held our Redis connection settings;
      # its setup is not shown in this excerpt.)
      pushdef = @datasift_client.createPushDefinition()
      pushdef.output_type = 'redis'
      pushdef.output_params['host'] = @redis_config['public_host']
      pushdef.output_params['port'] = @redis_config['port']
      pushdef.output_params['database'] = @redis_config['db']
      pushdef.output_params['list'] = @redis_config['queue']
      pushdef.output_params['format'] = 'json_interaction'
      subscr = pushdef.subscribeStreamHash(datasift_hash, "redis.outpost.#{ENV['RACK_ENV']}")
      update_attributes!(subscription_id: subscr.id)
    end
  end
end
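On the other end of that Redis list, a worker popped the JSON interactions off and fed them into Elasticsearch. That consumer isn't part of this section of the repository; the block below is only a hypothetical sketch of the hand-off, and the REDIS_CONFIG values and queue name are invented for illustration.

require 'redis'
require 'json'

# Hypothetical sketch of the consumer side of the push subscription: Datasift
# pushed 'json_interaction' payloads onto the right side of the list, and a
# worker popped them off the left for indexing. These settings are assumptions
# standing in for the real config file.
REDIS_CONFIG = { 'public_host' => 'localhost', 'port' => 6379, 'db' => 0,
                 'queue' => 'thor:interactions' }

redis = Redis.new(host: REDIS_CONFIG['public_host'],
                  port: REDIS_CONFIG['port'],
                  db:   REDIS_CONFIG['db'])

loop do
  _list, payload = redis.blpop(REDIS_CONFIG['queue']) # block until an interaction arrives
  interaction = JSON.parse(payload)
  # ... hand the interaction off to the Elasticsearch indexer ...
end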
Note: The csdl_helpers.rb file is included for completeness; the code itself isn't especially interesting, but it was a challenge to build.
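For a sense of what the helper produced: CSDL is a boolean filter language, and the bulk of the work was flattening every project's keywords, hashtags, and authors into clauses like the ones below. This is a toy illustration of the output format, not an excerpt from csdl_helpers.rb; the real helper handled escaping, per-topic grouping, and more targets.

# Toy sketch of the kind of CSDL statement the helper generated. The field
# targets echo the Elasticsearch documents above; everything else is illustrative.
def toy_csdl(keywords, hashtags, authors)
  clauses = []
  clauses << %(interaction.content contains_any "#{keywords.join(',')}")  unless keywords.empty?
  clauses << %(interaction.hashtags contains_any "#{hashtags.join(',')}") unless hashtags.empty?
  clauses << %(interaction.author.username in "#{authors.join(',')}")     unless authors.empty?
  clauses.join(' OR ')
end

puts toy_csdl(%w[vendorx thor], %w[analytics], %w[acme_social])
# interaction.content contains_any "vendorx,thor" OR interaction.hashtags contains_any "analytics" OR interaction.author.username in "acme_social"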
Previous: Thor Overview