Fixes #2014 - Import job (LDAP/Exchange) hangs on scheduler restart.

This commit is contained in:
Thorsten Eckel 2018-05-10 15:24:11 +02:00
parent 163a5f2f26
commit c4ea22470a
3 changed files with 132 additions and 48 deletions

View file

@ -5,6 +5,8 @@ class ImportJob < ApplicationModel
store :payload
store :result
scope :running, -> { where(finished_at: nil, dry_run: false).where.not(started_at: nil) }
# Starts the import backend class based on the name attribute.
# Import backend class is initialized with the current instance.
# Logs the start and end time (if ended successfully) and logs

View file

@ -121,9 +121,24 @@ class Scheduler < ApplicationModel
raise 'This method should only get called when Scheduler.threads are initialized. Use `force: true` to start anyway.'
end
log_start_finish(:info, 'Cleanup of left over locked delayed jobs') do
start_time = Time.zone.now
Delayed::Job.all.each do |job|
cleanup_delayed_jobs(start_time)
cleanup_import_jobs(start_time)
end
# Checks for locked delayed jobs and tries to reschedule or destroy each of them.
#
# @param [ActiveSupport::TimeWithZone] after the time the cleanup was started
#
# @example
# Scheduler.cleanup_delayed_jobs(TimeZone.now)
#
# return [nil]
def self.cleanup_delayed_jobs(after)
log_start_finish(:info, "Cleanup of left over locked delayed jobs #{after}") do
Delayed::Job.where('updated_at < ?', after).where.not(locked_at: nil).each do |job|
log_start_finish(:info, "Checking left over delayed job #{job.inspect}") do
cleanup_delayed(job)
end
@ -131,13 +146,13 @@ class Scheduler < ApplicationModel
end
end
# Checks if the given job can be rescheduled or destroys it. Logs the action as warn.
# Works only for locked jobs. Jobs that are not locked are ignored and
# Checks if the given delayed job can be rescheduled or destroys it. Logs the action as warn.
# Works only for locked delayed jobs. Delayed jobs that are not locked are ignored and
# should get destroyed directly.
# Checks the delayed job object for a method called .reschedule?. The memthod is called
# with the delayed job as a parameter. The result value is expected as a Boolean. If the
# result is true the lock gets removed and the delayed job gets rescheduled. If the return
# value is false it will get destroyed which is the default behaviour.
# Checks the Delayed::Job instance for a method called .reschedule?. The method is called
# with the Delayed::Job instance as a parameter. The result value is expected to be a Boolean.
# If the result is true the lock gets removed and the delayed job gets rescheduled.
# If the return value is false it will get destroyed which is the default behaviour.
#
# @param [Delayed::Job] job the job that should get checked for destroying/rescheduling.
#
@ -181,6 +196,33 @@ class Scheduler < ApplicationModel
logger.warn "#{action} locked delayed job: #{job_name}"
end
# Checks for killed import jobs and marks them as finished and adds a note.
#
# @param [ActiveSupport::TimeWithZone] after the time the cleanup was started
#
# @example
# Scheduler.cleanup_import_jobs(TimeZone.now)
#
# return [nil]
def self.cleanup_import_jobs(after)
log_start_finish(:info, "Cleanup of left over import jobs #{after}") do
error = 'Interrupted by scheduler restart. Please restart manually or wait till next execution time.'.freeze
# we need to exclude jobs that were updated at or since we started
# cleaning up (via the #reschedule? call) because they might
# were started `.delay`-ed and are flagged for restart
ImportJob.running.where('updated_at < ?', after).each do |job|
job.update!(
finished_at: after,
result: {
error: error
}
)
end
end
end
def self.start_job(job)
# start job and return thread handle

View file

@ -99,56 +99,24 @@ RSpec.describe Scheduler do
described_class.cleanup
end
it 'keeps unlocked Delayed::Job-s' do
# meta :)
described_class.delay.cleanup
context 'Delayed::Job' do
expect do
simulate_threads_call
end.not_to change {
Delayed::Job.count
}
end
context 'locked Delayed::Job' do
it 'gets destroyed' do
it 'keeps unlocked' do
# meta :)
described_class.delay.cleanup
# lock job (simluates interrupted scheduler task)
locked_job = Delayed::Job.last
locked_job.update!(locked_at: Time.zone.now)
expect do
simulate_threads_call
end.to change {
end.not_to change {
Delayed::Job.count
}.by(-1)
}
end
context 'respond to reschedule?' do
context 'locked' do
it 'gets rescheduled for positive responses' do
SpecSpace::DelayedJobBackend.reschedule = true
SpecSpace::DelayedJobBackend.delay.start
# lock job (simluates interrupted scheduler task)
locked_job = Delayed::Job.last
locked_job.update!(locked_at: Time.zone.now)
expect do
simulate_threads_call
end.to not_change {
Delayed::Job.count
}.and change {
Delayed::Job.last.locked_at
}
end
it 'gets destroyed for negative responses' do
SpecSpace::DelayedJobBackend.reschedule = false
SpecSpace::DelayedJobBackend.delay.start
it 'gets destroyed' do
# meta :)
described_class.delay.cleanup
# lock job (simluates interrupted scheduler task)
locked_job = Delayed::Job.last
@ -160,6 +128,78 @@ RSpec.describe Scheduler do
Delayed::Job.count
}.by(-1)
end
context 'respond to reschedule?' do
it 'gets rescheduled for positive responses' do
SpecSpace::DelayedJobBackend.reschedule = true
SpecSpace::DelayedJobBackend.delay.start
# lock job (simluates interrupted scheduler task)
locked_job = Delayed::Job.last
locked_job.update!(locked_at: Time.zone.now)
expect do
simulate_threads_call
end.to not_change {
Delayed::Job.count
}.and change {
Delayed::Job.last.locked_at
}
end
it 'gets destroyed for negative responses' do
SpecSpace::DelayedJobBackend.reschedule = false
SpecSpace::DelayedJobBackend.delay.start
# lock job (simluates interrupted scheduler task)
locked_job = Delayed::Job.last
locked_job.update!(locked_at: Time.zone.now)
expect do
simulate_threads_call
end.to change {
Delayed::Job.count
}.by(-1)
end
end
end
end
context 'ImportJob' do
context 'affected job' do
let(:job) { create(:import_job, started_at: 5.minutes.ago) }
it 'finishes stuck jobs' do
expect do
simulate_threads_call
end.to change {
job.reload.finished_at
}
end
it 'adds an error message to the result' do
expect do
simulate_threads_call
end.to change {
job.reload.result[:error]
}
end
end
it "doesn't change jobs added after stop" do
job = create(:import_job)
expect do
simulate_threads_call
end.not_to change {
job.reload
}
end
end
end