From d91d5e3bce87bf32f12f45d8ee17f441ce1ed9aa Mon Sep 17 00:00:00 2001 From: Martin Edenhofer Date: Tue, 7 May 2013 22:45:00 +0200 Subject: [PATCH] Improved scheduler. --- app/models/scheduler.rb | 41 ++++++++++++------- db/migrate/20130502000001_scheduler_update.rb | 38 +++++++++++++++++ lib/import/otrs.rb | 7 +--- script/scheduler.rb | 39 ++++++++++-------- 4 files changed, 89 insertions(+), 36 deletions(-) create mode 100644 db/migrate/20130502000001_scheduler_update.rb diff --git a/app/models/scheduler.rb b/app/models/scheduler.rb index cdb09180b..82b5d4775 100644 --- a/app/models/scheduler.rb +++ b/app/models/scheduler.rb @@ -1,36 +1,48 @@ class Scheduler < ApplicationModel - def self.run + def self.run( worker, worker_count ) Thread.abort_on_exception = true - # read/load jobs - jobs = Scheduler.where( :active => true ) - jobs.each {|job| - self.start_job( job ) - } + jobs_started = {} while true - puts 'Scheduler running...' - sleep 60 + puts "Scheduler running (worker #{worker} of #{worker_count})..." + + # read/load jobs and check if it is alredy started + jobs = Scheduler.where( 'active = ? AND prio = ?', true, worker ) + jobs.each {|job| + next if jobs_started[ job.id ] + jobs_started[ job.id ] = true + self.start_job( job, worker, worker_count ) + } + sleep 45 end end - def self.start_job(job) + def self.start_job( job, worker, worker_count ) puts "started job thread for '#{job.name}' (#{job.method})..." - sleep 1.5 + sleep 4 Thread.new { if job.period while true - self._start_job(job) + self._start_job( job, worker, worker_count ) job = Scheduler.where( :id => job.id ).first + + # exit is job got deleted break if !job + + # exit if job is not active anymore break if !job.active + + # exit if there is no loop period defined break if !job.period + + # wait until next run sleep job.period end else - self._start_job(job) + self._start_job( job, worker, worker_count ) end # raise "Exception from thread" job.pid = '' @@ -39,11 +51,12 @@ class Scheduler < ApplicationModel } end - def self._start_job(job) + def self._start_job( job, worker, worker_count ) puts "execute #{job.method}..." job.last_run = Time.now job.pid = Thread.current.object_id job.save - eval job.method + puts "execute #{job.method} (worker #{worker} of #{worker_count})..." + eval job.method() end end \ No newline at end of file diff --git a/db/migrate/20130502000001_scheduler_update.rb b/db/migrate/20130502000001_scheduler_update.rb new file mode 100644 index 000000000..573cb4215 --- /dev/null +++ b/db/migrate/20130502000001_scheduler_update.rb @@ -0,0 +1,38 @@ +require 'scheduler' +require 'setting' +class SchedulerUpdate < ActiveRecord::Migration + def up + add_column :schedulers, :prio, :integer, :null => true + Scheduler.create_or_update( + :name => 'Import OTRS diff load', + :method => 'Import::OTRS.diff_worker', + :period => 60 * 3, + :prio => 1, + :active => true, + :updated_by_id => 1, + :created_by_id => 1, + ) + Scheduler.create_or_update( + :name => 'Check Channels', + :method => 'Channel.fetch', + :period => 30, + :prio => 1, + :active => true, + :updated_by_id => 1, + :created_by_id => 1, + ) + Scheduler.create_or_update( + :name => 'Generate Session data', + :method => 'Session.jobs', + :period => 60, + :prio => 1, + :active => true, + :updated_by_id => 1, + :created_by_id => 1, + ) + end + + def down + remove_column :schedulers, :prio + end +end diff --git a/lib/import/otrs.rb b/lib/import/otrs.rb index 9dcba7682..1179ee0da 100644 --- a/lib/import/otrs.rb +++ b/lib/import/otrs.rb @@ -153,13 +153,10 @@ module Import::OTRS return end - def self.diff_loop + def self.diff_worker return if !Setting.get('import_mode') return if Setting.get('import_otrs_endpoint') == 'http://otrs_host/otrs' - while true - self.diff - sleep 30 - end + self.diff end def self.diff diff --git a/script/scheduler.rb b/script/scheduler.rb index 07bc7cb20..fa7538b17 100755 --- a/script/scheduler.rb +++ b/script/scheduler.rb @@ -6,26 +6,31 @@ require 'daemons' dir = File.expand_path(File.join(File.dirname(__FILE__), '..')) daemon_options = { - :multiple => false, + :multiple => true, :dir_mode => :normal, :dir => File.join(dir, 'tmp', 'pids'), :backtrace => true } -Daemons.run_proc('scheduler_runner', daemon_options) do - if ARGV.include?('--') - ARGV.slice! 0..ARGV.index('--') - else - ARGV.clear +worker_count = 2 + +(1..worker_count).each {|count| + name = 'scheduler_runner' + count.to_s + Daemons.run_proc(name, daemon_options) do + if ARGV.include?('--') + ARGV.slice! 0..ARGV.index('--') + else + ARGV.clear + end + + Dir.chdir dir + RAILS_ENV = ARGV.first || ENV['RAILS_ENV'] || 'development' + + $stdout.reopen( dir + "/log/" + name + "_out.log", "w") + $stderr.reopen( dir + "/log/" + name + "_err.log", "w") + require File.join(dir, "config", "environment") + require 'scheduler' + + Scheduler.run(count, worker_count) end - - Dir.chdir dir - RAILS_ENV = ARGV.first || ENV['RAILS_ENV'] || 'development' - - $stdout.reopen( dir + "/log/scheduler_out.log", "w") - $stderr.reopen( dir + "/log/scheduler_err.log", "w") - require File.join(dir, "config", "environment") - require 'scheduler' - - Scheduler.run -end +} \ No newline at end of file