The VendorX Ingestion Supervisor
As mentioned on other pages, VendorX used Datasift to push large quantities of data onto a Redis queue running on AWS. The same machine that hosted that queue also included a supervisor application, written in Ruby and based on DaemonKit. Its job was to keep an eye on the size of the queue over time. Based on the size of the queue, it had the ability to spawn multiple AWS micro instances from an AMI image, which contained a basic Ruby execution environment and a processor application.
This section of the repository includes 2 files:
- thor-ingest-supervisor-daemon.rb, the configuration and work section of the supervisor daemon
- thor-ingest-supervisor.rb, a library of useful code that supports the supervisor daemon
Sample 1: Measuring Desired Swarm Size
This is a simplistic algorithm designed to ensure that we have no fewer than 2 and no more than 7 processor nodes running at any given time. We establish a base number of processors for various queue lengths, and then we examine a moving histogram of the change in the length of the queue to see if we need to add a bit more processing power to handle whatever sort of surge we’re in.
# This is a combination of excerpts from several files, arranged here for convenience.
DaemonKit::Application.running! do |config|
# Initialize an empty histogram
@histogram = []
# quick hack to get Infinity. Infinity is a constant in Ruby, but cannot be
# referred to or instantiated or accessed in any way other than to divide 1.0 by 0.
# This, I presume, is a feature.
Inf = 1.0/0
@ranges = {
0..100000 => 2,
100001..250000 => 3,
250001..500000 => 4,
500001..Inf => 5
}
end
# get the number of sequential intervals in which the queue has been larger or
# smaller than the measurement before it. Returns a positive integer for consecutive
# queue length increases, a negative integer for consecutive decreases, or zero.
def histogram_trend
histo = @histogram.dup.reverse
last = histo[0]
trend = 0
dir = 0
for x in histo
break if x == 0
if x > last
break if dir < 0
trend += 1
dir = 1
last = x
elsif x < last
break if dir > 0
trend += 1
dir = -1
last = x
else
trend++
end
end
return dir*trend
end
loop do
# Get the size of the current processing queue
current_qlen = @queue.llen @app_config['queue']['q']
# push this measurement onto the histogram and trim the oldest entry
@histogram << current_qlen
@histogram.shift
# How many workers does this queue size call for?
desired_workers = 0
@ranges.each {|k,v| desired_workers = v if k.include? qlen}
# get the histogram trend
# the histogram trend measures data in 2-minute increments. If a trend has
# continued for 10 consecutive minutes (5 entries), add or subtract 1 from
# the desired workers. If 20 minutes, add or subtract 2.
trend = histogram_trend()
if trend >= 0
desired_workers += trend / 5
elsif trend < 0
desired_workers -= trend.abs / 5
end
# Finally, we want to have 2 workers running at all times. Don't go lower than 2.
desired_workers = 2 if desired_workers < 2
# there's more here, it'll be shown in Sample #2
end
Sample #2: Spawning or Reaping Processor Nodes
In the first example, we went through steps to get a number of desired workers. This example is simpler, designed to grow or shrink the size of the processor swarm based on the number of desired workers we get.
loop do
# ... all the stuff to get our desired swarm size
# Get our current processing swarm for this stack (without any terminated ones)
instances = @ec2.instances.select { |instance|
instance.tags['App'] == @aws_config['tags']['App'] &&
instance.tags['Role'] == @aws_config['tags']['Role'] &&
instance.tags['Stack'] == DaemonKit.env.capitalize
}.keep_if {|instance| instance.status != :terminated}
# how many more or fewer instances do we require?
delta = desired_workers - instances.size
# take action!
if delta == 0
# nothing to do here, we've got the swarm we need, if not the swarm we deserve right now.
# I'm such a dork.
elsif delta < 0
# we need to kill some instances. Find the $delta oldest and terminate them.
kill_list = instances.sort_by(&:launch_time)[0..(delta.abs - 1)]
kill_list.each do|instance|
DaemonKit.logger.info "\t RIP: Instance #{instance.id} []"
instance.terminate
end
elsif delta > 0
# we need to launch some instances.
(0..(delta.abs - 1)).each do |n|
instance = @ec2.instances.create(
image_id: @aws_config['ec2']['image_id'],
instance_type: @aws_config['ec2']['instance_type'],
count: 1,
security_groups: @aws_config['ec2']['security_groups'],
key_pair: @ec2.key_pairs[@aws_config['ec2']['key_pair_id']],
instance_initiated_shutdown_behavior: 'terminate',
user_data: ''
)
instance.tag('Name', value: "thor-integration-qproc-#{instances.size + 1 + n}")
instance.tag('App', value: @aws_config['tags']['App'])
instance.tag('Role', value: @aws_config['tags']['Role'])
instance.tag('Stack', value: DaemonKit.env.capitalize)
DaemonKit.logger.info "\t Spawned: Instance #{instance.id}"
end
end
# sleep now. Wake in 2 minutes to check our status again.
sleep 120
end
Previous: Thor Ingestion Supervisor
Next: Thor Ingestion Processor