2022-01-01 13:38:12 +00:00
# Copyright (C) 2012-2022 Zammad Foundation, https://zammad-foundation.org/
2016-04-26 09:30:46 +00:00
2013-03-05 08:38:58 +00:00
class Scheduler < ApplicationModel
2021-04-12 09:49:26 +00:00
include ChecksHtmlSanitized
2018-04-26 08:22:09 +00:00
extend :: Mixin :: StartFinishLogger
2013-03-05 08:38:58 +00:00
2021-04-12 09:49:26 +00:00
sanitized_html :note
2015-05-20 23:07:13 +00:00
# rubocop:disable Style/ClassVars
@@jobs_started = { }
# rubocop:enable Style/ClassVars
# Main entry point of the scheduler: prepares the environment, starts the
# background worker and then loops forever, starting one thread per due job.
#
# Each pass loads all active Scheduler records ordered by ascending +prio+,
# skips jobs that still have a live thread or whose +period+ has not elapsed
# since +last_run+, and starts the remaining ones. After every started job the
# loop pauses 10 seconds; after every full pass it pauses 60 seconds.
#
# This method does not return under normal operation.
#
# @example
#   Scheduler.threads
def self.threads

  # an unhandled exception in any thread should not go unnoticed
  Thread.abort_on_exception = true

  # best effort reconnect in case the db connection is lost,
  # failures are only logged
  reconnect = lambda do
    ActiveRecord::Base.connection.reconnect!
  rescue => e
    logger.error " Can't reconnect to database #{e.inspect} "
  end

  reconnect.call

  # cleanup old background jobs
  # NOTE: has to be called directly from this method body since
  # Scheduler.cleanup inspects the name of its caller
  cleanup

  # start worker for background jobs
  worker

  # start loop to execute scheduler jobs
  loop do
    logger.info 'Scheduler running...'

    reconnect.call

    # read/load jobs and check if each has already been started
    Scheduler.where(active: true).order(prio: :asc).each do |job|

      # ignore job if it is still running
      next if skip_job?(job)

      # ignore job if its last run is more recent than its period
      ran_recently = job.last_run && job.period && job.last_run > (Time.zone.now - job.period)
      next if ran_recently

      # run job as own thread and remember the thread handle
      @@jobs_started[job.id] = start_job(job)
      sleep 10
    end

    sleep 60
  end
end
2018-01-09 16:46:25 +00:00
# Checks if a Scheduler job has to be skipped because it is already running.
# The decision is based on the thread handle stored for the job in
# @@jobs_started. Invalid or terminated handles get removed so that a new
# thread can be started for the job.
#
# @param job [Scheduler] the job that should get checked for a running thread.
#
# @example
#   Scheduler.skip_job?(job)
#
# @return [Boolean] true if a thread for the job is still alive, false otherwise.
def self.skip_job?(job)
  handle = @@jobs_started[job.id]

  # nothing stored, nothing to skip
  return false if handle.blank?

  # check for validity of the stored thread instance
  unless handle.respond_to?(:status)
    logger.error " Invalid thread stored for job ' #{job.name} ' ( #{job.method} ): #{handle.inspect} . Deleting and resting job. "
    @@jobs_started.delete(job.id)
    return false
  end

  # check thread state:
  # http://devdocs.io/ruby~2.4/thread#method-i-status
  status = handle.status

  # a non-falsy state means the thread has some literal running state
  if status.present?
    logger.info " Running job thread for ' #{job.name} ' ( #{job.method} ) status is: #{status} "
    return true
  end

  # The following cases should not happen since the @@jobs_started
  # cleanup is performed inside of the thread itself. Therefore we
  # log an error and remove the handle from our thread pool.
  how = case status
        when nil   then 'via an exception'
        when false then 'normally'
        else            'unknownly'
        end

  logger.error " Job thread terminated #{how} found for ' #{job.name} ' ( #{job.method} ). This should not happen. Please report. "
  @@jobs_started.delete(job.id)

  false
end
2017-05-11 15:18:03 +00:00
# Checks all delayed jobs that are locked and all import jobs that are still
# flagged as running and cleans them up.
# Should only get called when the Scheduler gets started.
#
# @see Scheduler#cleanup_delayed
#
# @param force [Boolean] forces the cleanup if not called in Scheduler starting context.
#
# @example
#   Scheduler.cleanup
#
# @example
#   Scheduler.cleanup(force: true)
#
# @raise [RuntimeError] If called without force and not from a method named `threads`.
#
# @return [Object] whatever Scheduler.cleanup_import_jobs returns; not meant to be used.
def self.cleanup(force: false)

  # `base_label` is the bare method name of the calling frame. `label` can not
  # be used for this check: since Ruby 3.4 it is prefixed with the owner
  # (e.g. 'Scheduler.threads') and would never equal 'threads'. The safe
  # navigation covers the case of a missing caller frame.
  if !force && caller_locations(1..1).first&.base_label != 'threads'
    raise 'This method should only get called when Scheduler.threads are initialized. Use `force: true` to start anyway.' # rubocop:disable Zammad/DetectTranslatableString
  end

  # both cleanups only touch records that were last updated before this point in time
  start_time = Time.zone.now

  cleanup_delayed_jobs(start_time)
  cleanup_import_jobs(start_time)
end
# Looks up delayed jobs that are still locked and were last updated before the
# given time and hands each of them to Scheduler.cleanup_delayed, which
# reschedules or destroys it. Start and finish are logged on info level.
#
# @param after [ActiveSupport::TimeWithZone] the time the cleanup was started
#
# @example
#   Scheduler.cleanup_delayed_jobs(Time.zone.now)
#
# @return [Object] the result of the surrounding log_start_finish call; not meant to be used.
def self.cleanup_delayed_jobs(after)
  log_start_finish(:info, " Cleanup of left over locked delayed jobs #{after} ") do
    left_over_jobs = Delayed::Job.where('updated_at < ?', after).where.not(locked_at: nil)

    left_over_jobs.each do |job|
      log_start_finish(:info, " Checking left over delayed job #{job.inspect} ") { cleanup_delayed(job) }
    end
  end
end
2018-05-10 13:24:11 +00:00
# Reschedules or destroys the given locked delayed job and logs the action as warn.
# Delayed jobs that are not locked are ignored.
#
# The decision is delegated to the object wrapped by the job's payload: if it
# responds to `reschedule?`, that method is called with the Delayed::Job
# instance and a truthy result removes the lock and saves the job, so it can
# run again. In every other case the job gets destroyed, which is the default.
#
# For the log line the job name gets extended by the id of the wrapped object
# and by the payload arguments, if available.
#
# @param job [Delayed::Job] the job that should get checked for destroying/rescheduling.
#
# @example
#   Scheduler.cleanup_delayed(job)
#
# @return [Object, nil] nil for unlocked jobs, otherwise the logger result; not meant to be used.
def self.cleanup_delayed(job)
  return if job.locked_at.blank?

  description = job.name
  payload     = job.payload_object
  reschedule  = false

  if payload.present?
    if payload.respond_to?(:object)
      target = payload.object

      description += " (id: #{target.id} ) " if target.respond_to?(:id)

      # the wrapped object decides on its own whether it wants another run
      reschedule = target.respond_to?(:reschedule?) && target.reschedule?(job)
    end

    description += " - ARGS: #{payload.args.inspect} " if payload.respond_to?(:args)
  end

  action =
    if reschedule
      job.unlock
      job.save
      'Rescheduling'
    else
      job.destroy
      'Destroyed'
    end

  logger.warn " #{action} locked delayed job: #{description} "
end
2018-05-10 13:24:11 +00:00
# Marks import jobs that are still flagged as running, but were last updated
# before the given time, as finished and stores an error note in their result.
#
# @param after [ActiveSupport::TimeWithZone] the time the cleanup was started
#
# @example
#   Scheduler.cleanup_import_jobs(Time.zone.now)
#
# @return [Object] the result of the surrounding log_start_finish call; not meant to be used.
def self.cleanup_import_jobs(after)
  log_start_finish(:info, " Cleanup of left over import jobs #{after} ") do
    interruption_note = __('Interrupted by scheduler restart. Please restart manually or wait till next execution time.').freeze

    # Jobs that were updated at or since the start of the cleanup
    # (via the #reschedule? call) have to be excluded because they
    # might have been started `.delay`-ed and are flagged for restart.
    interrupted_jobs = ImportJob.running.where('updated_at < ?', after)

    interrupted_jobs.each do |import_job|
      import_job.update!(finished_at: after, result: { error: interruption_note })
    end
  end
end
2016-01-05 08:48:26 +00:00
# Starts a new thread that executes the given job and returns the thread handle.
#
# Jobs with a period of up to 5 minutes are executed in a loop inside of the
# thread: after each run the job record is looked up again and the loop ends
# if the job got deleted, deactivated, lost its period or after 1800 runs.
# All other jobs are executed exactly once.
#
# When the thread is done it clears the pid of the job (if the record still
# exists), removes its own handle from @@jobs_started and closes its database
# connection.
#
# @param job [Scheduler] the job to execute.
#
# @example
#   Scheduler.start_job(job)
#
# @return [Thread] the handle of the started thread.
def self.start_job(job)

  # Remember the id up front: `job` gets replaced by a fresh lookup inside of
  # the loop below and is nil as soon as the record got deleted. Without the
  # id the thread handle of a deleted job could never be released.
  job_id = job.id

  # start job and return thread handle
  Thread.new do
    ApplicationHandleInfo.current = 'scheduler'

    logger.debug { " Started job thread for ' #{job.name} ' ( #{job.method} )... " }

    # start loop for periods equal or under 5 minutes
    if job.period && job.period <= 5.minutes
      loop_count = 0
      loop do
        loop_count += 1
        _start_job(job)
        job = Scheduler.lookup(id: job.id)

        # exit if job got deleted
        break if !job

        # exit if job is not active anymore
        break if !job.active

        # exit if there is no loop period defined
        break if !job.period

        # only do a certain amount of loops in this thread
        break if loop_count == 1800

        # wait until next run
        sleep job.period
      end
    else
      _start_job(job)
    end

    if job.present?
      job.pid = ''
      job.save
      logger.debug { " ...stopped thread for ' #{job.method} ' " }
    else
      logger.warn ' ...Job got deleted while running'
    end

    # release thread lock and remove thread handle, also for deleted jobs
    @@jobs_started.delete(job_id)

    ActiveRecord::Base.connection.close
  end
end
2016-01-05 08:48:26 +00:00
# Executes the method of the given job in the current thread and retries on errors.
#
# Before the execution the job gets flagged with the current time as
# +last_run+, the object id of the current thread as +pid+, status 'ok' and
# an empty error message.
#
# If the execution raises a StandardError a database reconnect is attempted
# and the job is executed again by calling this method recursively. The try
# counter starts over if the previous try was started more than 5 minutes
# ago. Once the counter reaches 10 the thread handle gets removed from
# @@jobs_started and the job gets deactivated with status 'error' and the
# error stored as +error_message+.
#
# Exceptions that are no StandardError get logged and re-raised.
# ActiveSupport::CurrentAttributes are reset after every try.
#
# @param job [Scheduler] the job to execute.
# @param try_count [Integer] number of failed tries so far (used by the retry recursion).
# @param try_run_time [Time] start time of the previous try (used by the retry recursion).
#
# @example
#   Scheduler._start_job(job)
def self._start_job(job, try_count = 0, try_run_time = Time.zone.now)
  started_at = Time.zone.now
  job.update!(
    last_run:      started_at,
    pid:           Thread.current.object_id,
    status:        'ok',
    error_message: '',
  )

  logger.info " execute #{job.method} (try_count #{try_count} )... "

  # NOTE(security): the code to run is taken from the job record and executed
  # via eval in the binding of this method (local variables are visible to it).
  # Scheduler records therefore have to be treated as trusted code.
  eval job.method # rubocop:disable Security/Eval

  took = Time.zone.now - started_at
  logger.info " ended #{job.method} took: #{took} seconds. "
rescue => e
  took = Time.zone.now - started_at
  logger.error " execute #{job.method} (try_count #{try_count} ) exited with error #{e.inspect} in: #{took} seconds. "

  # reconnect in case db connection is lost
  # (uses its own variable so that the error of the job in `e` is kept for
  # the final error message below)
  begin
    ActiveRecord::Base.connection.reconnect!
  rescue => reconnect_error
    logger.error " Can't reconnect to database #{reconnect_error.inspect} "
  end

  try_run_max = 10
  try_count  += 1

  # reset error counter if the previous try is too old
  if try_run_time + (60 * 5) < Time.zone.now
    try_count = 0
  end
  try_run_time = Time.zone.now

  # restart job again
  if try_run_max > try_count

    # wait between retries (see https://github.com/zammad/zammad/issues/1950)
    sleep(try_count) if Rails.env.production?

    _start_job(job, try_count, try_run_time)
  else

    # release thread lock and remove thread handle
    @@jobs_started.delete(job.id)

    error = " Failed to run #{job.method} after #{try_count} tries #{e.inspect} "
    logger.error error

    job.update!(
      error_message: error,
      status:        'error',
      active:        false,
    )
  end

# rescue any other Exceptions that are not StandardError or childs of it
# https://stackoverflow.com/questions/10048173/why-is-it-bad-style-to-rescue-exception-e-in-ruby
# http://rubylearning.com/satishtalim/ruby_exceptions.html
rescue Exception => e # rubocop:disable Lint/RescueException
  took = Time.zone.now - started_at
  logger.error " execute #{job.method} (try_count #{try_count} ) exited with a non standard-error #{e.inspect} in: #{took} seconds. "
  raise
ensure
  ActiveSupport::CurrentAttributes.clear_all
end
2013-08-04 21:56:02 +00:00
2016-04-26 09:30:46 +00:00
# Processes delayed background jobs.
#
# In foreground mode (used for tests) the jobs are worked off in the current
# thread until a pass does not process any job anymore. During that time the
# current user is unset and the application handle is switched to
# 'scheduler'; both get restored afterwards, also if an error is raised.
#
# Otherwise (used for production) a thread gets started which, after an
# initial pause of 4 seconds, works off jobs in an endless loop and pauses
# for 4 seconds whenever a pass did not process any job.
#
# @param foreground [Boolean] work off the jobs in the current thread instead of a new one.
#
# @example
#   Scheduler.worker
#
# @example
#   Scheduler.worker(true)
#
# @raise [RuntimeError] in foreground mode if a background job failed.
#
# @return [Thread, nil] the handle of the started worker thread, nil in foreground mode.
def self.worker(foreground = false)

  # used for tests
  if foreground
    original_interface_handle = ApplicationHandleInfo.current
    original_user_id          = UserInfo.current_user_id

    begin
      ApplicationHandleInfo.current = 'scheduler'
      UserInfo.current_user_id      = nil

      loop do
        success, failure = Delayed::Worker.new.work_off
        if failure.nonzero?
          raise " #{failure} failed background jobs: #{Delayed::Job.where.not(last_error: nil).inspect} "
        end

        break if success.zero?
      end
    ensure
      # restore the context of the caller, also if a failed job raised above
      UserInfo.current_user_id      = original_user_id
      ApplicationHandleInfo.current = original_interface_handle
    end

    return
  end

  # used for production
  wait = 4
  Thread.new do
    sleep wait

    logger.info " Starting worker thread #{Delayed::Job} "

    loop do
      ApplicationHandleInfo.current = 'scheduler'
      result = nil

      realtime = Benchmark.realtime do
        logger.debug { " *** worker thread, #{Delayed::Job.all.count} in queue " }
        result = Delayed::Worker.new.work_off
      end

      # result holds the number of succeeded and failed jobs of this pass
      count = result.sum
      if count.zero?
        sleep wait
        logger.debug { '*** worker thread loop' }
      else
        # the throughput message was built but never written anywhere before
        logger.debug do
          format " *** #{count} jobs processed at %<jps>.4f j/s, %<failed>d failed ... \n ", jps: count / realtime, failed: result.last
        end
      end
    end

    # only reached if the loop above terminates
    logger.info ' ...stopped worker thread'
    ActiveRecord::Base.connection.close
  end
end
2017-09-07 16:07:48 +00:00
# Returns the jobs that are flagged with status 'error' and are not active.
# Scheduler._start_job flags jobs this way after it gave up retrying them.
#
# @example
#   Scheduler.failed_jobs
#
# @return [Object] the result of the `where` query for these jobs.
def self.failed_jobs
  failure_criteria = { status: 'error', active: false }

  where(failure_criteria)
end
2017-09-07 15:34:32 +00:00
# Activates all failed jobs (see Scheduler.failed_jobs) again, so that they
# get picked up on the next pass of the scheduler loop.
#
# @example
#   Scheduler.restart_failed_jobs
#
# @return [true]
def self.restart_failed_jobs
  failed_jobs.each { |failed_job| failed_job.update!(active: true) }

  true
end
2013-06-12 15:59:58 +00:00
end