2016-10-19 03:11:36 +00:00
|
|
|
# Copyright (C) 2012-2016 Zammad Foundation, http://zammad-foundation.org/
|
2016-04-26 09:30:46 +00:00
|
|
|
|
2013-03-05 08:38:58 +00:00
|
|
|
class Scheduler < ApplicationModel
|
|
|
|
|
2015-05-20 23:07:13 +00:00
|
|
|
# rubocop:disable Style/ClassVars
|
|
|
|
# Registry of scheduler jobs that currently own a thread, keyed by job id.
# An entry is set to true right before a job thread is started and set back
# to false by that thread once it has finished.
@@jobs_started = {}
|
|
|
|
# rubocop:enable Style/ClassVars
|
|
|
|
|
|
|
|
# Entry point of the scheduler: cleans up stale background jobs, starts the
# background worker and then polls for due scheduler jobs in an endless loop.
# Every due job is started via start_job. This method does not return.
def self.threads

  # an exception raised in any thread is re-raised in the main thread
  Thread.abort_on_exception = true

  # cleanup old background jobs
  # (has to be called directly from this method, `cleanup` checks its caller)
  cleanup

  # start worker for background jobs
  worker

  # start loop to execute scheduler jobs
  loop do
    logger.info 'Scheduler running...'

    # reconnect in case the db connection is lost
    begin
      ActiveRecord::Base.connection.reconnect!
    rescue => error
      logger.error "Can't reconnect to database #{error.inspect}"
    end

    # load the active jobs by priority and start those that are due
    active_jobs = Scheduler.where('active = ?', true).order('prio ASC')
    active_jobs.each do |job|

      # ignore jobs that are still running
      next if @@jobs_started[job.id]

      # ignore jobs whose last run is more recent than their period
      ran_recently = job.last_run && job.period && job.last_run > (Time.zone.now - job.period)
      next if ran_recently

      # register the job as started and run it in its own thread
      @@jobs_started[job.id] = true
      start_job(job)
      sleep 10
    end

    sleep 60
  end
end
|
|
|
|
|
2017-05-11 15:18:03 +00:00
|
|
|
# Checks all delayed jobs that are locked and cleans them up.
# Should only get called when the Scheduler gets started.
#
# @see Scheduler#cleanup_delayed
#
# @param [Boolean] force forces the cleanup if not called in Scheduler starting context.
#
# @example
#   Scheduler.cleanup
#
# @raise [RuntimeError] If called without force and not when Scheduler gets started.
#
# @return [void]
def self.cleanup(force: false)

  # Only the direct caller is needed, so just one frame is requested.
  # `base_label` is the undecorated method name ("threads"), while `label`
  # may carry a receiver prefix or a "block in" prefix depending on the
  # Ruby version and call site.
  caller_frame        = caller_locations(1, 1).first
  called_from_threads = caller_frame && caller_frame.base_label == 'threads'

  if !force && !called_from_threads
    raise 'This method should only get called when Scheduler.threads are initialized. Use `force: true` to start anyway.'
  end

  Delayed::Job.all.each do |job|
    cleanup_delayed(job)
  end
end
|
|
|
|
|
|
|
|
# Checks if the given job can be rescheduled or destroys it. Logs the action as warn.
# Works only for locked jobs. Jobs that are not locked are ignored and
# should get destroyed directly.
# Checks the delayed job object for a method called .reschedule?. The method is called
# with the delayed job as a parameter. The result value is expected as a Boolean. If the
# result is true the lock gets removed and the delayed job gets rescheduled. If the return
# value is false it will get destroyed which is the default behaviour.
# A job whose payload can't be deserialized is handled like a job without
# payload, which means it gets destroyed.
#
# @param [Delayed::Job] job the job that should get checked for destroying/rescheduling.
#
# @example
#   Scheduler.cleanup_delayed(job)
#
# @return [void]
def self.cleanup_delayed(job)
  return if job.locked_at.blank?

  job_name   = job.name
  reschedule = false

  # Loading the payload raises if the handler can't be deserialized anymore
  # (e.g. the class or record behind it is gone). Such a job can never be
  # rescheduled, so fall through to the default (destroy) instead of raising.
  payload_object = begin
    job.payload_object
  rescue Delayed::DeserializationError => e
    Rails.logger.error "Can't load payload of locked delayed job #{job_name}: #{e.inspect}"
    nil
  end

  if payload_object.present?
    if payload_object.respond_to?(:object)
      object = payload_object.object

      # add the id of the affected object to the log output
      if object.respond_to?(:id)
        job_name += " (id: #{object.id})"
      end

      # the object itself decides if the job may be rescheduled
      if object.respond_to?(:reschedule?) && object.reschedule?(job)
        reschedule = true
      end
    end

    if payload_object.respond_to?(:args)
      job_name += " - ARGS: #{payload_object.args.inspect}"
    end
  end

  if reschedule
    action = 'Rescheduling'
    job.unlock
    job.save
  else
    action = 'Destroyed'
    job.destroy
  end

  Rails.logger.warn "#{action} locked delayed job: #{job_name}"
end
|
|
|
|
|
2016-01-05 08:48:26 +00:00
|
|
|
# Runs the given scheduler job in its own thread.
#
# Jobs with a period of up to 5 minutes are executed in a loop inside the
# thread (reloading the job record after every run) until the record is
# deleted, deactivated or has no period anymore. All other jobs are executed
# once. When the thread is done the pid of the job is cleared, the database
# connection is closed and the job is released in @@jobs_started.
#
# @param [Scheduler] job the scheduler job to execute.
#
# @return [Thread] the thread that executes the job.
def self.start_job(job)

  # keep the identity of the job, the record gets reloaded in the loop below
  # and is nil as soon as it got deleted
  job_id     = job.id
  job_method = job.method

  Thread.new do
    begin
      ApplicationHandleInfo.current = 'scheduler'

      logger.info "Started job thread for '#{job.name}' (#{job_method})..."

      # start loop for periods under 5 minutes
      if job.period && job.period <= 300
        loop do
          _start_job(job)
          job = Scheduler.lookup(id: job_id)

          # exit if job got deleted
          break if !job

          job_method = job.method

          # exit if job is not active anymore
          break if !job.active

          # exit if there is no loop period defined
          break if !job.period

          # wait until next run
          sleep job.period
        end
      else
        _start_job(job)
      end

      # a deleted job has no record left to update
      if job
        job.pid = ''
        job.save
      end
      logger.info " ...stopped thread for '#{job_method}'"
    ensure
      # always give back the connection and the thread lock, even if the
      # job execution failed
      ActiveRecord::Base.connection.close

      # release thread lock
      @@jobs_started[job_id] = false
    end
  end
end
|
|
|
|
|
2016-01-05 08:48:26 +00:00
|
|
|
# Executes the given scheduler job in the current thread.
#
# Stores the start time and the id of the current thread in the job record and
# evaluates the code stored in job.method. If the execution raises, the error
# is logged, a database reconnect is attempted and the job is executed again.
# After 10 failed tries in a row the method gives up. The try counter starts
# from 0 again if the previous failure is more than 5 minutes old.
#
# @param [Scheduler] job the scheduler job to execute.
# @param [Integer] try_count number of failed tries so far.
# @param [Time] try_run_time time of the previous failure (start time on the first run).
#
# @raise [RuntimeError] if the job failed 10 times in a row.
#
# @return [Object] the result of the evaluated job code.
def self._start_job(job, try_count = 0, try_run_time = Time.zone.now)
  job.last_run = Time.zone.now
  job.pid = Thread.current.object_id
  job.save
  logger.info "execute #{job.method} (try_count #{try_count})..."

  # NOTE(review): this evaluates code that is stored in the scheduler record,
  # make sure that only trusted code can write to that attribute.
  eval job.method() # rubocop:disable Lint/Eval
rescue => e
  logger.error "execute #{job.method} (try_count #{try_count}) exited with error #{e.inspect}"

  # reconnect in case db connection is lost
  begin
    ActiveRecord::Base.connection.reconnect!
  rescue => reconnect_error
    logger.error "Can't reconnect to database #{reconnect_error.inspect}"
  end

  try_run_max = 10
  try_count += 1

  # reset error counter if the previous failure is too old
  if try_run_time + (60 * 5) < Time.zone.now
    try_count = 0
  end
  try_run_time = Time.zone.now

  # give up after too many failed tries in a row
  raise "STOP thread for #{job.method} after #{try_count} tries" if try_count >= try_run_max

  # restart job again: `retry` re-runs the method body with the updated
  # counters, so the stack does not grow with every failed try
  retry
end
|
2013-08-04 21:56:02 +00:00
|
|
|
|
2016-04-26 09:30:46 +00:00
|
|
|
# Processes the delayed background jobs.
#
# In foreground mode (used for tests) the queue is worked off in the calling
# thread until a run does not process any job anymore. While doing so the
# application handle is set to 'scheduler' and the current user is unset, both
# are restored afterwards.
#
# In background mode (used for production) a thread is started which works
# off the queue in an endless loop and sleeps if there was nothing to do.
#
# @param [Boolean] foreground work off the queue in the calling thread.
#
# @raise [RuntimeError] in foreground mode if background jobs failed.
#
# @return [nil, Thread] nil in foreground mode, otherwise the worker thread.
def self.worker(foreground = false)

  # used for tests
  if foreground
    original_interface_handle = ApplicationHandleInfo.current
    original_user_id = UserInfo.current_user_id

    begin
      ApplicationHandleInfo.current = 'scheduler'
      UserInfo.current_user_id = nil

      loop do
        success, failure = Delayed::Worker.new.work_off
        if failure.nonzero?
          raise "ERROR: #{failure} failed background jobs: #{Delayed::Job.where('last_error IS NOT NULL').inspect}"
        end
        break if success.zero?
      end
    ensure
      # restore the context of the caller, also if jobs failed
      UserInfo.current_user_id = original_user_id
      ApplicationHandleInfo.current = original_interface_handle
    end
    return
  end

  # used for production
  wait = 8
  Thread.new do
    sleep wait

    logger.info "Starting worker thread #{Delayed::Job}"

    loop do
      ApplicationHandleInfo.current = 'scheduler'
      result = nil

      realtime = Benchmark.realtime do
        logger.debug "*** worker thread, #{Delayed::Job.all.count} in queue"
        result = Delayed::Worker.new.work_off
      end

      # result holds the number of succeeded and failed jobs
      count = result.sum

      if count.zero?
        sleep wait
        logger.debug '*** worker thread loop'
      else
        logger.debug format('*** %d jobs processed at %.4f j/s, %d failed ...', count, count / realtime, result.last)
      end
    end

    # NOTE(review): the loop above has no break, so this is only reached if
    # it gets terminated by a StopIteration.
    logger.info ' ...stopped worker thread'
    ActiveRecord::Base.connection.close
  end
end
|
|
|
|
|
2013-06-12 15:59:58 +00:00
|
|
|
end
|