From 5ca41c83895e327c619f2f5255f87d2b26ab8d15 Mon Sep 17 00:00:00 2001 From: Thorsten Eckel Date: Wed, 13 Nov 2019 08:03:47 +0100 Subject: [PATCH] Feature: Added centralized ActiveJob locking based on lock key. This prevents unnecessary executing/queuing of unique jobs multiple times. --- app/jobs/application_job.rb | 4 + app/jobs/checks_kb_client_notification_job.rb | 7 + app/jobs/concerns/has_active_job_lock.rb | 118 +++++++++++++++++ app/jobs/scheduled_touch_job.rb | 11 +- app/jobs/search_index_job.rb | 6 + app/jobs/sla_ticket_rebuild_escalation_job.rb | 2 + .../ticket_online_notification_seen_job.rb | 7 + app/jobs/ticket_user_ticket_counter_job.rb | 7 + app/models/active_job_lock.rb | 21 +++ db/migrate/20120101000001_create_base.rb | 9 ++ .../20191001090809_create_active_job_locks.rb | 15 +++ spec/factories/active_job_lock.rb | 6 + .../jobs/concerns/has_active_job_lock_spec.rb | 120 ++++++++++++++++++ spec/requests/integration/monitoring_spec.rb | 20 ++- spec/support/active_job.rb | 1 + 15 files changed, 340 insertions(+), 14 deletions(-) create mode 100644 app/jobs/concerns/has_active_job_lock.rb create mode 100644 app/models/active_job_lock.rb create mode 100644 db/migrate/20191001090809_create_active_job_locks.rb create mode 100644 spec/factories/active_job_lock.rb create mode 100644 spec/jobs/concerns/has_active_job_lock_spec.rb diff --git a/app/jobs/application_job.rb b/app/jobs/application_job.rb index 6c74b8203..f1d7c0233 100644 --- a/app/jobs/application_job.rb +++ b/app/jobs/application_job.rb @@ -10,6 +10,10 @@ class ApplicationJob < ActiveJob::Base # This is a workaround to sync ActiveJob#executions to Delayed::Job#attempts # until we resolve this dependency. after_enqueue do |job| + # skip update of `attempts` attribute if job wasn't queued because of ActiveJobLock + #(another job with same lock key got queued before this job could be retried) + next if job.provider_job_id.blank? 
+ # update the column right away without loading Delayed::Job record # see: https://stackoverflow.com/a/34264580 Delayed::Job.where(id: job.provider_job_id).update_all(attempts: job.executions) # rubocop:disable Rails/SkipsModelValidations diff --git a/app/jobs/checks_kb_client_notification_job.rb b/app/jobs/checks_kb_client_notification_job.rb index 66f2c28f8..88f8963b6 100644 --- a/app/jobs/checks_kb_client_notification_job.rb +++ b/app/jobs/checks_kb_client_notification_job.rb @@ -1,4 +1,11 @@ class ChecksKbClientNotificationJob < ApplicationJob + include HasActiveJobLock + + def lock_key + # "ChecksKbClientNotificationJob/KnowledgeBase::Answer/42/destroy" + "#{self.class.name}/#{arguments[0]}/#{arguments[1]}/#{arguments[2]}" + end + def perform(klass_name, id, event) object = klass_name.constantize.find_by(id: id) return if object.blank? diff --git a/app/jobs/concerns/has_active_job_lock.rb b/app/jobs/concerns/has_active_job_lock.rb new file mode 100644 index 000000000..0c09bf92f --- /dev/null +++ b/app/jobs/concerns/has_active_job_lock.rb @@ -0,0 +1,118 @@ +module HasActiveJobLock + extend ActiveSupport::Concern + + included do + before_enqueue do |job| # rubocop:disable Style/SymbolProc + job.ensure_active_job_lock_for_enqueue! + end + + around_perform do |job, block| + job.mark_active_job_lock_as_started + + block.call + ensure + job.release_active_job_lock! + end + end + + # Defines the lock key for the current job to prevent execution of jobs with the same key. + # This is by default the name of the ActiveJob class. + # If you're in the situation where you need to have a lock_key based on + # the given arguments you can overwrite this method in your job and access + # them via `arguments`. See ActiveJob::Core for more (e.g. queue). 
+ # + # @example + # # default + # job = UniqueActiveJob.new + # job.lock_key + # # => "UniqueActiveJob" + # + # @example + # # with lock_key: "#{self.class.name}/#{arguments[0]}/#{arguments[1]}" + # job = SearchIndexJob.new('User', 42) + # job.lock_key + # # => "SearchIndexJob/User/42" + # + # @return [String] + def lock_key + self.class.name + end + + def mark_active_job_lock_as_started + release_active_job_lock_cache + + in_active_job_lock_transaction do + # a perform_now job doesn't require any locking + return if active_job_lock.blank? + return if !active_job_lock.of?(self) + + # a perform_later job started to perform and will be marked as such + active_job_lock.touch # rubocop:disable Rails/SkipsModelValidations + end + end + + def ensure_active_job_lock_for_enqueue! + release_active_job_lock_cache + + in_active_job_lock_transaction do + return if active_job_lock_for_enqueue!.present? + + ActiveJobLock.create!( + lock_key: lock_key, + active_job_id: job_id, + ) + end + end + + def release_active_job_lock! + # only delete lock if the current job is the one holding the lock + # perform_now jobs or perform_later jobs for which follow-up jobs were enqueued + # don't need to remove any locks + lock = ActiveJobLock.lock.find_by(lock_key: lock_key, active_job_id: job_id) + + if !lock + logger.debug { "Found no ActiveJobLock for #{self.class.name} (Job ID: #{job_id}) with key '#{lock_key}'." } + return + end + + logger.debug { "Deleting ActiveJobLock for #{self.class.name} (Job ID: #{job_id}) with key '#{lock_key}'." } + lock.destroy! + end + + private + + def in_active_job_lock_transaction + # re-use active DB transaction if present + return yield if ActiveRecord::Base.connection.open_transactions.nonzero? + + # start own serializable DB transaction to prevent race conditions on DB level + ActiveJobLock.transaction(isolation: :serializable) do + yield + end + rescue ActiveRecord::RecordNotUnique + existing_active_job_lock! + end + + def active_job_lock_for_enqueue! 
+ return if active_job_lock.blank? + + # don't enqueue perform_later jobs if a job with the same + # lock key exists that hasn't started to perform yet + existing_active_job_lock! if active_job_lock.peform_pending? + + active_job_lock.tap { |lock| lock.transfer_to(self) } + end + + def active_job_lock + @active_job_lock ||= ActiveJobLock.lock.find_by(lock_key: lock_key) + end + + def release_active_job_lock_cache + @active_job_lock = nil + end + + def existing_active_job_lock! + logger.info "Won't enqueue #{self.class.name} (Job ID: #{job_id}) because of already existing job with lock key '#{lock_key}'." + throw :abort + end +end diff --git a/app/jobs/scheduled_touch_job.rb b/app/jobs/scheduled_touch_job.rb index 16909195d..4038ecb71 100644 --- a/app/jobs/scheduled_touch_job.rb +++ b/app/jobs/scheduled_touch_job.rb @@ -1,9 +1,16 @@ class ScheduledTouchJob < ApplicationJob - def perform(klass_name, id) - klass_name.constantize.find_by(id: id)&.touch # rubocop:disable Rails/SkipsModelValidations + include HasActiveJobLock + + def lock_key + # "ScheduledTouchJob/User/42" + "#{self.class.name}/#{arguments[0]}/#{arguments[1]}" end def self.touch_at(object, date) set(wait_until: date).perform_later(object.class.to_s, object.id) end + + def perform(klass_name, id) + klass_name.constantize.find_by(id: id)&.touch # rubocop:disable Rails/SkipsModelValidations + end end diff --git a/app/jobs/search_index_job.rb b/app/jobs/search_index_job.rb index ea5e358e1..f6ee8cddb 100644 --- a/app/jobs/search_index_job.rb +++ b/app/jobs/search_index_job.rb @@ -1,9 +1,15 @@ class SearchIndexJob < ApplicationJob + include HasActiveJobLock retry_on StandardError, attempts: 20, wait: lambda { |executions| executions * 10.seconds } + def lock_key + # "SearchIndexJob/User/42" + "#{self.class.name}/#{arguments[0]}/#{arguments[1]}" + end + def perform(object, o_id) @object = object @o_id = o_id diff --git a/app/jobs/sla_ticket_rebuild_escalation_job.rb 
b/app/jobs/sla_ticket_rebuild_escalation_job.rb index 9070f5058..f91f99f87 100644 --- a/app/jobs/sla_ticket_rebuild_escalation_job.rb +++ b/app/jobs/sla_ticket_rebuild_escalation_job.rb @@ -1,4 +1,6 @@ class SlaTicketRebuildEscalationJob < ApplicationJob + include HasActiveJobLock + def perform Cache.delete('SLA::List::Active') Ticket::Escalation.rebuild_all diff --git a/app/jobs/ticket_online_notification_seen_job.rb b/app/jobs/ticket_online_notification_seen_job.rb index 48410207e..b4d8a8836 100644 --- a/app/jobs/ticket_online_notification_seen_job.rb +++ b/app/jobs/ticket_online_notification_seen_job.rb @@ -1,4 +1,11 @@ class TicketOnlineNotificationSeenJob < ApplicationJob + include HasActiveJobLock + + def lock_key + # "TicketOnlineNotificationSeenJob/23/42" + "#{self.class.name}/#{arguments[0]}/#{arguments[1]}" + end + def perform(ticket_id, user_id) user_id = user_id || 1 diff --git a/app/jobs/ticket_user_ticket_counter_job.rb b/app/jobs/ticket_user_ticket_counter_job.rb index b459ec3aa..f6957315e 100644 --- a/app/jobs/ticket_user_ticket_counter_job.rb +++ b/app/jobs/ticket_user_ticket_counter_job.rb @@ -1,4 +1,11 @@ class TicketUserTicketCounterJob < ApplicationJob + include HasActiveJobLock + + def lock_key + # "TicketUserTicketCounterJob/23/42" + "#{self.class.name}/#{arguments[0]}/#{arguments[1]}" + end + def perform(customer_id, updated_by_id) # check if update is needed diff --git a/app/models/active_job_lock.rb b/app/models/active_job_lock.rb new file mode 100644 index 000000000..3737f0820 --- /dev/null +++ b/app/models/active_job_lock.rb @@ -0,0 +1,21 @@ +class ActiveJobLock < ApplicationModel + + def of?(active_job) + active_job.job_id == active_job_id + end + + def peform_pending? + updated_at == created_at + end + + def transfer_to(active_job) + logger.info "Transferring ActiveJobLock with id '#{id}' from active_job_id '#{active_job_id}' to active_job_id '#{active_job.job_id}'." 
+ + reset_time_stamp = Time.zone.now + update!( + active_job_id: active_job.job_id, + created_at: reset_time_stamp, + updated_at: reset_time_stamp + ) + end +end diff --git a/db/migrate/20120101000001_create_base.rb b/db/migrate/20120101000001_create_base.rb index 573aa89de..38c09be6f 100644 --- a/db/migrate/20120101000001_create_base.rb +++ b/db/migrate/20120101000001_create_base.rb @@ -715,5 +715,14 @@ class CreateBase < ActiveRecord::Migration[4.2] add_index :http_logs, [:created_at] add_foreign_key :http_logs, :users, column: :created_by_id add_foreign_key :http_logs, :users, column: :updated_by_id + + create_table :active_job_locks do |t| + t.string :lock_key + t.string :active_job_id + + t.timestamps + end + add_index :active_job_locks, :lock_key, unique: true + add_index :active_job_locks, :active_job_id, unique: true end end diff --git a/db/migrate/20191001090809_create_active_job_locks.rb b/db/migrate/20191001090809_create_active_job_locks.rb new file mode 100644 index 000000000..a099d243e --- /dev/null +++ b/db/migrate/20191001090809_create_active_job_locks.rb @@ -0,0 +1,15 @@ +class CreateActiveJobLocks < ActiveRecord::Migration[5.2] + def change + # return if it's a new setup + return if !Setting.find_by(name: 'system_init_done') + + create_table :active_job_locks do |t| + t.string :lock_key + t.string :active_job_id + + t.timestamps + end + add_index :active_job_locks, :lock_key, unique: true + add_index :active_job_locks, :active_job_id, unique: true + end +end diff --git a/spec/factories/active_job_lock.rb b/spec/factories/active_job_lock.rb new file mode 100644 index 000000000..457f1cf24 --- /dev/null +++ b/spec/factories/active_job_lock.rb @@ -0,0 +1,6 @@ +FactoryBot.define do + factory :active_job_lock do + lock_key { 'UniqueActiveJob' } + active_job_id { SecureRandom.uuid } + end +end diff --git a/spec/jobs/concerns/has_active_job_lock_spec.rb b/spec/jobs/concerns/has_active_job_lock_spec.rb new file mode 100644 index 000000000..e312e0274 --- 
/dev/null +++ b/spec/jobs/concerns/has_active_job_lock_spec.rb @@ -0,0 +1,120 @@ +require 'rails_helper' + +RSpec.describe HasActiveJobLock, type: :job do + + before do + stub_const job_class_namespace, job_class + end + + let(:job_class_namespace) { 'UniqueActiveJob' } + + let(:job_class) do + Class.new(ApplicationJob) do + include HasActiveJobLock + + cattr_accessor :perform_counter, default: 0 + + def perform + self.class.perform_counter += 1 + end + end + end + + shared_examples 'handle locking of jobs' do + context 'performing job is present' do + + before { create(:active_job_lock, lock_key: job_class.name, created_at: 1.minute.ago, updated_at: 1.second.ago) } + + it 'allows enqueueing of perform_later jobs' do + expect { job_class.perform_later }.to have_enqueued_job(job_class).exactly(:once) + end + + it 'allows execution of perform_now jobs' do + expect { job_class.perform_now }.to change(job_class, :perform_counter).by(1) + end + end + + context 'enqueued job is present' do + + before { job_class.perform_later } + + it "won't enqueue perform_later jobs" do + expect { job_class.perform_later }.not_to have_enqueued_job(job_class) + end + + it 'allows execution of perform_now jobs' do + expect { job_class.perform_now }.to change(job_class, :perform_counter).by(1) + end + end + + context 'running perform_now job' do + + let(:job_class) do + Class.new(super()) do + + cattr_accessor :task_completed, default: false + + def perform(long_running: false) + + if long_running + sleep(0.1) until self.class.task_completed + end + + # don't pass parameters to super method + super() + end + end + end + + let!(:thread) { Thread.new { job_class.perform_now(long_running: true) } } + + after do + job_class.task_completed = true + thread.join + end + + it 'enqueues perform_later jobs' do + expect { job_class.perform_later }.to have_enqueued_job(job_class) + end + + it 'allows execution of perform_now jobs' do + expect { job_class.perform_now }.to change(job_class, 
:perform_counter).by(1) + end + end + + context 'dynamic lock key' do + + let(:job_class) do + Class.new(super()) do + + def lock_key + "#{super}/#{arguments[0]}/#{arguments[1]}" + end + end + end + + it 'queues one job per lock key' do + expect do + 2.times { job_class.perform_later('User', 23) } + job_class.perform_later('User', 42) + end.to have_enqueued_job(job_class).exactly(:twice) + end + end + end + + include_examples 'handle locking of jobs' + + context 'custom lock key' do + + let(:job_class) do + Class.new(super()) do + + def lock_key + 'custom_lock_key' + end + end + end + + include_examples 'handle locking of jobs' + end +end diff --git a/spec/requests/integration/monitoring_spec.rb b/spec/requests/integration/monitoring_spec.rb index dcaae378d..8612e5464 100644 --- a/spec/requests/integration/monitoring_spec.rb +++ b/spec/requests/integration/monitoring_spec.rb @@ -420,8 +420,8 @@ RSpec.describe 'Monitoring', type: :request do # health_check - scheduler job count travel 2.seconds - 8001.times do - SearchIndexJob.perform_later('Ticket', 1) + 8001.times do |fake_ticket_id| + SearchIndexJob.perform_later('Ticket', fake_ticket_id) end Scheduler.where(active: true).each do |local_scheduler| local_scheduler.last_run = Time.zone.now @@ -579,12 +579,8 @@ RSpec.describe 'Monitoring', type: :request do expect('test4').to eq(json_response['name']) expect('Test 4').to eq(json_response['display']) - jobs = Delayed::Job.all - 4.times do - jobs.each do |job| - Delayed::Worker.new.run(job) - end + Delayed::Worker.new.work_off end # health_check @@ -595,7 +591,7 @@ RSpec.describe 'Monitoring', type: :request do expect(json_response['message']).to be_truthy expect(json_response['issues']).to be_truthy expect(json_response['healthy']).to eq(false) - expect( json_response['message']).to eq("Failed to run background job #1 'SearchIndexJob' 4 time(s) with 4 attempt(s).") + expect( json_response['message']).to eq("Failed to run background job #1 'SearchIndexJob' 1 time(s) 
with 1 attempt(s).") # add another job manual_added = SearchIndexJob.perform_later('Ticket', 1) @@ -609,10 +605,10 @@ RSpec.describe 'Monitoring', type: :request do expect(json_response['message']).to be_truthy expect(json_response['issues']).to be_truthy expect(json_response['healthy']).to eq(false) - expect( json_response['message']).to eq("Failed to run background job #1 'SearchIndexJob' 5 time(s) with 14 attempt(s).") + expect( json_response['message']).to eq("Failed to run background job #1 'SearchIndexJob' 2 time(s) with 11 attempt(s).") # add another job - dummy_class = Class.new do + dummy_class = Class.new(ApplicationJob) do def perform puts 'work work' @@ -630,7 +626,7 @@ RSpec.describe 'Monitoring', type: :request do expect(json_response['message']).to be_truthy expect(json_response['issues']).to be_truthy expect(json_response['healthy']).to eq(false) - expect( json_response['message']).to eq("Failed to run background job #1 'Object' 1 time(s) with 5 attempt(s).;Failed to run background job #2 'SearchIndexJob' 5 time(s) with 14 attempt(s).") + expect( json_response['message']).to eq("Failed to run background job #1 'Object' 1 time(s) with 5 attempt(s).;Failed to run background job #2 'SearchIndexJob' 2 time(s) with 11 attempt(s).") # reset settings Setting.set('es_url', prev_es_config) @@ -649,7 +645,7 @@ RSpec.describe 'Monitoring', type: :request do expect(json_response['message']).to be_truthy expect(json_response['issues']).to be_truthy expect(json_response['healthy']).to eq(false) - expect(json_response['message']).to eq("16 failing background jobs;Failed to run background job #1 'Object' 5 time(s) with 25 attempt(s).;Failed to run background job #2 'SearchIndexJob' 5 time(s) with 14 attempt(s).") + expect(json_response['message']).to eq("13 failing background jobs;Failed to run background job #1 'Object' 8 time(s) with 40 attempt(s).;Failed to run background job #2 'SearchIndexJob' 2 time(s) with 11 attempt(s).") # cleanup Delayed::Job.delete_all 
diff --git a/spec/support/active_job.rb b/spec/support/active_job.rb index af2647cb8..41d2b64f6 100644 --- a/spec/support/active_job.rb +++ b/spec/support/active_job.rb @@ -9,6 +9,7 @@ module ZammadActiveJobHelper def clear_jobs enqueued_jobs.clear performed_jobs.clear + ActiveJobLock.destroy_all end end