Saturday 12 February 2011

Background File Processing Daemon in Ruby

I am writing this up because I scoured the net and could not find a write-up of what I would have thought was a common thing to do.
We have an application that needs to watch several directories (on the server) and parse files that are placed there (via scp) by a third party. FWIW, these files represent sports betting prices.

Requirements

A background task that:
* can be monitored
* runs forever!
* processes files instantly - parses them into Ruby objects and stores them in our database for use by the Rails app

Our Rails app is written against Rails 2.3.x (it's been running for a while) and uses Bundler.

The Solution

After some poking around I decided to use a combination of the Daemons gem, EventMachine and FSSM.

The Daemon

This was inspired heavily by a posting on StackOverflow.

1) Install what you need
I tried to get this working successfully with Bundler, but it was a no go. So I needed to install daemons, eventmachine and fssm 'normally':
sudo gem install daemons eventmachine fssm

2) Set up the Daemon:

Setup

Usual stuff for a ruby file:
#!/usr/bin/env ruby
require 'rubygems'
require 'daemons'
require 'eventmachine'
require 'fssm'


We have multiple directories that need watching, so we keep them in an array:

watch = [
  "/Users/smyp/development/wl/xtf/horse",
  "/Users/smyp/development/wl/xtf/sport",
  "/Users/smyp/development/wl/xtf/live",
  "/Users/smyp/development/wl/xtf/alpha"
]

if ENV['RAILS_ENV'] == 'production'
  watch = ["/home/mcdata/horse", "/home/mcdata/sport", "/home/mcdata/live", "/home/mcdata/alpha"]
end


We launch a separate daemon for each directory as we don't want a huge file in the horses directory to slow down processing in the live directory.

Daemon Config

With the daemons gem you can set things like what the process will be called, where the pid file will reside, and so on.

dir = File.expand_path(File.join(File.dirname(__FILE__), '..'))

daemon_options = {
  :app_name  => "xturf_file_monitor",
  :multiple  => false,
  :dir_mode  => :normal,
  :dir       => File.join(dir, 'tmp', 'pids'),
  :backtrace => true
}
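One thing to watch, and this is an assumption on my part rather than something lifted from our script: the pid files go into the :dir directory, and a fresh checkout may not have tmp/pids yet. A minimal sketch, standard library only, that creates the directory before any daemon starts (ensure_pid_dir is a hypothetical helper name):

```ruby
require 'fileutils'
require 'tmpdir'

# Hypothetical helper: make sure <app_root>/tmp/pids exists and return its path.
def ensure_pid_dir(app_root)
  pid_dir = File.join(app_root, 'tmp', 'pids')
  FileUtils.mkdir_p(pid_dir) # does nothing if the directory is already there
  pid_dir
end

# Try it against a throwaway directory standing in for the app root.
Dir.mktmpdir do |app_root|
  pid_dir = ensure_pid_dir(app_root)
  puts "pid dir ready: #{File.directory?(pid_dir)}"
end
```

In the real script you would call it with the same dir value used in daemon_options.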


3) The Actual Daemon

Cue spooky music!

class PriceDaemon
  attr_accessor :base_dir

  def initialize(base_dir)
    self.base_dir = base_dir
  end

  def dostuff
    logger.info "About to start job for #{base_dir}"
    EventMachine::run {
      # Your code here
      xhj = PriceFileJob.new(base_dir)
      xhj.clear_backlog
      FSSM.monitor(base_dir) do
        create {|base, relative| xhj.clear_backlog}
        update {|base, relative| xhj.clear_backlog}
      end
    }
  end

  def logger
    @@logger ||= ActiveSupport::BufferedLogger.new("#{RAILS_ROOT}/log/price_file_monitor.log")
  end
end


What this does is:
a) creates a class that takes the directory to watch as an initialize parameter
b) does an EventMachine run that first clears out any backlog files and then fires up an FSSM monitor. The FSSM monitor gives us events on create, update (and delete, but we don't care about that). As a safety measure I simply trawl through the entire directory every time a file is created or updated. This ensures that anything we missed will get caught.
We delete files ourselves after processing, so the directory should only have a few files in it anyway.
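To show why trawling the whole directory on every event is safe, here is a minimal sketch using only the standard library. It is not our production code: sweep is a hypothetical name, and the block stands in for the real parsing. Because each file is deleted once handled, a second sweep straight after the first finds nothing left to do.

```ruby
require 'tmpdir'

# Handle every regular file in dir, oldest first, deleting each one afterwards.
# Returns the number of files handled. The block stands in for the real work.
def sweep(dir)
  paths = Dir.entries(dir).
    map { |name| File.join(dir, name) }.
    select { |path| File.file?(path) }. # skips ".", ".." and subdirectories
    sort_by { |path| File.stat(path).ctime }
  paths.each do |path|
    yield path
    File.delete(path)
  end
  paths.size
end

Dir.mktmpdir do |dir|
  File.open(File.join(dir, 'a.txt'), 'w') { |f| f.write('1') }
  File.open(File.join(dir, 'b.txt'), 'w') { |f| f.write('2') }
  first  = sweep(dir) { |path| puts "processing #{File.basename(path)}" }
  second = sweep(dir) { |path| puts "processing #{File.basename(path)}" }
  puts "first sweep: #{first}, second sweep: #{second}"
end
```

So a burst of create and update events costs a few extra directory listings, but never processes a file twice.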

4) Spawn the Daemon

Bring on Mia Farrow!

watch.each_with_index do |base_dir, i|
  Daemons.run_proc("price_daemon_#{i}", daemon_options) do
    Dir.chdir dir
    PriceDaemon.new(base_dir).dostuff
  end
end


This will go through our array and fire up a daemon for each directory. There are downsides to doing it this way - it's not so easy to start and stop just one of them (but then they shouldn't ever die, so if one does we just stop and start them all).
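Since the daemons are stopped and started as a group, it helps to at least see which one has died. A rough sketch, standard library only, that reads the pid files and pokes each process with signal 0. It assumes the pid files are named after the daemon (price_daemon_0.pid and so on) in tmp/pids, which is my understanding of how Daemons names them when :multiple is false, so check your own tmp/pids first. daemon_alive? and daemon_status are hypothetical helper names.

```ruby
require 'tmpdir'

# Is the process recorded in this pid file still alive?
def daemon_alive?(pid_file)
  return false unless File.exist?(pid_file)
  pid = File.read(pid_file).to_i
  return false if pid <= 0
  Process.kill(0, pid) # signal 0 sends nothing, it only checks the process exists
  true
rescue Errno::ESRCH    # no such process: the pid file is stale
  false
rescue Errno::EPERM    # process exists but belongs to another user
  true
end

# Returns [name, alive] pairs for price_daemon_0 .. price_daemon_(count - 1).
def daemon_status(pid_dir, count)
  (0...count).map do |i|
    name = "price_daemon_#{i}"
    [name, daemon_alive?(File.join(pid_dir, "#{name}.pid"))]
  end
end

# Demo: fake a pid file for daemon 0 using our own pid; daemon 1 has none.
Dir.mktmpdir do |pid_dir|
  File.open(File.join(pid_dir, 'price_daemon_0.pid'), 'w') { |f| f.puts Process.pid }
  daemon_status(pid_dir, 2).each do |name, alive|
    puts "#{name}: #{alive ? 'running' : 'down'}"
  end
end
```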

5) The File Processor

This of course will be specific to your operation, but here's an outline of ours:

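class PriceFileJob
  attr_accessor :base_dir

  def initialize(base_dir)
    self.base_dir = base_dir
    logger.info "watching #{base_dir}"
  end

  # One log file per watched directory, rotated daily
  def logger
    @logger ||= Logger.new("#{RAILS_ROOT}/log/price_file_job_#{base_dir.split("/").last}.log", "daily")
  end

  # Process every file in the directory, oldest first
  def clear_backlog
    # The directory listing also contains "." and "..", so keep regular files only
    files = Dir.entries(base_dir).select{|c| File.file?(File.join(base_dir, c))}
    files = files.sort_by{|c| File.stat(File.join(base_dir, c)).ctime}
    files.each do |file|
      process_file(file)
    end
  end

  # Parse the file, store the prices, then delete the file
  def process_file(file)
  end

  private
end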
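Our process_file is left out above, so here is a purely illustrative sketch of what one could look like. Everything specific in it is made up for the example: the "selection,price" line format, the Price struct and the parse_prices helper are all hypothetical, and the block stands in for saving to the database. It also takes a full path rather than the bare file name used in the outline. The one idea worth keeping is that the file is deleted only after it has been parsed and handed on successfully.

```ruby
require 'tmpdir'

# Hypothetical file format: one "selection,price" pair per line.
Price = Struct.new(:selection, :price)

def parse_prices(path)
  lines = File.readlines(path).map { |line| line.strip }.reject { |line| line.empty? }
  lines.map do |line|
    selection, price = line.split(',', 2)
    Price.new(selection, Float(price)) # Float() raises on missing or bad numbers
  end
end

# Parse the file, yield the prices (stand-in for the database write),
# and delete the file only if all of that worked.
def process_file(path)
  prices = parse_prices(path)
  yield prices
  File.delete(path)
  true
rescue ArgumentError, TypeError => e
  warn "skipping #{path}: #{e.message}"
  false
end

Dir.mktmpdir do |dir|
  good = File.join(dir, 'good.txt')
  File.open(good, 'w') { |f| f.puts "Red Rum,4.5"; f.puts "Arkle,2.0" }
  process_file(good) do |prices|
    prices.each { |p| puts "#{p.selection} @ #{p.price}" }
  end
  puts "file still there? #{File.exist?(good)}"
end
```

A file that fails to parse stays in the directory, so with the trawl-everything approach it will be retried (and logged) on every sweep until someone removes it.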


6) Capistrano

We use Capistrano to deploy, so I included some tasks in our deploy.rb:

before "mc:release", "file_processors:stop"
after "mc:release", "file_processors:start"

namespace :file_processors do
  desc "start processors"
  task :start, :roles => :db do
    run "cd #{current_path}; RAILS_ENV=#{fetch :rails_env} ruby ./script/price_file_monitor.rb start"
  end

  desc "get status of processors"
  task :status, :roles => :db do
    run "cd #{current_path}; RAILS_ENV=#{fetch :rails_env} ruby ./script/price_file_monitor.rb status"
  end

  desc "stop processors"
  task :stop, :roles => :db do
    run "cd #{current_path}; RAILS_ENV=#{fetch :rails_env} ruby ./script/price_file_monitor.rb stop"
  end
end


That's it! I hope you found this interesting.

I should also write up how we monitor these processes... maybe next time!