The VendorX Ingestion Processor
Each machine in the processor swarm, when spawned by the supervisor, had an init.d entry that automatically launched the processor daemon: yet another Ruby application, this one written with DaemonKit.
This example includes 5 files:
- thor-ingest-processor-daemon.rb, the working section of the ingestion processor
- thor-ingest-processor.rb, the (rather expansive) configuration section for the processor daemon
- elasticsearch.rb, a module for indexing documents in Elasticsearch
- round_geopoints.rb, a processor module which parses documents, finds any geotagged coordinates, and rounds them down to five digits.
- datasift.rb, a module for parsing a document from DataSift and allowing easy traversal of its complex JSON format.
Sample #1: Remote Configuration via API
On startup, the processor daemon makes a call to the API for a configuration object. This lets us manage all configuration in one place and broadcast it as needed.
require 'yaml'
require 'json'
require 'faraday'
api_config = YAML.load_file(File.expand_path('../../config/thor_api.yml', __FILE__))
response = Faraday.get("#{api_config['host']}#{api_config['uri']}")
DaemonKit.arguments.options[:app_config] = JSON.parse(response.body)
The response to the configuration request takes the form of a JSON object which tells the ingestion processors what to do with any given document. This allows us to change configurations on the fly and have all future documents be processed according to the new rules.
// This is an example of the JSON response from the configuration call.
{
  "provider": "datasift",
  "notifier": ["flowdock"],
  "queue": {
    "type": "redis",
    "db": 0,
    "queue": "interactions",
    "pull": 10,
    "sleep": 15
  },
  "storage": {
    "type": "elasticsearch",
    "index": "thor",
    "types": {
      "document": "tweet",
      "author": "author"
    }
  },
  "graph": {
    "type": "neo4j"
  },
  "transform": ["IdToInt", "AmericanizeKeys", "RoundGeopoints", "ReplaceTopics"]
}
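For context, the queue block above is what feeds the dequeue call used later in Sample #2: the daemon presumably pulls up to pull documents at a time from the Redis list and backs off for sleep seconds when the list is empty. Here is a minimal sketch of such a wrapper; the class and method names are stand-ins of mine, not the real thor-ingest-processor code.

require 'redis'

# Hypothetical wrapper around the "queue" block of the configuration above.
class RedisQueue
  def initialize(config)
    @redis      = Redis.new(db: config['db'])
    @list       = config['queue']   # "interactions"
    @batch_size = config['pull']    # documents to pull per dequeue
    @sleep_time = config['sleep']   # seconds to wait when the list is empty
  end

  # Pop up to @batch_size raw JSON documents off the Redis list; back off
  # briefly when the list is empty so the daemon doesn't spin.
  def dequeue
    documents = []
    @batch_size.times do
      raw = @redis.lpop(@list)
      break if raw.nil?
      documents << raw
    end
    sleep(@sleep_time) if documents.empty?
    documents
  end
end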
This configuration was used to load modules (this is a place where I really wished Ruby had interfaces) that provided the methods used to transform the data pulled off the Redis queue. I took these strings, constantized them into Ruby objects, and stored them in instance variables on the daemon itself.
# an excerpt detailing how I took the JSON strings from the configuration object
# and turned them into addressable Ruby objects.
# The formatter is actually a set of defined "hash path" methods which would
# let me traverse a deeply-nested JSON hash
@formatter = nil
unless app_config['provider'].nil?
  begin
    require "modules/#{app_config['provider']}"
    @formatter = constantize("VendorX::Provider::#{app_config['provider'].capitalize}")
  rescue LoadError => e
    raise VendorX::Errors::NotImplementedError, "provider: #{app_config['provider']}"
  end
end
# Processors were designed to change the values of the JSON hash, or to add or remove
# them to fit with our desired schema. We could pass an array of them and have them
# run in sequence.
@processors = []
unless app_config['transform'].nil?
  app_config['transform'].each { |tx| @processors << constantize(tx) }
end
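As a side note on the "hash path" idea: the provider formatter (VendorX::Provider::Datasift under this configuration) wraps the parsed document and exposes accessors that walk a list of keys into the nested JSON. The sketch below only illustrates that shape; the method names and field paths are assumptions, not the actual datasift.rb.

require 'json'

# Hypothetical shape of a provider formatter such as VendorX::Provider::Datasift.
# The real module defines many more "hash path" accessors; the names and field
# paths below are illustrative only.
module VendorX
  module Provider
    class Datasift
      def initialize(doc)
        @data = doc.is_a?(String) ? JSON.parse(doc) : doc
      end

      # A "hash path" accessor: walk a list of keys into the nested hash,
      # returning nil as soon as any step is missing.
      def dig_path(*keys)
        keys.reduce(@data) { |node, key| node.is_a?(Hash) ? node[key] : nil }
      end

      # Convenience accessors built on top of the hash paths.
      def author_username
        dig_path('interaction', 'author', 'username')
      end

      def geo
        dig_path('twitter', 'geo')
      end
    end
  end
end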
Sample #2: The Data Processing Pipeline
Once the processing daemon is configured, pulling data off the Redis queue and working with it is straightforward.
documents = @queue.dequeue
documents.each do |doc|
  # Parse the JSON document with the formatter class
  interaction = @formatter.nil? ? doc : @formatter.new(doc)

  # Transform the document according to configured rules
  @processors.each { |pc| interaction = pc.process(interaction) }

  # Upload the document to our configured storage engine
  unless @storage.nil?
    @storage.store_document(interaction)
    @storage.store_authors(interaction) unless DaemonKit.arguments.options[:app_config]['storage']['types']['author'].nil?
  end

  # If configured to do so, extract authors and the relationships between them
  @graph.graph_connections(interaction) if @graph.present?
end
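For a sense of the processor contract used above: every name in the configuration's transform array resolves to something that responds to a class-level process method and returns the (possibly modified) interaction. A rough, hypothetical illustration in the spirit of round_geopoints.rb, assuming the interaction exposes hash-style access to the document:

# Hypothetical processor; the real round_geopoints.rb may be structured differently.
class RoundGeopoints
  PRECISION = 5

  def self.process(interaction)
    geo = interaction['twitter'] && interaction['twitter']['geo']
    return interaction if geo.nil?

    # Reduce latitude and longitude to PRECISION decimal places.
    %w[latitude longitude].each do |axis|
      geo[axis] = geo[axis].to_f.round(PRECISION) unless geo[axis].nil?
    end
    interaction
  end
end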
If you remember our conversation at the restaurant, we talked about how I would implement an analytics system to track the frequency of certain events within the data stream itself. This would be an excellent place to do that: using the paradigm established in this example, the analytics could be configured at the API level, transmitted to the stream processor as JSON, and executed in real time as each document was processed. We also discussed an alternate way of accomplishing the goal, transforming each ingested document to contain a subset of its data and then indexing it in Elasticsearch or a similar data store. You could perform that step here as well, in the same manner, or even do both at once. The goal of this application was to do a lot of work to a small set of documents, but to do it parallelized across a swarm; that way you avoid much of the overhead a heavy analytics requirement might otherwise incur.
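To make that concrete, here is a minimal sketch of the first idea, assuming a new "analytics" key in the remote configuration and a Redis connection for the counters; every name below (@analytics, @redis, track_analytics, the field handling) is hypothetical rather than part of the existing processor.

require 'redis'

# Hypothetical: the remote configuration gains an "analytics" array naming
# document fields whose value frequencies we want to track.
@analytics = app_config['analytics'] || []   # e.g. ["language", "source"]
@redis     = Redis.new

def track_analytics(interaction)
  @analytics.each do |field|
    value = interaction.is_a?(Hash) ? interaction[field] : nil
    next if value.nil?
    # Keep a running frequency count per field value,
    # e.g. analytics:language => { "en" => 1042, "es" => 87 }
    @redis.hincrby("analytics:#{field}", value.to_s, 1)
  end
end

In the Sample #2 loop, track_analytics(interaction) would slot in right after the transform step, so it sees each document in its final shape.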