Fixed issues #1008 - LDAP import stucks when Scheduler gets stopped or restarted.

This commit is contained in:
Thorsten Eckel 2017-05-11 17:18:03 +02:00
parent ec6a4a199c
commit 199686dce1
9 changed files with 345 additions and 5 deletions

View file

@ -34,6 +34,25 @@ class ImportJob < ApplicationModel
save
end
# Gets called when the Scheduler gets (re-)started and this job was still
# in the queue. If `finished_at` is blank the call is piped through to
# the ImportJob backend which has to decide how to go from here. The delayed
# job will get destroyed if rescheduled? is not implemented
# as an ImportJob backend class method.
#
# @see Scheduler#cleanup_delayed
#
# @example
# import.reschedule?(delayed_job)
#
# return [Boolean] whether the ImportJob should get rescheduled (true) or destroyed (false)
def reschedule?(delayed_job)
return false if finished_at.present?
instance = name.constantize.new(self)
return false if !instance.respond_to?(:reschedule?)
instance.reschedule?(delayed_job)
end
# Convenience wrapper around the start method for starting (delayed) dry runs.
# Logs the start and end time (if ended successfully) and logs
# exceptions into result if they happen.

View file

@ -11,6 +11,9 @@ class Scheduler < ApplicationModel
Thread.abort_on_exception = true
# cleanup old background jobs
cleanup
# start worker for background jobs
worker
@ -44,6 +47,80 @@ class Scheduler < ApplicationModel
end
end
# Checks all delayed jobs that are locked and cleans them up.
# Should only get called when the Scheduler gets started.
#
# @see Scheduler#cleanup_delayed
#
# @param [Boolean] force forces the cleanup if not called in Scheduler starting context.
#
# @example
# Scheduler.cleanup
#
# @raise [RuntimeError] If called without force and not when Scheduler gets started.
#
# return [nil]
def self.cleanup(force: false)
if !force && caller_locations.first.label != 'threads'
raise 'This method should only get called when Scheduler.threads are initialized. Use `force: true` to start anyway.'
end
Delayed::Job.all.each do |job|
cleanup_delayed(job)
end
end
# Checks if the given job can be rescheduled or destroys it. Logs the action as warn.
# Works only for locked jobs. Jobs that are not locked are ignored and
# should get destroyed directly.
# Checks the delayed job object for a method called .reschedule?. The memthod is called
# with the delayed job as a parameter. The result value is expected as a Boolean. If the
# result is true the lock gets removed and the delayed job gets rescheduled. If the return
# value is false it will get destroyed which is the default behaviour.
#
# @param [Delayed::Job] job the job that should get checked for destroying/rescheduling.
#
# @example
# Scheduler.cleanup_delayed(job)
#
# return [nil]
def self.cleanup_delayed(job)
return if job.locked_at.blank?
job_name = job.name
payload_object = job.payload_object
reschedule = false
if payload_object.present?
if payload_object.respond_to?(:object)
object = payload_object.object
if object.respond_to?(:id)
job_name += " (id: #{object.id})"
end
if object.respond_to?(:reschedule?) && object.reschedule?(job)
reschedule = true
end
end
if payload_object.respond_to?(:args)
job_name += " - ARGS: #{payload_object.args.inspect}"
end
end
if reschedule
action = 'Rescheduling'
job.unlock
job.save
else
action = 'Destroyed'
job.destroy
end
Rails.logger.warn "#{action} locked delayed job: #{job_name}"
end
def self.start_job(job)
Thread.new {

View file

@ -3,7 +3,7 @@
module Import
class Base
# Checks if the able to get queued by the scheduler.
# Checks if the backend is able to get queued by the Scheduler.
#
# @example
# Import::ExampleBackend.queueable?
@ -14,7 +14,20 @@ module Import
true
end
# Initializes a new instance with a stored reference to the import job.
# Checks if the backend is able to get rescheduled in case the Scheduler
# got (re-)started while this ImportJob was running. Defaults to false.
#
# @example
# instance = Import::LDAP.new(import_job)
# instance.reschedule?(delayed_job)
# #=> false
#
# return [false]
def reschedule?(_delayed_job)
false
end
# Initializes a new instance with a stored reference to the ImportJob.
#
# @example
# instance = Import::ExampleBackend.new(import_job)

View file

@ -32,6 +32,24 @@ module Import
start_import
end
# Gets called when the Scheduler gets (re-)started and a LDAP ImportJob was still
# in the queue. The job will always get restarted to avoid the gap till the next
# run triggered by the Scheduler. The result will get updated to inform the user
# in the agent interface result view.
#
# @example
# instance = Import::LDAP.new(import_job)
# instance.reschedule?(delayed_job)
# #=> true
#
# return [true]
def reschedule?(_delayed_job)
@import_job.update_attribute(:result, {
info: 'Restarting due to scheduler restart.'
})
true
end
private
def start_import

View file

@ -23,4 +23,15 @@ RSpec.describe Import::Base do
end.to raise_error(RuntimeError)
end
end
describe '#reschedule?' do
it 'returns false by default' do
import_job = create(:import_job)
instance = described_class.new(import_job)
delayed_job = double()
expect(instance.reschedule?(delayed_job)).to be false
end
end
end

View file

@ -1,6 +1,6 @@
RSpec.shared_examples 'ImportJob backend' do
it 'responds to #queueable?' do
it 'responds to .queueable?' do
expect(described_class).to respond_to(:queueable?)
end
@ -16,8 +16,13 @@ RSpec.shared_examples 'ImportJob backend' do
end.not_to raise_error
end
it 'responds to .start' do
it 'responds to #start' do
import_job = create(:import_job)
expect(described_class.new(import_job)).to respond_to(:start)
end
it 'responds to #reschedule?' do
import_job = create(:import_job)
expect(described_class.new(import_job)).to respond_to(:reschedule?)
end
end

View file

@ -4,7 +4,7 @@ require 'lib/import/import_job_backend_examples'
RSpec.describe Import::Ldap do
it_behaves_like 'ImportJob backend'
describe '::queueable?' do
describe '.queueable?' do
it 'is queueable if LDAP integration is activated and configured' do
allow(Setting).to receive(:get).with('ldap_integration').and_return(true)
@ -86,4 +86,30 @@ RSpec.describe Import::Ldap do
end
end
end
describe '#reschedule?' do
it 'initiates always a rescheduling' do
import_job = create(:import_job)
instance = described_class.new(import_job)
delayed_job = double()
expect(instance.reschedule?(delayed_job)).to be true
end
it 'updates the result with an info text' do
import_job = create(:import_job)
instance = described_class.new(import_job)
delayed_job = double()
expect do
instance.reschedule?(delayed_job)
end.to change {
import_job.result
}
expect(import_job.result.key?(:info)).to be true
end
end
end

View file

@ -10,10 +10,28 @@ RSpec.describe ImportJob do
end
end
end
module Import
class NoRescheduleMethod
def initialize(import_job)
@import_job = import_job
end
def start
@import_job.result = { state: 'Done' }
end
def reschedule?(_delayed_job)
'invalid_but_checkable_result'
end
end
end
end
after do
Import.send(:remove_const, :Test)
Import.send(:remove_const, :NoRescheduleMethod)
end
let(:test_backend_name) { 'Import::Test' }
@ -197,4 +215,29 @@ RSpec.describe ImportJob do
end
end
describe '.reschedule?' do
it 'returns false for already finished jobs' do
instance = create(:import_job)
delayed_job = double()
instance.update_attribute(:finished_at, Time.zone.now)
expect(instance.reschedule?(delayed_job)).to be false
end
it 'returns false for backends not responding to reschedule?' do
instance = create(:import_job)
delayed_job = double()
expect(instance.reschedule?(delayed_job)).to be false
end
it 'returns the backend reschedule? value' do
instance = create(:import_job, name: 'Import::NoRescheduleMethod')
delayed_job = double()
expect(instance.reschedule?(delayed_job)).to eq 'invalid_but_checkable_result'
end
end
end

View file

@ -0,0 +1,128 @@
require 'rails_helper'
RSpec::Matchers.define_negated_matcher :not_change, :change
RSpec.describe Scheduler do
before do
module SpecSpace
class DelayedJobBackend
def self.start
# noop
end
# rubocop:disable Style/TrivialAccessors
def self.reschedule=(reschedule)
@reschedule = reschedule
end
def self.reschedule?(_delayed_job)
@reschedule || false
end
end
end
end
after do
SpecSpace.send(:remove_const, :DelayedJobBackend)
end
describe '.cleanup' do
it 'gets called by .threads' do
expect(described_class).to receive(:cleanup).and_throw(:called)
expect do
described_class.threads
end.to throw_symbol(:called)
end
context 'not called from .threads method' do
it 'throws an exception' do
expect do
described_class.cleanup
end.to raise_error(RuntimeError)
end
it 'throws no exception with force parameter' do
expect do
described_class.cleanup(force: true)
end.not_to raise_error
end
end
# helpers to avoid the throwing behaviour "describe"d above
def simulate_threads_call
threads
end
def threads
described_class.cleanup
end
it 'keeps unlocked Delayed::Job-s' do
# meta :)
described_class.delay.cleanup
expect do
simulate_threads_call
end.not_to change {
Delayed::Job.count
}
end
context 'locked Delayed::Job' do
it 'gets destroyed' do
# meta :)
described_class.delay.cleanup
# lock job (simluates interrupted scheduler task)
locked_job = Delayed::Job.last
locked_job.update_attribute(:locked_at, Time.zone.now)
expect do
simulate_threads_call
end.to change {
Delayed::Job.count
}.by(-1)
end
context 'respond to reschedule?' do
it 'gets rescheduled for positive responses' do
SpecSpace::DelayedJobBackend.reschedule = true
SpecSpace::DelayedJobBackend.delay.start
# lock job (simluates interrupted scheduler task)
locked_job = Delayed::Job.last
locked_job.update_attribute(:locked_at, Time.zone.now)
expect do
simulate_threads_call
end.to not_change {
Delayed::Job.count
}.and change {
Delayed::Job.last.locked_at
}
end
it 'gets destroyed for negative responses' do
SpecSpace::DelayedJobBackend.reschedule = false
SpecSpace::DelayedJobBackend.delay.start
# lock job (simluates interrupted scheduler task)
locked_job = Delayed::Job.last
locked_job.update_attribute(:locked_at, Time.zone.now)
expect do
simulate_threads_call
end.to change {
Delayed::Job.count
}.by(-1)
end
end
end
end
end