Improved performance, move to threads.
This commit is contained in:
parent
13da832a64
commit
d206ef49ed
2 changed files with 65 additions and 59 deletions
|
@ -2,13 +2,21 @@
|
|||
# rubocop:disable Rails/Output
|
||||
class Scheduler < ApplicationModel
|
||||
|
||||
def self.run( runner, runner_count )
|
||||
# rubocop:disable Style/ClassVars
|
||||
@@jobs_started = {}
|
||||
# rubocop:enable Style/ClassVars
|
||||
|
||||
# start threads
|
||||
def self.threads
|
||||
|
||||
Thread.abort_on_exception = true
|
||||
|
||||
jobs_started = {}
|
||||
# start worker for background jobs
|
||||
worker
|
||||
|
||||
# start loop to execute scheduler jobs
|
||||
loop do
|
||||
logger.info "Scheduler running (runner #{runner} of #{runner_count})..."
|
||||
logger.info 'Scheduler running...'
|
||||
|
||||
# reconnect in case db connection is lost
|
||||
begin
|
||||
|
@ -18,24 +26,34 @@ class Scheduler < ApplicationModel
|
|||
end
|
||||
|
||||
# read/load jobs and check if it is alredy started
|
||||
jobs = Scheduler.where( 'active = ? AND prio = ?', true, runner )
|
||||
jobs = Scheduler.where( 'active = ?', true )
|
||||
jobs.each {|job|
|
||||
next if jobs_started[ job.id ]
|
||||
jobs_started[ job.id ] = true
|
||||
start_job( job, runner, runner_count )
|
||||
|
||||
# ignore job is still running
|
||||
next if @@jobs_started[ job.id ]
|
||||
|
||||
# check job.last_run
|
||||
next if job.last_run && job.period && job.last_run > ( Time.zone.now - job.period )
|
||||
|
||||
# run job as own thread
|
||||
@@jobs_started[ job.id ] = true
|
||||
start_job( job )
|
||||
}
|
||||
sleep 90
|
||||
sleep 60
|
||||
end
|
||||
end
|
||||
|
||||
def self.start_job( job, runner, runner_count )
|
||||
logger.info "started job thread for '#{job.name}' (#{job.method})..."
|
||||
def self.start_job( job )
|
||||
sleep 4
|
||||
|
||||
Thread.new {
|
||||
if job.period
|
||||
|
||||
logger.info "started job thread for '#{job.name}' (#{job.method})..."
|
||||
|
||||
# start loop for periods under 5 minutes
|
||||
if job.period && job.period <= 300
|
||||
loop do
|
||||
_start_job( job, runner, runner_count )
|
||||
_start_job( job )
|
||||
job = Scheduler.lookup( id: job.id )
|
||||
|
||||
# exit is job got deleted
|
||||
|
@ -51,26 +69,28 @@ class Scheduler < ApplicationModel
|
|||
sleep job.period
|
||||
end
|
||||
else
|
||||
_start_job( job, runner, runner_count )
|
||||
_start_job( job )
|
||||
end
|
||||
# raise "Exception from thread"
|
||||
job.pid = ''
|
||||
job.save
|
||||
logger.info " ...stopped thread for '#{job.method}'"
|
||||
ActiveRecord::Base.connection.close
|
||||
|
||||
# release thread lock
|
||||
@@jobs_started[ job.id ] = false
|
||||
}
|
||||
end
|
||||
|
||||
def self._start_job( job, runner, runner_count, try_count = 0, try_run_time = Time.zone.now )
|
||||
def self._start_job( job, try_count = 0, try_run_time = Time.zone.now )
|
||||
sleep 5
|
||||
begin
|
||||
job.last_run = Time.zone.now
|
||||
job.pid = Thread.current.object_id
|
||||
job.pid = Thread.current.object_id
|
||||
job.save
|
||||
logger.info "execute #{job.method} (runner #{runner} of #{runner_count}, try_count #{try_count})..."
|
||||
logger.info "execute #{job.method} (try_count #{try_count})..."
|
||||
eval job.method() # rubocop:disable Lint/Eval
|
||||
rescue => e
|
||||
logger.error "execute #{job.method} (runner #{runner} of #{runner_count}, try_count #{try_count}) exited with error #{ e.inspect }"
|
||||
logger.error "execute #{job.method} (try_count #{try_count}) exited with error #{ e.inspect }"
|
||||
|
||||
# reconnect in case db connection is lost
|
||||
begin
|
||||
|
@ -90,33 +110,42 @@ class Scheduler < ApplicationModel
|
|||
|
||||
# restart job again
|
||||
if try_run_max > try_count
|
||||
_start_job( job, runner, runner_count, try_count, try_run_time)
|
||||
_start_job( job, try_count, try_run_time)
|
||||
else
|
||||
raise "STOP thread for #{job.method} (runner #{runner} of #{runner_count} after #{try_count} tries"
|
||||
raise "STOP thread for #{job.method} after #{try_count} tries"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def self.worker
|
||||
wait = 10
|
||||
logger.info "*** Starting worker #{Delayed::Job}"
|
||||
wait = 8
|
||||
|
||||
loop do
|
||||
result = nil
|
||||
Thread.new {
|
||||
sleep wait
|
||||
|
||||
realtime = Benchmark.realtime do
|
||||
result = Delayed::Worker.new.work_off
|
||||
logger.info "*** Starting worker thread #{Delayed::Job}"
|
||||
|
||||
loop do
|
||||
result = nil
|
||||
|
||||
realtime = Benchmark.realtime do
|
||||
result = Delayed::Worker.new.work_off
|
||||
end
|
||||
|
||||
count = result.sum
|
||||
|
||||
if count.zero?
|
||||
sleep wait
|
||||
logger.debug '*** worker thread loop'
|
||||
else
|
||||
format "*** #{count} jobs processed at %.4f j/s, %d failed ...\n", count / realtime, result.last
|
||||
end
|
||||
end
|
||||
|
||||
count = result.sum
|
||||
logger.info ' ...stopped worker thread'
|
||||
ActiveRecord::Base.connection.close
|
||||
}
|
||||
|
||||
if count.zero?
|
||||
sleep(wait)
|
||||
logger.info '*** worker loop'
|
||||
else
|
||||
format "*** #{count} jobs processed at %.4f j/s, %d failed ...\n", count / realtime, result.last
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def self.check( name, time_warning = 10, time_critical = 20 )
|
||||
|
|
|
@ -13,30 +13,7 @@ daemon_options = {
|
|||
backtrace: true
|
||||
}
|
||||
|
||||
runner_count = 2
|
||||
|
||||
(1..runner_count).each {|count|
|
||||
name = 'scheduler_runner' + count.to_s
|
||||
Daemons.run_proc(name, daemon_options) do
|
||||
if ARGV.include?('--')
|
||||
ARGV.slice! 0..ARGV.index('--')
|
||||
else
|
||||
ARGV.clear
|
||||
end
|
||||
|
||||
Dir.chdir dir
|
||||
RAILS_ENV = ARGV.first || ENV['RAILS_ENV'] || 'development'
|
||||
|
||||
$stdout.reopen( dir + '/log/' + name + '_out.log', 'w')
|
||||
$stderr.reopen( dir + '/log/' + name + '_err.log', 'w')
|
||||
require File.join(dir, 'config', 'environment')
|
||||
require 'scheduler'
|
||||
|
||||
Scheduler.run(count, runner_count)
|
||||
end
|
||||
}
|
||||
|
||||
name = 'scheduler_worker'
|
||||
name = 'scheduler'
|
||||
Daemons.run_proc(name, daemon_options) do
|
||||
if ARGV.include?('--')
|
||||
ARGV.slice! 0..ARGV.index('--')
|
||||
|
@ -52,5 +29,5 @@ Daemons.run_proc(name, daemon_options) do
|
|||
require File.join(dir, 'config', 'environment')
|
||||
require 'scheduler'
|
||||
|
||||
Scheduler.worker
|
||||
Scheduler.threads
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue