Moving to a single-page Angular application also meant moving away from the standard Rails application VendorX had been using. Ruby was still working very well for us as a development language, though, so we built a backing API with Grape, a Rack-based API framework. We also needed to move away from MySQL for performance reasons: our datastore had grown too large to manage in a relational system, so we migrated our data to Elasticsearch and (somewhat later) Neo4j.
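
As a rough illustration of that shape (a hypothetical sketch, not code from this repository), a Grape API is just a Rack application serving JSON endpoints to the Angular client; the class and endpoint names below are assumptions:

# Hypothetical sketch only; the endpoint and structure here are illustrative.
require 'grape'

module Thor
  class API < Grape::API
    format :json
    prefix :api

    # The Angular front end called JSON endpoints like this one.
    get :status do
      { status: 'ok' }
    end
  end
end

# config.ru would then simply `run Thor::API`.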

This section of the repository includes 4 files:

  • metric.rb, a virtual model class to pull data from Elasticsearch and return it as JSON to the front end
  • esquire.rb, a useful means of constructing the set of Elasticsearch queries we used
  • definition.rb, another virtual model class to control and update Datasift
  • csdl_helpers.rb, a utility to convert definitions into Datasift CSDL

Sample 1: Generating Queries

Thor presented data through aggregated Elasticsearch queries along several predefined axes of analysis: Top Authors (by Volume, by Mention, by Top Hashtags, by Mentions by Top Authors) and Top Hashtags (by Volume, by Top Authors). All of these queries could be filtered by date range and could exclude unwanted data. To support all of this, I created a query library called Esquire and a virtual “Model” class for metrics:

# This is an excerpt from models/metric.rb
module Metric
  class TopAuthors
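    # The constructor (omitted from this excerpt) stored the project and track
    # used to scope each query; note Metric::TopAuthors.new(@project, @track) further down.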
    def by_volume(opts={})
      opts.to_options! # to_options! turned String hash keys into Symbols

      # Top Authors By Volume searched the data and returned a list of the top n
      # authors, ordered by the number of results found for each.
      q = Esquire.new
      q.configure(opts)
      q.source_filter(@project, @track)
      q.date_filter(opts[:start_date], opts[:end_date])
      q.aggregations[:top_authors] = {terms: {field: 'interaction.author.username', size: 50}}

      # MeasuredQuery was a basic query that included instrumentation to give us an idea how
      # quickly our queries were running. Since they were usually pretty fast, this was mostly
      # to alert us to someone sending in very, very large date ranges, or a problem with the
      # infrastructure that needed addressing.
      result = self.MeasuredQuery(q.build, "authors.#{__method__}", opts)
      unless result.nil? || result.empty?
        list = result['aggregations']['top_authors']['buckets']
        list = self.normalize_keys(list, ['key', 'doc_count'], ['screen_name', 'count'])
        return list
      end
    end # end #by_volume
  end # end ::TopAuthors

  class TopHashtags
    def by_top_authors(opts={})
      opts.to_options!

      # For Top Hashtags By Top Authors, we actually had to first perform a Top Authors By Volume
      # query, then use that to extract the screen names of those top authors, and pass them to
      # the Top Hashtags query as a filter.
      author_search = Metric::TopAuthors.new(@project, @track)
      # From that query, we bundle up all the author names as an array of strings.
      top_authors   = author_search.by_volume(opts).map{|bucket| bucket['screen_name']}

      q = Esquire.new
      q.configure(opts)
      q.source_filter(@project, @track)
      q.date_filter(opts[:start_date], opts[:end_date])
      q.qfilters << {exists: {field: 'interaction.hashtags'}}
      q.qfilters << {terms: {'interaction.author.username' => top_authors}} # here's where we used the Top Authors
      q.facets[:top_hashtags] = {terms: {field: 'interaction.hashtags', size: 50, shard_size: 50}}

      result = self.MeasuredQuery(q.build, "hashtags.#{__method__}", opts)
      unless result.nil? || result.empty?
        list = result['facets']['top_hashtags']['terms']
        list = self.normalize_keys(list, ['term'], ['hashtag'])
        list
      end
    end # end #by_top_authors
  end # end ::TopHashtags
end # end Metric
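
In use, the API layer instantiated these classes and returned their output to the front end as JSON. The snippet below is a hedged usage sketch rather than repository code: the constructor arguments and the exact shape of the returned rows are inferred from the excerpt above.

# Hypothetical usage sketch; `project` and `track` stand in for whatever the API layer passed in.
authors = Metric::TopAuthors.new(project, track)
authors.by_volume('start_date' => '2014-03-01', 'end_date' => '2014-03-31')
# => [{'screen_name' => 'some_author', 'count' => 1234}, ...]

hashtags = Metric::TopHashtags.new(project, track)
hashtags.by_top_authors('start_date' => '2014-03-01', 'end_date' => '2014-03-31')
# => [{'hashtag' => 'some_tag', ...}, ...]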

Sample 2: Controlling Data Intake

Our source for all of this data was Datasift, which at the time was a Twitter firehose partner. From time to time, our customers needed to change the data we were ingesting: adding or removing tracked keywords, hashtags, or authors. Datasift provided a public API for all of this, but we didn’t want to build that functionality directly into the frontend client app, so we added a control surface to our own API to manage and shape the data stream.

# There was one Definition per customer, which combined all of the separate projects and topics into
# a single structure. This model object kept a copy of the most recent CSDL (Curated Stream
# Definition Language) statement used to configure the data stream, and also a copy of the hash ID
# of the job that was running in Datasift.
class Definition
  # excerpt from definition.rb
  include Mongoid::Document
  include Mongoid::Timestamps

  field :csdl,            type: String,  default: ''
  field :last_updated_at, type: DateTime
  field :datasift_hash,   type: String,  default: ''
  field :subscription_id, type: String,  default: ''
  field :valid,           type: Boolean, default: false

  # A lifecycle hook; anytime we called for a Definition object, we wanted to create
  # a fresh connection to the Datasift system.
  after_find :network_setup

  def network_setup
    datasift_config  = YAML.load_file(File.expand_path('../../../config/', __FILE__) + '/datasift.yml')
    @datasift_client = DataSift::User.new(datasift_config['username'], datasift_config['api_key'], false)
  end

  # Rebuild the CSDL definition from the Project and Topic objects
  # this gathered all of the keywords, hashtags, and authors from the various
  # topics in the project and converted them into a CSDL statement.
  def refresh(force=false)    
    projects      = Project.where(active: true)
    new_timestamp = projects.max_by(&:terms_updated_at).terms_updated_at.utc.to_i
    if force == true || new_timestamp > last_updated_at.utc.to_i
      # CSDLHelpers was a useful, but not very interesting, library to turn the data
      # in Project and Topic objects into Datasift's stream-curation language. It's
      # included in the repository because it was an interesting challenge in
      # marshalling a whole bunch of data into a very specific shape.
      csdl       = Thor::CSDLHelpers.generate(projects).gsub(/\"\"/,'"')
      update_attributes!(last_updated_at: new_timestamp, csdl: csdl)

      definition = @datasift_client.createDefinition(csdl)
      if definition.validate
        update_attributes!(datasift_hash: definition.hash, valid: true)
      else
        update_attributes!(datasift_hash: '', valid: false)
      end

      return true
    end
    false
  end

  # This would rebuild the CSDL via #refresh, and then send it off to Datasift
  # as a Push Definition, which instructed Datasift to dump its data onto the
  # right side of a Redis queue.
  def start_collection(force_refresh=false)
    updated = refresh(force_refresh)
    if updated || subscription_id.empty?

      unless subscription_id.empty?
        stop_collection
      end

      # create new stream
      pushdef = @datasift_client.createPushDefinition()
      pushdef.output_type               = 'redis'
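      # @redis_config (host, port, db, queue) was loaded from configuration elsewhere; it isn't shown in this excerpt.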
      pushdef.output_params['host']     = @redis_config['public_host']
      pushdef.output_params['port']     = @redis_config['port']
      pushdef.output_params['database'] = @redis_config['db']
      pushdef.output_params['list']     = @redis_config['queue']
      pushdef.output_params['format']   = 'json_interaction'

      subscr  = pushdef.subscribeStreamHash(datasift_hash, "redis.outpost.#{ENV['RACK_ENV']}")

      update_attributes!(subscription_id: subscr.id)
    end
  end
end
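
Exposing this to the front end was then a matter of adding routes that called into the model. The route below is a hedged sketch: the path, the lookup of the customer’s Definition, and the response shape are assumptions, not the repository’s actual routes.

# Hypothetical endpoint sketch; only Definition#start_collection and its fields come from the excerpt above.
class StreamControlAPI < Grape::API
  format :json

  post 'stream/restart' do
    definition = Definition.first   # one Definition per customer
    definition.start_collection(params[:force] == 'true')
    { valid: definition.valid, subscription_id: definition.subscription_id }
  end
end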

Note: The CSDLHelpers file (csdl_helpers.rb) is included for completeness; it’s not the most interesting code, but it was a challenge to build.
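
For context, the CSDL these helpers generated amounted to a large boolean filter over Datasift targets. A hedged illustration of the kind of statement produced (not actual output from the helper):

# Illustrative only; the real generator handled many more targets and edge cases.
example_csdl = <<-CSDL
  interaction.content contains_any "keyword one, keyword two"
  OR interaction.hashtags in "tag1, tag2"
  OR interaction.author.username in "author1, author2"
CSDL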

Previous: Thor Overview

Next: Thor Ingestion Supervisor