Feature: Added centralized ActiveJob locking based on lock key. This prevents unnecessary executing/queuing of unique jobs multiple times.

This commit is contained in:
Thorsten Eckel 2019-11-13 08:03:47 +01:00 committed by Martin Edenhofer
parent 8a5552c9f8
commit 5ca41c8389
15 changed files with 340 additions and 14 deletions

View file

@ -10,6 +10,10 @@ class ApplicationJob < ActiveJob::Base
# This is a workaround to sync ActiveJob#executions to Delayed::Job#attempts # This is a workaround to sync ActiveJob#executions to Delayed::Job#attempts
# until we resolve this dependency. # until we resolve this dependency.
after_enqueue do |job| after_enqueue do |job|
# skip update of `attempts` attribute if job wasn't queued because of ActiveJobLock
#(another job with same lock key got queued before this job could be retried)
next if job.provider_job_id.blank?
# update the column right away without loading Delayed::Job record # update the column right away without loading Delayed::Job record
# see: https://stackoverflow.com/a/34264580 # see: https://stackoverflow.com/a/34264580
Delayed::Job.where(id: job.provider_job_id).update_all(attempts: job.executions) # rubocop:disable Rails/SkipsModelValidations Delayed::Job.where(id: job.provider_job_id).update_all(attempts: job.executions) # rubocop:disable Rails/SkipsModelValidations

View file

@ -1,4 +1,11 @@
class ChecksKbClientNotificationJob < ApplicationJob class ChecksKbClientNotificationJob < ApplicationJob
include HasActiveJobLock
def lock_key
# "ChecksKbClientNotificationJob/KnowledgeBase::Answer/42/destroy"
"#{self.class.name}/#{arguments[0]}/#{arguments[1]}/#{arguments[2]}"
end
def perform(klass_name, id, event) def perform(klass_name, id, event)
object = klass_name.constantize.find_by(id: id) object = klass_name.constantize.find_by(id: id)
return if object.blank? return if object.blank?

View file

@ -0,0 +1,118 @@
module HasActiveJobLock
extend ActiveSupport::Concern
included do
before_enqueue do |job| # rubocop:disable Style/SymbolProc
job.ensure_active_job_lock_for_enqueue!
end
around_perform do |job, block|
job.mark_active_job_lock_as_started
block.call
ensure
job.release_active_job_lock!
end
end
# Defines the lock key for the current job to prevent execution of jobs with the same key.
# This is by default the name of the ActiveJob class.
# If you're in the situation where you need to have a lock_key based on
# the given arguments you can overwrite this method in your job and access
# them via `arguments`. See ActiveJob::Core for more (e.g. queue).
#
# @example
# # default
# job = UniqueActiveJob.new
# job.lock_key
# # => "UniqueActiveJob"
#
# @example
# # with lock_key: "#{self.class.name}/#{arguments[0]}/#{arguments[1]}"
# job = SearchIndexJob.new('User', 42)
# job.lock_key
# # => "SearchIndexJob/User/42"
#
# return [String]
def lock_key
self.class.name
end
def mark_active_job_lock_as_started
release_active_job_lock_cache
in_active_job_lock_transaction do
# a perform_now job doesn't require any locking
return if active_job_lock.blank?
return if !active_job_lock.of?(self)
# a perform_later job started to perform and will be marked as such
active_job_lock.touch # rubocop:disable Rails/SkipsModelValidations
end
end
def ensure_active_job_lock_for_enqueue!
release_active_job_lock_cache
in_active_job_lock_transaction do
return if active_job_lock_for_enqueue!.present?
ActiveJobLock.create!(
lock_key: lock_key,
active_job_id: job_id,
)
end
end
def release_active_job_lock!
# only delete lock if the current job is the one holding the lock
# perform_now jobs or perform_later jobs for which follow-up jobs were enqueued
# don't need to remove any locks
lock = ActiveJobLock.lock.find_by(lock_key: lock_key, active_job_id: job_id)
if !lock
logger.debug { "Found no ActiveJobLock for #{self.class.name} (Job ID: #{job_id}) with key '#{lock_key}'." }
return
end
logger.debug { "Deleting ActiveJobLock for #{self.class.name} (Job ID: #{job_id}) with key '#{lock_key}'." }
lock.destroy!
end
private
def in_active_job_lock_transaction
# re-use active DB transaction if present
return yield if ActiveRecord::Base.connection.open_transactions.nonzero?
# start own serializable DB transaction to prevent race conditions on DB level
ActiveJobLock.transaction(isolation: :serializable) do
yield
end
rescue ActiveRecord::RecordNotUnique
existing_active_job_lock!
end
def active_job_lock_for_enqueue!
return if active_job_lock.blank?
# don't enqueue perform_later jobs if a job with the same
# lock key exists that hasn't started to perform yet
existing_active_job_lock! if active_job_lock.peform_pending?
active_job_lock.tap { |lock| lock.transfer_to(self) }
end
def active_job_lock
@active_job_lock ||= ActiveJobLock.lock.find_by(lock_key: lock_key)
end
def release_active_job_lock_cache
@active_job_lock = nil
end
def existing_active_job_lock!
logger.info "Won't enqueue #{self.class.name} (Job ID: #{job_id}) because of already existing job with lock key '#{lock_key}'."
throw :abort
end
end

View file

@ -1,9 +1,16 @@
class ScheduledTouchJob < ApplicationJob class ScheduledTouchJob < ApplicationJob
def perform(klass_name, id) include HasActiveJobLock
klass_name.constantize.find_by(id: id)&.touch # rubocop:disable Rails/SkipsModelValidations
def lock_key
# "ScheduledTouchJob/User/42"
"#{self.class.name}/#{arguments[0]}/#{arguments[1]}"
end end
def self.touch_at(object, date) def self.touch_at(object, date)
set(wait_until: date).perform_later(object.class.to_s, object.id) set(wait_until: date).perform_later(object.class.to_s, object.id)
end end
def perform(klass_name, id)
klass_name.constantize.find_by(id: id)&.touch # rubocop:disable Rails/SkipsModelValidations
end
end end

View file

@ -1,9 +1,15 @@
class SearchIndexJob < ApplicationJob class SearchIndexJob < ApplicationJob
include HasActiveJobLock
retry_on StandardError, attempts: 20, wait: lambda { |executions| retry_on StandardError, attempts: 20, wait: lambda { |executions|
executions * 10.seconds executions * 10.seconds
} }
def lock_key
# "SearchIndexJob/User/42"
"#{self.class.name}/#{arguments[0]}/#{arguments[1]}"
end
def perform(object, o_id) def perform(object, o_id)
@object = object @object = object
@o_id = o_id @o_id = o_id

View file

@ -1,4 +1,6 @@
class SlaTicketRebuildEscalationJob < ApplicationJob class SlaTicketRebuildEscalationJob < ApplicationJob
include HasActiveJobLock
def perform def perform
Cache.delete('SLA::List::Active') Cache.delete('SLA::List::Active')
Ticket::Escalation.rebuild_all Ticket::Escalation.rebuild_all

View file

@ -1,4 +1,11 @@
class TicketOnlineNotificationSeenJob < ApplicationJob class TicketOnlineNotificationSeenJob < ApplicationJob
include HasActiveJobLock
def lock_key
# "TicketOnlineNotificationSeenJob/23/42"
"#{self.class.name}/#{arguments[0]}/#{arguments[1]}"
end
def perform(ticket_id, user_id) def perform(ticket_id, user_id)
user_id = user_id || 1 user_id = user_id || 1

View file

@ -1,4 +1,11 @@
class TicketUserTicketCounterJob < ApplicationJob class TicketUserTicketCounterJob < ApplicationJob
include HasActiveJobLock
def lock_key
# "TicketUserTicketCounterJob/23/42"
"#{self.class.name}/#{arguments[0]}/#{arguments[1]}"
end
def perform(customer_id, updated_by_id) def perform(customer_id, updated_by_id)
# check if update is needed # check if update is needed

View file

@ -0,0 +1,21 @@
class ActiveJobLock < ApplicationModel
def of?(active_job)
active_job.job_id == active_job_id
end
def peform_pending?
updated_at == created_at
end
def transfer_to(active_job)
logger.info "Transferring ActiveJobLock with id '#{id}' from active_job_id '#{active_job_id}' to active_job_id '#{active_job_id}'."
reset_time_stamp = Time.zone.now
update!(
active_job_id: active_job.job_id,
created_at: reset_time_stamp,
updated_at: reset_time_stamp
)
end
end

View file

@ -715,5 +715,14 @@ class CreateBase < ActiveRecord::Migration[4.2]
add_index :http_logs, [:created_at] add_index :http_logs, [:created_at]
add_foreign_key :http_logs, :users, column: :created_by_id add_foreign_key :http_logs, :users, column: :created_by_id
add_foreign_key :http_logs, :users, column: :updated_by_id add_foreign_key :http_logs, :users, column: :updated_by_id
create_table :active_job_locks do |t|
t.string :lock_key
t.string :active_job_id
t.timestamps
end
add_index :active_job_locks, :lock_key, unique: true
add_index :active_job_locks, :active_job_id, unique: true
end end
end end

View file

@ -0,0 +1,15 @@
class CreateActiveJobLocks < ActiveRecord::Migration[5.2]
def change
# return if it's a new setup
return if !Setting.find_by(name: 'system_init_done')
create_table :active_job_locks do |t|
t.string :lock_key
t.string :active_job_id
t.timestamps
end
add_index :active_job_locks, :lock_key, unique: true
add_index :active_job_locks, :active_job_id, unique: true
end
end

View file

@ -0,0 +1,6 @@
FactoryBot.define do
factory :active_job_lock do
lock_key { 'UniqueActiveJob' }
active_job_id { SecureRandom.uuid }
end
end

View file

@ -0,0 +1,120 @@
require 'rails_helper'
RSpec.describe HasActiveJobLock, type: :job do
before do
stub_const job_class_namespace, job_class
end
let(:job_class_namespace) { 'UniqueActiveJob' }
let(:job_class) do
Class.new(ApplicationJob) do
include HasActiveJobLock
cattr_accessor :perform_counter, default: 0
def perform
self.class.perform_counter += 1
end
end
end
shared_examples 'handle locking of jobs' do
context 'performing job is present' do
before { create(:active_job_lock, lock_key: job_class.name, created_at: 1.minute.ago, updated_at: 1.second.ago) }
it 'allows enqueueing of perform_later jobs' do
expect { job_class.perform_later }.to have_enqueued_job(job_class).exactly(:once)
end
it 'allows execution of perform_now jobs' do
expect { job_class.perform_now }.to change(job_class, :perform_counter).by(1)
end
end
context 'enqueued job is present' do
before { job_class.perform_later }
it "won't enqueue perform_later jobs" do
expect { job_class.perform_later }.not_to have_enqueued_job(job_class)
end
it 'allows execution of perform_now jobs' do
expect { job_class.perform_now }.to change(job_class, :perform_counter).by(1)
end
end
context 'running perform_now job' do
let(:job_class) do
Class.new(super()) do
cattr_accessor :task_completed, default: false
def perform(long_running: false)
if long_running
sleep(0.1) until self.class.task_completed
end
# don't pass parameters to super method
super()
end
end
end
let!(:thread) { Thread.new { job_class.perform_now(long_running: true) } }
after do
job_class.task_completed = true
thread.join
end
it 'enqueues perform_later jobs' do
expect { job_class.perform_later }.to have_enqueued_job(job_class)
end
it 'allows execution of perform_now jobs' do
expect { job_class.perform_now }.to change(job_class, :perform_counter).by(1)
end
end
context 'dynamic lock key' do
let(:job_class) do
Class.new(super()) do
def lock_key
"#{super}/#{arguments[0]}/#{arguments[1]}"
end
end
end
it 'queues one job per lock key' do
expect do
2.times { job_class.perform_later('User', 23) }
job_class.perform_later('User', 42)
end.to have_enqueued_job(job_class).exactly(:twice)
end
end
end
include_examples 'handle locking of jobs'
context 'custom lock key' do
let(:job_class) do
Class.new(super()) do
def lock_key
'custom_lock_key'
end
end
end
include_examples 'handle locking of jobs'
end
end

View file

@ -420,8 +420,8 @@ RSpec.describe 'Monitoring', type: :request do
# health_check - scheduler job count # health_check - scheduler job count
travel 2.seconds travel 2.seconds
8001.times do 8001.times do |fake_ticket_id|
SearchIndexJob.perform_later('Ticket', 1) SearchIndexJob.perform_later('Ticket', fake_ticket_id)
end end
Scheduler.where(active: true).each do |local_scheduler| Scheduler.where(active: true).each do |local_scheduler|
local_scheduler.last_run = Time.zone.now local_scheduler.last_run = Time.zone.now
@ -579,12 +579,8 @@ RSpec.describe 'Monitoring', type: :request do
expect('test4').to eq(json_response['name']) expect('test4').to eq(json_response['name'])
expect('Test 4').to eq(json_response['display']) expect('Test 4').to eq(json_response['display'])
jobs = Delayed::Job.all
4.times do 4.times do
jobs.each do |job| Delayed::Worker.new.work_off
Delayed::Worker.new.run(job)
end
end end
# health_check # health_check
@ -595,7 +591,7 @@ RSpec.describe 'Monitoring', type: :request do
expect(json_response['message']).to be_truthy expect(json_response['message']).to be_truthy
expect(json_response['issues']).to be_truthy expect(json_response['issues']).to be_truthy
expect(json_response['healthy']).to eq(false) expect(json_response['healthy']).to eq(false)
expect( json_response['message']).to eq("Failed to run background job #1 'SearchIndexJob' 4 time(s) with 4 attempt(s).") expect( json_response['message']).to eq("Failed to run background job #1 'SearchIndexJob' 1 time(s) with 1 attempt(s).")
# add another job # add another job
manual_added = SearchIndexJob.perform_later('Ticket', 1) manual_added = SearchIndexJob.perform_later('Ticket', 1)
@ -609,10 +605,10 @@ RSpec.describe 'Monitoring', type: :request do
expect(json_response['message']).to be_truthy expect(json_response['message']).to be_truthy
expect(json_response['issues']).to be_truthy expect(json_response['issues']).to be_truthy
expect(json_response['healthy']).to eq(false) expect(json_response['healthy']).to eq(false)
expect( json_response['message']).to eq("Failed to run background job #1 'SearchIndexJob' 5 time(s) with 14 attempt(s).") expect( json_response['message']).to eq("Failed to run background job #1 'SearchIndexJob' 2 time(s) with 11 attempt(s).")
# add another job # add another job
dummy_class = Class.new do dummy_class = Class.new(ApplicationJob) do
def perform def perform
puts 'work work' puts 'work work'
@ -630,7 +626,7 @@ RSpec.describe 'Monitoring', type: :request do
expect(json_response['message']).to be_truthy expect(json_response['message']).to be_truthy
expect(json_response['issues']).to be_truthy expect(json_response['issues']).to be_truthy
expect(json_response['healthy']).to eq(false) expect(json_response['healthy']).to eq(false)
expect( json_response['message']).to eq("Failed to run background job #1 'Object' 1 time(s) with 5 attempt(s).;Failed to run background job #2 'SearchIndexJob' 5 time(s) with 14 attempt(s).") expect( json_response['message']).to eq("Failed to run background job #1 'Object' 1 time(s) with 5 attempt(s).;Failed to run background job #2 'SearchIndexJob' 2 time(s) with 11 attempt(s).")
# reset settings # reset settings
Setting.set('es_url', prev_es_config) Setting.set('es_url', prev_es_config)
@ -649,7 +645,7 @@ RSpec.describe 'Monitoring', type: :request do
expect(json_response['message']).to be_truthy expect(json_response['message']).to be_truthy
expect(json_response['issues']).to be_truthy expect(json_response['issues']).to be_truthy
expect(json_response['healthy']).to eq(false) expect(json_response['healthy']).to eq(false)
expect(json_response['message']).to eq("16 failing background jobs;Failed to run background job #1 'Object' 5 time(s) with 25 attempt(s).;Failed to run background job #2 'SearchIndexJob' 5 time(s) with 14 attempt(s).") expect(json_response['message']).to eq("13 failing background jobs;Failed to run background job #1 'Object' 8 time(s) with 40 attempt(s).;Failed to run background job #2 'SearchIndexJob' 2 time(s) with 11 attempt(s).")
# cleanup # cleanup
Delayed::Job.delete_all Delayed::Job.delete_all

View file

@ -9,6 +9,7 @@ module ZammadActiveJobHelper
def clear_jobs def clear_jobs
enqueued_jobs.clear enqueued_jobs.clear
performed_jobs.clear performed_jobs.clear
ActiveJobLock.destroy_all
end end
end end