We have an application that needs to watch several directories (on the server) and parse files that are placed there (via scp) by a third party. FWIW, these files represent sports betting prices.
Requirements
A background task that can:
* be monitored
* run forever!
* process files instantly - parse them into Ruby objects and store them in our database for use by the Rails app
Our Rails app runs on Rails 2.3.x (it's been running for a while) and uses Bundler.
The Solution
After some poking around I decided to use a combination of the Daemons gem, EventMachine and FSSM.
The Daemon
This was inspired heavily by a posting on StackOverflow.
1) Install what you need
I tried to get this working successfully with Bundler, but it was a no go. So I needed to install daemons, eventmachine and fssm 'normally':
sudo gem install daemons eventmachine fssm
2) Set up the Daemon:
Setup
Usual stuff for a ruby file:
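#!/usr/bin/env ruby
require 'rubygems'
require 'daemons'
require 'eventmachine'  # EventMachine::run in the daemon below
require 'fssm'          # FSSM.monitor in the daemon below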
We have multiple directories that need watching, so we keep them in an array:
watch = [
  "/Users/smyp/development/wl/xtf/horse", 
  "/Users/smyp/development/wl/xtf/sport",
  "/Users/smyp/development/wl/xtf/live",
  "/Users/smyp/development/wl/xtf/alpha"
]
if ENV['RAILS_ENV'] == 'production'
  watch = ["/home/mcdata/horse", "/home/mcdata/sport", "/home/mcdata/live", "/home/mcdata/alpha"]
end
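The same choice can also be written as a lookup by environment name, which keeps the four feed names in one place. This is only a sketch: the `watch_dirs_for` helper is a made-up name, and it assumes that anything other than production should fall back to the development paths.

```ruby
# Hypothetical helper: build the list of watched directories for an environment.
def watch_dirs_for(env)
  roots = {
    'development' => "/Users/smyp/development/wl/xtf",
    'production'  => "/home/mcdata"
  }
  # Unknown or missing environments fall back to the development root.
  root = roots.fetch(env.to_s, roots['development'])
  %w[horse sport live alpha].map { |feed| File.join(root, feed) }
end

watch = watch_dirs_for(ENV['RAILS_ENV'])
```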
We launch a separate daemon for each directory as we don't want a huge file in the horses directory to slow down processing in the live directory.
Daemon Config
With the daemons gem you can set things like what the process will be called and where the pid file will reside.
dir = File.expand_path(File.join(File.dirname(__FILE__), '..'))
daemon_options = {
  :app_name   => "xturf_file_monitor",
  :multiple   => false,
  :dir_mode   => :normal,
  :dir        => File.join(dir, 'tmp', 'pids'),
  :backtrace  => true
}
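Since the pid files go into tmp/pids, it may be worth making sure that directory exists before any daemon starts. I am assuming here that nothing else creates it for you, which may not hold for your setup. The `ensure_pid_dir` helper is a hypothetical name, not part of the daemons gem.

```ruby
require 'fileutils'

# Hypothetical helper: create <app_root>/tmp/pids if it is missing and return its path.
def ensure_pid_dir(app_root)
  pid_dir = File.join(app_root, 'tmp', 'pids')
  FileUtils.mkdir_p(pid_dir)  # does nothing if the directory already exists
  pid_dir
end

# Possible usage in the script above:
#   daemon_options[:dir] = ensure_pid_dir(dir)
```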
3) The Actual Daemon
Cue spooky music!
class PriceDaemon
  attr_accessor :base_dir
  def initialize(base_dir)
    self.base_dir = base_dir
  end
  def dostuff
    logger.info "About to start job for #{base_dir}"
    EventMachine::run {
      # Your code here
      xhj = PriceFileJob.new(base_dir)
      xhj.clear_backlog
      FSSM.monitor(base_dir) do
        create {|base, relative| xhj.clear_backlog}
        update {|base, relative| xhj.clear_backlog}
      end
    }
  end
  def logger
    @@logger ||= ActiveSupport::BufferedLogger.new("#{RAILS_ROOT}/log/price_file_monitor.log")
  end
end
What this does is:
a) create a class that takes the directory to watch as an initialize parameter
b) do an EventMachine run that first clears out any backlog of files and then fires up an FSSM monitor. The FSSM monitor gives us events on create and update (and delete, but we don't care about that). As a safety measure I simply trawl through the entire directory every time a file is created or updated. This ensures that anything we missed will get caught.
We delete files ourselves after processing, so the directory should only have a few files in it anyway.
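To show why sweeping the whole directory on every event is safe, here is a small stand-alone sketch that uses only the standard library. `BacklogSweeper` is a hypothetical stand-in for our real job class, the file names are made up, and the "processing" just records each name.

```ruby
require 'tmpdir'
require 'fileutils'

# Hypothetical stand-in for the real job: handle every regular file, then delete it.
class BacklogSweeper
  attr_reader :processed

  def initialize(base_dir)
    @base_dir  = base_dir
    @processed = []
  end

  # Meant to be called on every create/update event, whichever file caused it.
  def clear_backlog
    Dir.entries(@base_dir).sort.each do |name|
      path = File.join(@base_dir, name)
      next unless File.file?(path)  # skips ".", ".." and subdirectories
      @processed << name            # real parsing would happen here
      File.delete(path)             # deleting keeps the next sweep cheap
    end
  end
end

Dir.mktmpdir do |dir|
  sweeper = BacklogSweeper.new(dir)
  # Two files arrive, but pretend we only received an event for the second one.
  FileUtils.touch(File.join(dir, 'a.txt'))
  FileUtils.touch(File.join(dir, 'b.txt'))
  sweeper.clear_backlog
  puts sweeper.processed.inspect  # both files were handled by the single sweep
end
```

Because each sweep deletes what it handles, a second sweep over the same directory finds nothing left to do.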
4) Spawn the Daemon
Bring on Mia Farrow!
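watch.each_with_index do |base_dir, i|
  Daemons.run_proc("price_daemon_#{i}", daemon_options) do
    Dir.chdir dir
    # Load the Rails environment (RAILS_ROOT, ActiveSupport and the app's classes)
    require File.join(dir, 'config', 'environment')
    PriceDaemon.new(base_dir).dostuff
  end
end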
This will go through our array and fire up a daemon for each directory. There are downsides to doing it this way - it's not so easy to start and stop a single daemon (but then they shouldn't ever die, so if one does we just stop and start them all).
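One small variation that might make the processes easier to tell apart is to name each daemon after its directory instead of its index. This is a sketch only: `daemon_name_for` is a hypothetical helper, and it assumes the last path segment of each watched directory is unique.

```ruby
watch = ["/home/mcdata/horse", "/home/mcdata/sport", "/home/mcdata/live", "/home/mcdata/alpha"]

# Hypothetical naming scheme: use the directory name rather than its position.
def daemon_name_for(base_dir)
  "price_daemon_#{File.basename(base_dir)}"
end

puts watch.map { |base_dir| daemon_name_for(base_dir) }.inspect
# prints ["price_daemon_horse", "price_daemon_sport", "price_daemon_live", "price_daemon_alpha"]
```

The name would then be passed to Daemons.run_proc in place of "price_daemon_#{i}". It does not by itself give you a way to stop a single daemon.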
5) The File Processor
This of course will be specific to your operation, but, here's an outline of ours:
class PriceFileJob
  attr_accessor :base_dir
  def initialize(base_dir)
    self.base_dir = base_dir
    logger.info "watching #{base_dir}"
  end
  def logger
    @@logger ||= Logger.new("#{RAILS_ROOT}/log/price_file_job_#{base_dir.split("/").last}.log", "daily")
  end
  def clear_backlog
    # Oldest first; skip ".", ".." and any subdirectories
    files = Dir.entries(base_dir).select{|c| File.file?(File.join(base_dir, c))}.sort_by{|c| File.stat(File.join(base_dir, c)).ctime}
    files.each do |file|
      process_file(file)
    end
  end
  
  def process_file(file)
    # parse the file, store the prices in the database, then delete the file
  end
  
  private
end
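To give a feel for what process_file might look like, here is a stand-alone sketch. Everything specific in it is an assumption: the "name,price" line format is invented for the example (it is not the real feed format), `SketchPriceFileJob` is a hypothetical class, and a plain hash stands in for the database.

```ruby
require 'logger'
require 'tmpdir'

# Hypothetical processor; the "name,price" format is made up for illustration.
class SketchPriceFileJob
  attr_reader :base_dir, :prices

  def initialize(base_dir, logger = Logger.new($stdout))
    @base_dir = base_dir
    @logger   = logger
    @prices   = {}  # stands in for the database
  end

  def process_file(file)
    path   = File.join(base_dir, file)
    parsed = {}
    File.foreach(path) do |line|
      name, price = line.strip.split(',')
      next if name.nil? || price.nil?  # skip blank or incomplete lines
      parsed[name] = Float(price)      # raises ArgumentError on a bad number
    end
    @prices.update(parsed)             # store only once the whole file parsed
    File.delete(path)                  # only reached when nothing went wrong
  rescue ArgumentError, SystemCallError => e
    # The file is left in place, so the next sweep will try it again.
    @logger.error "failed to process #{file}: #{e.message}"
  end
end
```

Leaving a failed file in place means a permanently broken file gets retried on every sweep; moving it to a separate directory instead would be a reasonable alternative.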
6) Capistrano
We use Capistrano to deploy, so I included some tasks in our deploy.rb:
before "mc:release", "file_processors:stop"
after "mc:release", "file_processors:start"
namespace :file_processors do
  desc "start processors"
  task :start, :roles => :db do
    run "cd #{current_path}; RAILS_ENV=#{fetch :rails_env} ruby ./script/price_file_monitor.rb start"
  end
  desc "get status of processors"
  task :status, :roles => :db do
    run "cd #{current_path}; RAILS_ENV=#{fetch :rails_env} ruby ./script/price_file_monitor.rb status"
  end
  desc "stop processors"
  task :stop, :roles => :db do
    run "cd #{current_path}; RAILS_ENV=#{fetch :rails_env} ruby ./script/price_file_monitor.rb stop"
  end
end
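The three tasks run the same command with a different last word, so the command string could be built in one place. A minimal sketch in plain Ruby follows: `monitor_command` is a hypothetical helper and the path in the usage line is made up.

```ruby
# Hypothetical helper: build the shell command that each task runs.
def monitor_command(current_path, rails_env, action)
  "cd #{current_path}; RAILS_ENV=#{rails_env} ruby ./script/price_file_monitor.rb #{action}"
end

puts monitor_command("/var/www/app/current", "production", "status")
# prints cd /var/www/app/current; RAILS_ENV=production ruby ./script/price_file_monitor.rb status
```

Each task body would then shrink to something like run monitor_command(current_path, fetch(:rails_env), "start"), assuming the helper is visible from inside your deploy.rb.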
That's it! I hope you found this interesting.
I should also write up how we monitor these processes... maybe next time!
 