diff --git a/app/models/import_job.rb b/app/models/import_job.rb index 335fd2a06..4623aba5c 100644 --- a/app/models/import_job.rb +++ b/app/models/import_job.rb @@ -5,6 +5,8 @@ class ImportJob < ApplicationModel store :payload store :result + scope :running, -> { where(finished_at: nil, dry_run: false).where.not(started_at: nil) } + # Starts the import backend class based on the name attribute. # Import backend class is initialized with the current instance. # Logs the start and end time (if ended successfully) and logs diff --git a/app/models/scheduler.rb b/app/models/scheduler.rb index e3b1baa60..5aa54e7ca 100644 --- a/app/models/scheduler.rb +++ b/app/models/scheduler.rb @@ -121,9 +121,24 @@ class Scheduler < ApplicationModel raise 'This method should only get called when Scheduler.threads are initialized. Use `force: true` to start anyway.' end - log_start_finish(:info, 'Cleanup of left over locked delayed jobs') do + start_time = Time.zone.now - Delayed::Job.all.each do |job| + cleanup_delayed_jobs(start_time) + cleanup_import_jobs(start_time) + end + + # Checks for locked delayed jobs and tries to reschedule or destroy each of them. + # + # @param [ActiveSupport::TimeWithZone] after the time the cleanup was started + # + # @example + # Scheduler.cleanup_delayed_jobs(TimeZone.now) + # + # return [nil] + def self.cleanup_delayed_jobs(after) + log_start_finish(:info, "Cleanup of left over locked delayed jobs #{after}") do + + Delayed::Job.where('updated_at < ?', after).where.not(locked_at: nil).each do |job| log_start_finish(:info, "Checking left over delayed job #{job.inspect}") do cleanup_delayed(job) end @@ -131,13 +146,13 @@ class Scheduler < ApplicationModel end end - # Checks if the given job can be rescheduled or destroys it. Logs the action as warn. - # Works only for locked jobs. Jobs that are not locked are ignored and + # Checks if the given delayed job can be rescheduled or destroys it. Logs the action as warn. + # Works only for locked delayed jobs. Delayed jobs that are not locked are ignored and # should get destroyed directly. - # Checks the delayed job object for a method called .reschedule?. The memthod is called - # with the delayed job as a parameter. The result value is expected as a Boolean. If the - # result is true the lock gets removed and the delayed job gets rescheduled. If the return - # value is false it will get destroyed which is the default behaviour. + # Checks the Delayed::Job instance for a method called .reschedule?. The method is called + # with the Delayed::Job instance as a parameter. The result value is expected to be a Boolean. + # If the result is true the lock gets removed and the delayed job gets rescheduled. + # If the return value is false it will get destroyed which is the default behaviour. # # @param [Delayed::Job] job the job that should get checked for destroying/rescheduling. # @@ -181,6 +196,33 @@ class Scheduler < ApplicationModel logger.warn "#{action} locked delayed job: #{job_name}" end + # Checks for killed import jobs and marks them as finished and adds a note. + # + # @param [ActiveSupport::TimeWithZone] after the time the cleanup was started + # + # @example + # Scheduler.cleanup_import_jobs(TimeZone.now) + # + # return [nil] + def self.cleanup_import_jobs(after) + log_start_finish(:info, "Cleanup of left over import jobs #{after}") do + error = 'Interrupted by scheduler restart. Please restart manually or wait till next execution time.'.freeze + + # we need to exclude jobs that were updated at or since we started + # cleaning up (via the #reschedule? call) because they might + # were started `.delay`-ed and are flagged for restart + ImportJob.running.where('updated_at < ?', after).each do |job| + + job.update!( + finished_at: after, + result: { + error: error + } + ) + end + end + end + def self.start_job(job) # start job and return thread handle diff --git a/spec/models/scheduler_spec.rb b/spec/models/scheduler_spec.rb index aef9511cc..0f11afc74 100644 --- a/spec/models/scheduler_spec.rb +++ b/spec/models/scheduler_spec.rb @@ -99,56 +99,24 @@ RSpec.describe Scheduler do described_class.cleanup end - it 'keeps unlocked Delayed::Job-s' do - # meta :) - described_class.delay.cleanup + context 'Delayed::Job' do - expect do - simulate_threads_call - end.not_to change { - Delayed::Job.count - } - end - - context 'locked Delayed::Job' do - - it 'gets destroyed' do + it 'keeps unlocked' do # meta :) described_class.delay.cleanup - # lock job (simluates interrupted scheduler task) - locked_job = Delayed::Job.last - locked_job.update!(locked_at: Time.zone.now) - expect do simulate_threads_call - end.to change { + end.not_to change { Delayed::Job.count - }.by(-1) + } end - context 'respond to reschedule?' do + context 'locked' do - it 'gets rescheduled for positive responses' do - SpecSpace::DelayedJobBackend.reschedule = true - SpecSpace::DelayedJobBackend.delay.start - - # lock job (simluates interrupted scheduler task) - locked_job = Delayed::Job.last - locked_job.update!(locked_at: Time.zone.now) - - expect do - simulate_threads_call - end.to not_change { - Delayed::Job.count - }.and change { - Delayed::Job.last.locked_at - } - end - - it 'gets destroyed for negative responses' do - SpecSpace::DelayedJobBackend.reschedule = false - SpecSpace::DelayedJobBackend.delay.start + it 'gets destroyed' do + # meta :) + described_class.delay.cleanup # lock job (simluates interrupted scheduler task) locked_job = Delayed::Job.last @@ -160,6 +128,78 @@ RSpec.describe Scheduler do Delayed::Job.count }.by(-1) end + + context 'respond to reschedule?' do + + it 'gets rescheduled for positive responses' do + SpecSpace::DelayedJobBackend.reschedule = true + SpecSpace::DelayedJobBackend.delay.start + + # lock job (simluates interrupted scheduler task) + locked_job = Delayed::Job.last + locked_job.update!(locked_at: Time.zone.now) + + expect do + simulate_threads_call + end.to not_change { + Delayed::Job.count + }.and change { + Delayed::Job.last.locked_at + } + end + + it 'gets destroyed for negative responses' do + SpecSpace::DelayedJobBackend.reschedule = false + SpecSpace::DelayedJobBackend.delay.start + + # lock job (simluates interrupted scheduler task) + locked_job = Delayed::Job.last + locked_job.update!(locked_at: Time.zone.now) + + expect do + simulate_threads_call + end.to change { + Delayed::Job.count + }.by(-1) + end + end + end + end + + context 'ImportJob' do + + context 'affected job' do + + let(:job) { create(:import_job, started_at: 5.minutes.ago) } + + it 'finishes stuck jobs' do + + expect do + simulate_threads_call + end.to change { + job.reload.finished_at + } + end + + it 'adds an error message to the result' do + + expect do + simulate_threads_call + end.to change { + job.reload.result[:error] + } + end + end + + it "doesn't change jobs added after stop" do + + job = create(:import_job) + + expect do + simulate_threads_call + end.not_to change { + job.reload + } end end end