From d206ef49ed740b0cc14ae7b3a94cce0719df8526 Mon Sep 17 00:00:00 2001 From: Martin Edenhofer Date: Thu, 21 May 2015 01:07:13 +0200 Subject: [PATCH] Improved performance, move to threads. --- app/models/scheduler.rb | 97 ++++++++++++++++++++++++++--------------- script/scheduler.rb | 27 +----------- 2 files changed, 65 insertions(+), 59 deletions(-) diff --git a/app/models/scheduler.rb b/app/models/scheduler.rb index 8070e571b..388daf993 100644 --- a/app/models/scheduler.rb +++ b/app/models/scheduler.rb @@ -2,13 +2,21 @@ # rubocop:disable Rails/Output class Scheduler < ApplicationModel - def self.run( runner, runner_count ) + # rubocop:disable Style/ClassVars + @@jobs_started = {} + # rubocop:enable Style/ClassVars + + # start threads + def self.threads Thread.abort_on_exception = true - jobs_started = {} + # start worker for background jobs + worker + + # start loop to execute scheduler jobs loop do - logger.info "Scheduler running (runner #{runner} of #{runner_count})..." + logger.info 'Scheduler running...' # reconnect in case db connection is lost begin @@ -18,24 +26,34 @@ class Scheduler < ApplicationModel end # read/load jobs and check if it is alredy started - jobs = Scheduler.where( 'active = ? AND prio = ?', true, runner ) + jobs = Scheduler.where( 'active = ?', true ) jobs.each {|job| - next if jobs_started[ job.id ] - jobs_started[ job.id ] = true - start_job( job, runner, runner_count ) + + # ignore job is still running + next if @@jobs_started[ job.id ] + + # check job.last_run + next if job.last_run && job.period && job.last_run > ( Time.zone.now - job.period ) + + # run job as own thread + @@jobs_started[ job.id ] = true + start_job( job ) } - sleep 90 + sleep 60 end end - def self.start_job( job, runner, runner_count ) - logger.info "started job thread for '#{job.name}' (#{job.method})..." + def self.start_job( job ) sleep 4 Thread.new { - if job.period + + logger.info "started job thread for '#{job.name}' (#{job.method})..." + + # start loop for periods under 5 minutes + if job.period && job.period <= 300 loop do - _start_job( job, runner, runner_count ) + _start_job( job ) job = Scheduler.lookup( id: job.id ) # exit is job got deleted @@ -51,26 +69,28 @@ class Scheduler < ApplicationModel sleep job.period end else - _start_job( job, runner, runner_count ) + _start_job( job ) end - # raise "Exception from thread" job.pid = '' job.save logger.info " ...stopped thread for '#{job.method}'" ActiveRecord::Base.connection.close + + # release thread lock + @@jobs_started[ job.id ] = false } end - def self._start_job( job, runner, runner_count, try_count = 0, try_run_time = Time.zone.now ) + def self._start_job( job, try_count = 0, try_run_time = Time.zone.now ) sleep 5 begin job.last_run = Time.zone.now - job.pid = Thread.current.object_id + job.pid = Thread.current.object_id job.save - logger.info "execute #{job.method} (runner #{runner} of #{runner_count}, try_count #{try_count})..." + logger.info "execute #{job.method} (try_count #{try_count})..." eval job.method() # rubocop:disable Lint/Eval rescue => e - logger.error "execute #{job.method} (runner #{runner} of #{runner_count}, try_count #{try_count}) exited with error #{ e.inspect }" + logger.error "execute #{job.method} (try_count #{try_count}) exited with error #{ e.inspect }" # reconnect in case db connection is lost begin @@ -90,33 +110,42 @@ class Scheduler < ApplicationModel # restart job again if try_run_max > try_count - _start_job( job, runner, runner_count, try_count, try_run_time) + _start_job( job, try_count, try_run_time) else - raise "STOP thread for #{job.method} (runner #{runner} of #{runner_count} after #{try_count} tries" + raise "STOP thread for #{job.method} after #{try_count} tries" end end end def self.worker - wait = 10 - logger.info "*** Starting worker #{Delayed::Job}" + wait = 8 - loop do - result = nil + Thread.new { + sleep wait - realtime = Benchmark.realtime do - result = Delayed::Worker.new.work_off + logger.info "*** Starting worker thread #{Delayed::Job}" + + loop do + result = nil + + realtime = Benchmark.realtime do + result = Delayed::Worker.new.work_off + end + + count = result.sum + + if count.zero? + sleep wait + logger.debug '*** worker thread loop' + else + format "*** #{count} jobs processed at %.4f j/s, %d failed ...\n", count / realtime, result.last + end end - count = result.sum + logger.info ' ...stopped worker thread' + ActiveRecord::Base.connection.close + } - if count.zero? - sleep(wait) - logger.info '*** worker loop' - else - format "*** #{count} jobs processed at %.4f j/s, %d failed ...\n", count / realtime, result.last - end - end end def self.check( name, time_warning = 10, time_critical = 20 ) diff --git a/script/scheduler.rb b/script/scheduler.rb index dc3644b43..a87f9bce2 100755 --- a/script/scheduler.rb +++ b/script/scheduler.rb @@ -13,30 +13,7 @@ daemon_options = { backtrace: true } -runner_count = 2 - -(1..runner_count).each {|count| - name = 'scheduler_runner' + count.to_s - Daemons.run_proc(name, daemon_options) do - if ARGV.include?('--') - ARGV.slice! 0..ARGV.index('--') - else - ARGV.clear - end - - Dir.chdir dir - RAILS_ENV = ARGV.first || ENV['RAILS_ENV'] || 'development' - - $stdout.reopen( dir + '/log/' + name + '_out.log', 'w') - $stderr.reopen( dir + '/log/' + name + '_err.log', 'w') - require File.join(dir, 'config', 'environment') - require 'scheduler' - - Scheduler.run(count, runner_count) - end -} - -name = 'scheduler_worker' +name = 'scheduler' Daemons.run_proc(name, daemon_options) do if ARGV.include?('--') ARGV.slice! 0..ARGV.index('--') @@ -52,5 +29,5 @@ Daemons.run_proc(name, daemon_options) do require File.join(dir, 'config', 'environment') require 'scheduler' - Scheduler.worker + Scheduler.threads end