Improved scheduler.

This commit is contained in:
Martin Edenhofer 2013-05-07 22:45:00 +02:00
parent 4487b988b1
commit d91d5e3bce
4 changed files with 89 additions and 36 deletions

View file

@ -1,36 +1,48 @@
class Scheduler < ApplicationModel class Scheduler < ApplicationModel
def self.run def self.run( worker, worker_count )
Thread.abort_on_exception = true Thread.abort_on_exception = true
# read/load jobs jobs_started = {}
jobs = Scheduler.where( :active => true )
jobs.each {|job|
self.start_job( job )
}
while true while true
puts 'Scheduler running...' puts "Scheduler running (worker #{worker} of #{worker_count})..."
sleep 60
# read/load jobs and check if it is alredy started
jobs = Scheduler.where( 'active = ? AND prio = ?', true, worker )
jobs.each {|job|
next if jobs_started[ job.id ]
jobs_started[ job.id ] = true
self.start_job( job, worker, worker_count )
}
sleep 45
end end
end end
def self.start_job(job) def self.start_job( job, worker, worker_count )
puts "started job thread for '#{job.name}' (#{job.method})..." puts "started job thread for '#{job.name}' (#{job.method})..."
sleep 1.5 sleep 4
Thread.new { Thread.new {
if job.period if job.period
while true while true
self._start_job(job) self._start_job( job, worker, worker_count )
job = Scheduler.where( :id => job.id ).first job = Scheduler.where( :id => job.id ).first
# exit is job got deleted
break if !job break if !job
# exit if job is not active anymore
break if !job.active break if !job.active
# exit if there is no loop period defined
break if !job.period break if !job.period
# wait until next run
sleep job.period sleep job.period
end end
else else
self._start_job(job) self._start_job( job, worker, worker_count )
end end
# raise "Exception from thread" # raise "Exception from thread"
job.pid = '' job.pid = ''
@ -39,11 +51,12 @@ class Scheduler < ApplicationModel
} }
end end
def self._start_job(job) def self._start_job( job, worker, worker_count )
puts "execute #{job.method}..." puts "execute #{job.method}..."
job.last_run = Time.now job.last_run = Time.now
job.pid = Thread.current.object_id job.pid = Thread.current.object_id
job.save job.save
eval job.method puts "execute #{job.method} (worker #{worker} of #{worker_count})..."
eval job.method()
end end
end end

View file

@ -0,0 +1,38 @@
require 'scheduler'
require 'setting'
class SchedulerUpdate < ActiveRecord::Migration
def up
add_column :schedulers, :prio, :integer, :null => true
Scheduler.create_or_update(
:name => 'Import OTRS diff load',
:method => 'Import::OTRS.diff_worker',
:period => 60 * 3,
:prio => 1,
:active => true,
:updated_by_id => 1,
:created_by_id => 1,
)
Scheduler.create_or_update(
:name => 'Check Channels',
:method => 'Channel.fetch',
:period => 30,
:prio => 1,
:active => true,
:updated_by_id => 1,
:created_by_id => 1,
)
Scheduler.create_or_update(
:name => 'Generate Session data',
:method => 'Session.jobs',
:period => 60,
:prio => 1,
:active => true,
:updated_by_id => 1,
:created_by_id => 1,
)
end
def down
remove_column :schedulers, :prio
end
end

View file

@ -153,13 +153,10 @@ module Import::OTRS
return return
end end
def self.diff_loop def self.diff_worker
return if !Setting.get('import_mode') return if !Setting.get('import_mode')
return if Setting.get('import_otrs_endpoint') == 'http://otrs_host/otrs' return if Setting.get('import_otrs_endpoint') == 'http://otrs_host/otrs'
while true
self.diff self.diff
sleep 30
end
end end
def self.diff def self.diff

View file

@ -6,13 +6,17 @@ require 'daemons'
dir = File.expand_path(File.join(File.dirname(__FILE__), '..')) dir = File.expand_path(File.join(File.dirname(__FILE__), '..'))
daemon_options = { daemon_options = {
:multiple => false, :multiple => true,
:dir_mode => :normal, :dir_mode => :normal,
:dir => File.join(dir, 'tmp', 'pids'), :dir => File.join(dir, 'tmp', 'pids'),
:backtrace => true :backtrace => true
} }
Daemons.run_proc('scheduler_runner', daemon_options) do worker_count = 2
(1..worker_count).each {|count|
name = 'scheduler_runner' + count.to_s
Daemons.run_proc(name, daemon_options) do
if ARGV.include?('--') if ARGV.include?('--')
ARGV.slice! 0..ARGV.index('--') ARGV.slice! 0..ARGV.index('--')
else else
@ -22,10 +26,11 @@ Daemons.run_proc('scheduler_runner', daemon_options) do
Dir.chdir dir Dir.chdir dir
RAILS_ENV = ARGV.first || ENV['RAILS_ENV'] || 'development' RAILS_ENV = ARGV.first || ENV['RAILS_ENV'] || 'development'
$stdout.reopen( dir + "/log/scheduler_out.log", "w") $stdout.reopen( dir + "/log/" + name + "_out.log", "w")
$stderr.reopen( dir + "/log/scheduler_err.log", "w") $stderr.reopen( dir + "/log/" + name + "_err.log", "w")
require File.join(dir, "config", "environment") require File.join(dir, "config", "environment")
require 'scheduler' require 'scheduler'
Scheduler.run Scheduler.run(count, worker_count)
end end
}