From 199686dce1aa53431802c65279b1027703091e93 Mon Sep 17 00:00:00 2001 From: Thorsten Eckel Date: Thu, 11 May 2017 17:18:03 +0200 Subject: [PATCH] Fixed issues #1008 - LDAP import stucks when Scheduler gets stopped or restarted. --- app/models/import_job.rb | 19 +++ app/models/scheduler.rb | 77 +++++++++++ lib/import/base.rb | 17 ++- lib/import/ldap.rb | 18 +++ spec/lib/import/base_spec.rb | 11 ++ .../lib/import/import_job_backend_examples.rb | 9 +- spec/lib/import/ldap_spec.rb | 28 +++- spec/models/import_job_spec.rb | 43 ++++++ spec/models/scheduler_spec.rb | 128 ++++++++++++++++++ 9 files changed, 345 insertions(+), 5 deletions(-) create mode 100644 spec/models/scheduler_spec.rb diff --git a/app/models/import_job.rb b/app/models/import_job.rb index c0b6a2d2e..9192b98bd 100644 --- a/app/models/import_job.rb +++ b/app/models/import_job.rb @@ -34,6 +34,25 @@ class ImportJob < ApplicationModel save end + # Gets called when the Scheduler gets (re-)started and this job was still + # in the queue. If `finished_at` is blank the call is piped through to + # the ImportJob backend which has to decide how to go from here. The delayed + # job will get destroyed if `reschedule?` is not implemented + # as an ImportJob backend instance method. + # + # @see Scheduler.cleanup_delayed + # + # @example + # import.reschedule?(delayed_job) + # + # @return [Boolean] whether the ImportJob should get rescheduled (true) or destroyed (false) + def reschedule?(delayed_job) + return false if finished_at.present? + instance = name.constantize.new(self) + return false if !instance.respond_to?(:reschedule?) + instance.reschedule?(delayed_job) + end + # Convenience wrapper around the start method for starting (delayed) dry runs. # Logs the start and end time (if ended successfully) and logs # exceptions into result if they happen. 
diff --git a/app/models/scheduler.rb b/app/models/scheduler.rb index 47ff10290..0b2825abe 100644 --- a/app/models/scheduler.rb +++ b/app/models/scheduler.rb @@ -11,6 +11,9 @@ class Scheduler < ApplicationModel Thread.abort_on_exception = true + # cleanup old background jobs + cleanup + # start worker for background jobs worker @@ -44,6 +47,80 @@ class Scheduler < ApplicationModel end end + # Checks all delayed jobs that are locked and cleans them up. + # Should only get called when the Scheduler gets started, i.e. directly from a method named `threads`. + # + # @see Scheduler.cleanup_delayed + # + # @param [Boolean] force forces the cleanup if not called in Scheduler starting context. + # + # @example + # Scheduler.cleanup + # + # @raise [RuntimeError] If called without force and not when Scheduler gets started. + # + # @return [nil] + def self.cleanup(force: false) + + if !force && caller_locations(1, 1).first.base_label != 'threads' + raise 'This method should only get called when Scheduler.threads are initialized. Use `force: true` to start anyway.' + end + + Delayed::Job.all.each do |job| + cleanup_delayed(job) + end + end + # Checks if the given job can be rescheduled or destroys it. Logs the action as warn. + # Works only for locked jobs. Jobs that are not locked are ignored and + # should get destroyed directly. + # Checks the delayed job object for a method called .reschedule?. The method is called + # with the delayed job as a parameter. The result value is expected as a Boolean. If the + # result is true the lock gets removed and the delayed job gets rescheduled. If the return + # value is false it will get destroyed which is the default behaviour. + # + # @param [Delayed::Job] job the job that should get checked for destroying/rescheduling. + # + # @example + # Scheduler.cleanup_delayed(job) + # + # @return [nil] + def self.cleanup_delayed(job) + return if job.locked_at.blank? + + job_name = job.name + payload_object = job.payload_object + reschedule = false + if payload_object.present? 
+ if payload_object.respond_to?(:object) + object = payload_object.object + + if object.respond_to?(:id) + job_name += " (id: #{object.id})" + end + + if object.respond_to?(:reschedule?) && object.reschedule?(job) + reschedule = true + end + end + + if payload_object.respond_to?(:args) + job_name += " - ARGS: #{payload_object.args.inspect}" + end + end + + if reschedule + action = 'Rescheduling' + job.unlock + job.save + else + action = 'Destroyed' + job.destroy + end + + Rails.logger.warn "#{action} locked delayed job: #{job_name}" + end + def self.start_job(job) Thread.new { diff --git a/lib/import/base.rb b/lib/import/base.rb index 3bfe7f41f..331772f1c 100644 --- a/lib/import/base.rb +++ b/lib/import/base.rb @@ -3,7 +3,7 @@ module Import class Base - # Checks if the able to get queued by the scheduler. + # Checks if the backend is able to get queued by the Scheduler. # # @example # Import::ExampleBackend.queueable? @@ -14,7 +14,20 @@ module Import true end - # Initializes a new instance with a stored reference to the import job. + # Checks if the backend is able to get rescheduled in case the Scheduler + # got (re-)started while this ImportJob was running. Defaults to false. + # + # @example + # instance = Import::ExampleBackend.new(import_job) + # instance.reschedule?(delayed_job) + # #=> false + # + # @return [false] + def reschedule?(_delayed_job) + false + end + + # Initializes a new instance with a stored reference to the ImportJob. # # @example # instance = Import::ExampleBackend.new(import_job) diff --git a/lib/import/ldap.rb b/lib/import/ldap.rb index e817762e3..5b82e60a4 100644 --- a/lib/import/ldap.rb +++ b/lib/import/ldap.rb @@ -32,6 +32,24 @@ module Import start_import end + # Gets called when the Scheduler gets (re-)started and an LDAP ImportJob was still + # in the queue. The job will always get restarted to avoid the gap till the next + # run triggered by the Scheduler. The result will get updated to inform the user + # in the agent interface result view. 
+ # + # @example + # instance = Import::Ldap.new(import_job) + # instance.reschedule?(delayed_job) + # #=> true + # + # @return [true] + def reschedule?(_delayed_job) + @import_job.update_attribute(:result, { + info: 'Restarting due to scheduler restart.' + }) + true + end + private def start_import diff --git a/spec/lib/import/base_spec.rb b/spec/lib/import/base_spec.rb index d1323ed7a..148c6c5f6 100644 --- a/spec/lib/import/base_spec.rb +++ b/spec/lib/import/base_spec.rb @@ -23,4 +23,15 @@ RSpec.describe Import::Base do end.to raise_error(RuntimeError) end end + + describe '#reschedule?' do + + it 'returns false by default' do + import_job = create(:import_job) + instance = described_class.new(import_job) + delayed_job = double() + + expect(instance.reschedule?(delayed_job)).to be false + end + end end diff --git a/spec/lib/import/import_job_backend_examples.rb b/spec/lib/import/import_job_backend_examples.rb index a72c3e74e..acca289e5 100644 --- a/spec/lib/import/import_job_backend_examples.rb +++ b/spec/lib/import/import_job_backend_examples.rb @@ -1,6 +1,6 @@ RSpec.shared_examples 'ImportJob backend' do - it 'responds to #queueable?' do + it 'responds to .queueable?' do expect(described_class).to respond_to(:queueable?) end @@ -16,8 +16,13 @@ RSpec.shared_examples 'ImportJob backend' do end.not_to raise_error end - it 'responds to .start' do + it 'responds to #start' do import_job = create(:import_job) expect(described_class.new(import_job)).to respond_to(:start) end + + it 'responds to #reschedule?' do + import_job = create(:import_job) + expect(described_class.new(import_job)).to respond_to(:reschedule?) + end end diff --git a/spec/lib/import/ldap_spec.rb b/spec/lib/import/ldap_spec.rb index 645c45a2c..1359c8904 100644 --- a/spec/lib/import/ldap_spec.rb +++ b/spec/lib/import/ldap_spec.rb @@ -4,7 +4,7 @@ require 'lib/import/import_job_backend_examples' RSpec.describe Import::Ldap do it_behaves_like 'ImportJob backend' - describe '::queueable?' 
do + describe '.queueable?' do it 'is queueable if LDAP integration is activated and configured' do allow(Setting).to receive(:get).with('ldap_integration').and_return(true) @@ -86,4 +86,30 @@ RSpec.describe Import::Ldap do end end end + + describe '#reschedule?' do + + it 'initiates always a rescheduling' do + import_job = create(:import_job) + instance = described_class.new(import_job) + delayed_job = double() + + expect(instance.reschedule?(delayed_job)).to be true + end + + it 'updates the result with an info text' do + import_job = create(:import_job) + instance = described_class.new(import_job) + delayed_job = double() + + expect do + instance.reschedule?(delayed_job) + end.to change { + import_job.result + } + + expect(import_job.result.key?(:info)).to be true + end + + end end diff --git a/spec/models/import_job_spec.rb b/spec/models/import_job_spec.rb index 638277451..adb6d1a88 100644 --- a/spec/models/import_job_spec.rb +++ b/spec/models/import_job_spec.rb @@ -10,10 +10,28 @@ RSpec.describe ImportJob do end end end + + module Import + class NoRescheduleMethod + + def initialize(import_job) + @import_job = import_job + end + + def start + @import_job.result = { state: 'Done' } + end + + def reschedule?(_delayed_job) + 'invalid_but_checkable_result' + end + end + end end after do Import.send(:remove_const, :Test) + Import.send(:remove_const, :NoRescheduleMethod) end let(:test_backend_name) { 'Import::Test' } @@ -197,4 +215,29 @@ RSpec.describe ImportJob do end end + describe '#reschedule?' do + + it 'returns false for already finished jobs' do + instance = create(:import_job) + delayed_job = double() + + instance.update_attribute(:finished_at, Time.zone.now) + + expect(instance.reschedule?(delayed_job)).to be false + end + + it 'returns false for backends not responding to reschedule?' do + instance = create(:import_job) + delayed_job = double() + + expect(instance.reschedule?(delayed_job)).to be false + end + + it 'returns the backend reschedule? 
value' do + instance = create(:import_job, name: 'Import::NoRescheduleMethod') + delayed_job = double() + + expect(instance.reschedule?(delayed_job)).to eq 'invalid_but_checkable_result' + end + end end diff --git a/spec/models/scheduler_spec.rb b/spec/models/scheduler_spec.rb new file mode 100644 index 000000000..6f9302e65 --- /dev/null +++ b/spec/models/scheduler_spec.rb @@ -0,0 +1,128 @@ +require 'rails_helper' + +RSpec::Matchers.define_negated_matcher :not_change, :change + +RSpec.describe Scheduler do + + before do + module SpecSpace + class DelayedJobBackend + + def self.start + # noop + end + + # rubocop:disable Style/TrivialAccessors + def self.reschedule=(reschedule) + @reschedule = reschedule + end + + def self.reschedule?(_delayed_job) + @reschedule || false + end + end + end + end + + after do + SpecSpace.send(:remove_const, :DelayedJobBackend) + end + + describe '.cleanup' do + + it 'gets called by .threads' do + expect(described_class).to receive(:cleanup).and_throw(:called) + expect do + described_class.threads + end.to throw_symbol(:called) + end + + context 'not called from .threads method' do + + it 'throws an exception' do + expect do + described_class.cleanup + end.to raise_error(RuntimeError) + end + + it 'throws no exception with force parameter' do + expect do + described_class.cleanup(force: true) + end.not_to raise_error + end + end + + # helpers that call .cleanup from a method named `threads` to avoid the throwing behaviour "describe"d above + def simulate_threads_call + threads + end + + def threads + described_class.cleanup + end + + it 'keeps unlocked Delayed::Job-s' do + # meta :) - the delayed job used for this example is a .cleanup call itself + described_class.delay.cleanup + + expect do + simulate_threads_call + end.not_to change { + Delayed::Job.count + } + end + + context 'locked Delayed::Job' do + + it 'gets destroyed' do + # meta :) - the delayed job used for this example is a .cleanup call itself + described_class.delay.cleanup + + # lock job (simulates interrupted scheduler task) + locked_job = Delayed::Job.last + locked_job.update_attribute(:locked_at, Time.zone.now) + + expect do + 
simulate_threads_call + end.to change { + Delayed::Job.count + }.by(-1) + end + + context 'respond to reschedule?' do + + it 'gets rescheduled for positive responses' do + SpecSpace::DelayedJobBackend.reschedule = true + SpecSpace::DelayedJobBackend.delay.start + + # lock job (simulates interrupted scheduler task) + locked_job = Delayed::Job.last + locked_job.update_attribute(:locked_at, Time.zone.now) + + expect do + simulate_threads_call + end.to not_change { + Delayed::Job.count + }.and change { + Delayed::Job.last.locked_at + } + end + + it 'gets destroyed for negative responses' do + SpecSpace::DelayedJobBackend.reschedule = false + SpecSpace::DelayedJobBackend.delay.start + + # lock job (simulates interrupted scheduler task) + locked_job = Delayed::Job.last + locked_job.update_attribute(:locked_at, Time.zone.now) + + expect do + simulate_threads_call + end.to change { + Delayed::Job.count + }.by(-1) + end + end + end + end +end