Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Wait to reenqueue job post-interrupt until after callbacks are finished #66

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion lib/job-iteration/iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ module Iteration
define_callbacks :start
define_callbacks :shutdown
define_callbacks :complete

rescue_from(Exception) do
reenqueue_iteration_job if reenqueue_iteration_job?
raise
end

after_perform(prepend: true, if: :reenqueue_iteration_job?) { reenqueue_iteration_job }
end

module ClassMethods
Expand Down Expand Up @@ -127,8 +134,8 @@ def iterate_with_enumerator(enumerator, arguments)
end

next unless job_should_exit?
@reenqueue_iteration_job = true
self.executions -= 1 if executions > 1
reenqueue_iteration_job
return false
end

Expand All @@ -146,6 +153,10 @@ def record_unit_of_work
end
end

def reenqueue_iteration_job?
defined?(@reenqueue_iteration_job) && @reenqueue_iteration_job
end

def reenqueue_iteration_job
ActiveSupport::Notifications.instrument("interrupted.iteration", iteration_instrumentation_tags)
logger.info("[JobIteration::Iteration] Interrupting and re-enqueueing the job cursor_position=#{cursor_position}")
Expand Down
15 changes: 15 additions & 0 deletions test/integration/sidekiq_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,21 @@ class SidekiqIntegrationTest < ActiveSupport::TestCase
assert_equal 0, queue_size
end

test "allows callbacks to finish before reenqueuing job after interrupt" do
out, _ = capture_subprocess_io do
CallbacksJob.perform_later
start_sidekiq_and_wait
end

expected_callbacks_order = [["before_enqueue"], ["on_shutdown"], ["before_enqueue"]]
assert_equal expected_callbacks_order, out.scan(/callback: ([^\s]+)/)

TerminateJob.perform_later
start_sidekiq_and_wait

assert_equal 0, queue_size
end

private

def start_sidekiq_and_wait
Expand Down
16 changes: 16 additions & 0 deletions test/support/jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,19 @@ def perform
Process.kill("TERM", Process.pid)
end
end

class CallbacksJob < IterationJob
include JobIteration::Iteration

before_enqueue { puts "callback: before_enqueue" }
on_shutdown { puts "callback: on_shutdown" }

def build_enumerator(cursor:)
enumerator_builder.times(2, cursor: cursor)
end

def each_iteration(element)
Process.kill("TERM", Process.pid) if element == 0
sleep(1)
end
end