As mentioned on other pages, VendorX used Datasift to push large quantities of data onto a Redis queue running on AWS. The machine that hosted the queue also ran a supervisor application, written in Ruby and based on DaemonKit, whose job was to watch the size of the queue over time. Based on that size, it could spawn multiple AWS micro instances from an AMI that contained a basic Ruby execution environment and a processor application.

This section of the repository includes two samples:

Sample #1: Measuring Desired Swarm Size

This is a simplistic algorithm designed to ensure that we have no fewer than 2 and no more than 7 processor nodes running at any given time. We establish a base number of processors for various queue lengths, and then we examine a moving histogram of the change in the length of the queue to see if we need to add a bit more processing power to handle whatever sort of surge we’re in.
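The base-count lookup described above can be sketched on its own. This is a minimal illustration, assuming the same thresholds as the sample; the `BASE_WORKERS` constant and the `base_workers` helper are hypothetical names, not part of the supervisor.

```ruby
# Hypothetical stand-alone version of the range table. The thresholds match the
# sample; the constant and method names are illustrative only.
BASE_WORKERS = {
  (0..100_000)               => 2,
  (100_001..250_000)         => 3,
  (250_001..500_000)         => 4,
  (500_001..Float::INFINITY) => 5
}.freeze

# Return the base worker count for a queue length, or 0 if no range matches.
def base_workers(queue_length)
  _range, workers = BASE_WORKERS.find { |range, _| range.include?(queue_length) }
  workers || 0
end

puts base_workers(42)         # prints 2
puts base_workers(250_000)    # prints 3
puts base_workers(3_000_000)  # prints 5
```

Keeping the table as data means a threshold can be changed without touching the lookup logic.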

# This is a combination of excerpts from several files, arranged here for convenience.

DaemonKit::Application.running! do |config|
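  # Initialize the histogram, pre-filled with zeros. The main loop pushes one entry
  # and shifts one entry on every pass, so an empty array would never hold any data.
  # NOTE: the window length of 10 (20 minutes of 2-minute samples) is an assumption.
  @histogram = Array.new(10, 0)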

  # Infinity gives the last range an open-ended upper bound. Ruby exposes it as
  # Float::INFINITY (available since Ruby 1.9.2).
  Inf = Float::INFINITY

  @ranges = {
    0..100000      => 2,
    100001..250000 => 3,
    250001..500000 => 4,
    500001..Inf    => 5
  }
end

# Get the number of sequential intervals in which the queue has been larger or
# smaller than the measurement before it. Returns a positive integer for consecutive
# queue length increases, a negative integer for consecutive decreases, or zero.
def histogram_trend
  # Walk from the newest measurement back toward the oldest
  histo = @histogram.reverse
  last  = histo[0]
  trend = 0
  dir   = 0
  histo.each do |x|
    break if x == 0
    if x > last
      # the older measurement was larger, so the queue has been shrinking
      break if dir > 0
      trend += 1
      dir = -1
      last = x
    elsif x < last
      # the older measurement was smaller, so the queue has been growing
      break if dir < 0
      trend += 1
      dir = 1
      last = x
    else
      # no change (this also counts the newest entry against itself)
      trend += 1
    end
  end
  dir * trend
end

loop do
  # Get the size of the current processing queue
  current_qlen = @queue.llen @app_config['queue']['q']

  # push this measurement onto the histogram and trim the oldest entry
  @histogram << current_qlen
  @histogram.shift

  # How many workers does this queue size call for?
  desired_workers = 0
  @ranges.each { |k, v| desired_workers = v if k.include?(current_qlen) }

  # get the histogram trend
  # the histogram trend measures data in 2-minute increments. If a trend has
  # continued for 10 consecutive minutes (5 entries), add or subtract 1 from
  # the desired workers. If 20 minutes, add or subtract 2.
  trend = histogram_trend()
  if trend >= 0
    desired_workers += trend / 5
  elsif trend < 0
    desired_workers -= trend.abs / 5
  end

  # Finally, we want to have 2 workers running at all times. Don't go lower than 2.
  desired_workers = 2 if desired_workers < 2

  # there's more here, it'll be shown in Sample #2
end
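The trend adjustment at the end of that loop can be pulled out into a small pure function, which makes it easier to check by hand. This is a sketch under stated assumptions: `adjusted_workers` is a hypothetical name, and the upper cap of 7 comes from the description at the top of this sample (the loop as shown only enforces the floor of 2).

```ruby
MIN_WORKERS = 2
MAX_WORKERS = 7 # assumption: the cap mentioned in the prose, enforced here with clamp

# Hypothetical helper: apply the trend adjustment to a base worker count.
# One worker is added or removed for every 5 consecutive entries in the trend.
def adjusted_workers(base, trend)
  # Take the absolute value first: Ruby integer division rounds toward negative
  # infinity, so -7 / 5 is -2, which would remove one worker too many.
  adjustment = trend.abs / 5
  adjustment = -adjustment if trend < 0
  (base + adjustment).clamp(MIN_WORKERS, MAX_WORKERS)
end

puts adjusted_workers(3, 5)   # prints 4: ten minutes of growth adds one worker
puts adjusted_workers(4, -5)  # prints 3: ten minutes of decline removes one
puts adjusted_workers(2, -7)  # prints 2: never drops below the floor
puts adjusted_workers(5, 20)  # prints 7: never rises above the cap
```

`Comparable#clamp` requires Ruby 2.4 or later; on older versions the same effect comes from `[[value, MIN_WORKERS].max, MAX_WORKERS].min`.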

Sample #2: Spawning or Reaping Processor Nodes

In the first sample, we worked out the number of desired workers. This sample is simpler: it grows or shrinks the processor swarm to match that number.

loop do
  # ... all the stuff to get our desired swarm size

  # Get our current processing swarm for this stack (without any terminated ones)
  instances = @ec2.instances.select { |instance|
    instance.tags['App']   == @aws_config['tags']['App']  &&
    instance.tags['Role']  == @aws_config['tags']['Role'] &&
    instance.tags['Stack'] == DaemonKit.env.capitalize
  }.keep_if {|instance| instance.status != :terminated}

  # how many more or fewer instances do we require?
  delta = desired_workers - instances.size

  # take action!
  if delta == 0
    # nothing to do here, we've got the swarm we need, if not the swarm we deserve right now.
    # I'm such a dork.
  elsif delta < 0
    # we need to kill some instances. Find the oldest delta.abs instances and terminate them.
    kill_list = instances.sort_by(&:launch_time).first(delta.abs)
    kill_list.each do |instance|
      DaemonKit.logger.info "\t RIP: Instance #{instance.id} []"
      instance.terminate
    end
  elsif delta > 0
    # we need to launch some instances.
    delta.times do |n|
      instance = @ec2.instances.create(
        image_id: @aws_config['ec2']['image_id'],
        instance_type: @aws_config['ec2']['instance_type'],
        count: 1,
        security_groups: @aws_config['ec2']['security_groups'],
        key_pair: @ec2.key_pairs[@aws_config['ec2']['key_pair_id']],
        instance_initiated_shutdown_behavior: 'terminate',
        user_data: ''
      )
      instance.tag('Name',  value: "thor-integration-qproc-#{instances.size + 1 + n}")
      instance.tag('App',   value: @aws_config['tags']['App'])
      instance.tag('Role',  value: @aws_config['tags']['Role'])
      instance.tag('Stack', value: DaemonKit.env.capitalize)

      DaemonKit.logger.info "\t Spawned: Instance #{instance.id}"
    end
  end

  # sleep now. Wake in 2 minutes to check our status again.
  sleep 120
end
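The grow-or-shrink decision can be separated from the AWS calls, so it can be exercised without touching EC2. The following is a minimal sketch, not the supervisor's actual code: the `Instance` struct and `plan_swarm_change` are hypothetical stand-ins that carry only the fields the decision needs.

```ruby
# Hypothetical stand-in for an EC2 instance: just an id and a launch time.
Instance = Struct.new(:id, :launch_time)

# Decide what to do without calling AWS.
# Returns [ids_to_terminate, number_to_launch].
def plan_swarm_change(instances, desired_workers)
  delta = desired_workers - instances.size
  if delta < 0
    # Too many instances: pick the oldest ones for termination
    oldest = instances.sort_by(&:launch_time).first(delta.abs)
    [oldest.map(&:id), 0]
  else
    # Too few (or exactly enough): nothing to terminate, launch the difference
    [[], delta]
  end
end

now = Time.now
swarm = [
  Instance.new('i-aaa', now - 3600),
  Instance.new('i-bbb', now - 60),
  Instance.new('i-ccc', now - 7200)
]

p plan_swarm_change(swarm, 2) # prints [["i-ccc"], 0]
p plan_swarm_change(swarm, 5) # prints [[], 2]
p plan_swarm_change(swarm, 3) # prints [[], 0]
```

With the decision isolated like this, the loop body only has to terminate the returned ids and launch the returned number of instances.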
