diff --git a/app/models/scheduler.rb b/app/models/scheduler.rb index 71ccc9ba3..1b3e5f7d3 100644 --- a/app/models/scheduler.rb +++ b/app/models/scheduler.rb @@ -2,33 +2,33 @@ class Scheduler < ApplicationModel - def self.run( worker, worker_count ) + def self.run( runner, runner_count ) Thread.abort_on_exception = true jobs_started = {} while true - puts "Scheduler running (worker #{worker} of #{worker_count})..." + puts "Scheduler running (runner #{runner} of #{runner_count})..." # read/load jobs and check if it is alredy started - jobs = Scheduler.where( 'active = ? AND prio = ?', true, worker ) + jobs = Scheduler.where( 'active = ? AND prio = ?', true, runner ) jobs.each {|job| next if jobs_started[ job.id ] jobs_started[ job.id ] = true - self.start_job( job, worker, worker_count ) + self.start_job( job, runner, runner_count ) } sleep 45 end end - def self.start_job( job, worker, worker_count ) + def self.start_job( job, runner, runner_count ) puts "started job thread for '#{job.name}' (#{job.method})..." sleep 4 Thread.new { if job.period while true - self._start_job( job, worker, worker_count ) + self._start_job( job, runner, runner_count ) job = Scheduler.where( :id => job.id ).first # exit is job got deleted @@ -44,7 +44,7 @@ class Scheduler < ApplicationModel sleep job.period end else - self._start_job( job, worker, worker_count ) + self._start_job( job, runner, runner_count ) end # raise "Exception from thread" job.pid = '' @@ -53,14 +53,38 @@ class Scheduler < ApplicationModel } end - def self._start_job( job, worker, worker_count ) - puts "execute #{job.method} (worker #{worker} of #{worker_count})..." + def self._start_job( job, runner, runner_count ) + puts "execute #{job.method} (runner #{runner} of #{runner_count})..." job.last_run = Time.now job.pid = Thread.current.object_id job.save eval job.method() end + def self.worker + wait = 10 + puts "*** Starting worker #{Delayed::Job.to_s}" + + loop do + result = nil + + realtime = Benchmark.realtime do + result = Delayed::Worker.new.work_off + end + + count = result.sum + + break if $exit + + if count.zero? + sleep(wait) + puts "*** worker loop" + else + printf "*** #{count} jobs processed at %.4f j/s, %d failed ...\n" % [count / realtime, result.last] + end + end + end + def self.check( name, time_warning = 10, time_critical = 20 ) time_warning_time = Time.now - time_warning.minutes time_critical_time = Time.now - time_critical.minutes