Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Gustavo Caso committed Dec 23, 2019
1 parent 136117f commit 55baa66
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 19 deletions.
33 changes: 20 additions & 13 deletions lib/job-iteration/iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ module Iteration
)

define_callbacks :start
define_callbacks :reenqueue
define_callbacks :shutdown
define_callbacks :complete
end
Expand All @@ -32,6 +33,10 @@ def on_shutdown(*filters, &blk)
set_callback(:shutdown, :after, *filters, &blk)
end

def on_reenqueue(*filters, &blk)
set_callback(:reenqueue, :before, *filters, &blk)
end

def on_complete(*filters, &blk)
set_callback(:complete, :after, *filters, &blk)
end
Expand Down Expand Up @@ -74,6 +79,18 @@ def retry_job(*)
@retried = true
end

def reenqueue_iteration_job(options = {})
self.executions -= 1 if executions > 1
ActiveSupport::Notifications.instrument("interrupted.iteration", iteration_instrumentation_tags)
logger.info("[JobIteration::Iteration] Interrupting and re-enqueueing the job cursor_position=#{cursor_position}")

adjust_total_time
self.times_interrupted += 1

self.already_in_queue = true if respond_to?(:already_in_queue=)
retry_job(options)
end

private

def enumerator_builder
Expand Down Expand Up @@ -123,8 +140,9 @@ def iterate_with_enumerator(enumerator, arguments)
end

next unless job_should_exit?
self.executions -= 1 if executions > 1
reenqueue_iteration_job
run_callbacks(:reenqueue) do
reenqueue_iteration_job
end
return false
end

Expand All @@ -137,17 +155,6 @@ def record_unit_of_work
end
end

def reenqueue_iteration_job
ActiveSupport::Notifications.instrument("interrupted.iteration", iteration_instrumentation_tags)
logger.info("[JobIteration::Iteration] Interrupting and re-enqueueing the job cursor_position=#{cursor_position}")

adjust_total_time
self.times_interrupted += 1

self.already_in_queue = true if respond_to?(:already_in_queue=)
retry_job
end

def adjust_total_time
self.total_time += (Time.now.utc.to_f - start_time.to_f).round(6)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/job-iteration/throttle_enumerator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def to_enum
@enum.each do |*val|
if should_throttle?
ActiveSupport::Notifications.instrument("throttled.iteration", job_class: @job.class.name)
@job.retry_job(wait: @backoff)
@job.reenqueue_iteration_job(wait: @backoff)
throw(:abort, :skip_complete_callbacks)
end

Expand Down
33 changes: 33 additions & 0 deletions test/unit/active_job_iteration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ class SimpleIterationJob < ActiveJob::Base
self.on_complete_called = 0
cattr_accessor :on_shutdown_called, instance_accessor: false
self.on_shutdown_called = 0
cattr_accessor :on_reenqueue_called, instance_accessor: false
self.on_reenqueue_called = 0

on_start do
self.class.on_start_called += 1
Expand All @@ -26,6 +28,10 @@ class SimpleIterationJob < ActiveJob::Base
on_shutdown do
self.class.on_shutdown_called += 1
end

on_reenqueue do
self.class.on_reenqueue_called += 1
end
end

class MultiArgumentIterationJob < SimpleIterationJob
Expand Down Expand Up @@ -61,6 +67,12 @@ def each_iteration(record)
end
end

class ActiveRecordIterationJobHaltReenqueue < ActiveRecordIterationJob
on_reenqueue do |job|
throw(:abort) if job.times_interrupted > 0
end
end

class BatchActiveRecordIterationJob < SimpleIterationJob
def build_enumerator(cursor:)
enumerator_builder.active_record_on_batches(
Expand Down Expand Up @@ -297,6 +309,7 @@ def setup
klass.on_start_called = 0
klass.on_complete_called = 0
klass.on_shutdown_called = 0
klass.on_reenqueue_called = 0
end
JobShouldExitJob.records_performed = []
super
Expand Down Expand Up @@ -329,6 +342,7 @@ def test_works_with_private_methods
assert_equal(1, PrivateIterationJob.on_start_called)
assert_equal(1, PrivateIterationJob.on_complete_called)
assert_equal(1, PrivateIterationJob.on_shutdown_called)
assert_equal(0, PrivateIterationJob.on_reenqueue_called)
end

def test_failing_job
Expand Down Expand Up @@ -379,6 +393,7 @@ def test_active_record_job

assert_equal(0, ActiveRecordIterationJob.on_complete_called)
work_one_job
assert_equal(1, ActiveRecordIterationJob.on_reenqueue_called)

assert_equal(2, ActiveRecordIterationJob.records_performed.size)

Expand All @@ -389,6 +404,7 @@ def test_active_record_job

work_one_job
assert_equal(4, ActiveRecordIterationJob.records_performed.size)
assert_equal(2, ActiveRecordIterationJob.on_reenqueue_called)

job = peek_into_queue
assert_equal(2, job.times_interrupted)
Expand All @@ -401,6 +417,23 @@ def test_active_record_job
assert_equal(2, ActiveRecordIterationJob.on_shutdown_called)
end

def test_active_record_job_halt_reenqueue
iterate_exact_times(3.times)

push(ActiveRecordIterationJobHaltReenqueue)
assert_jobs_in_queue(1)

work_one_job
assert_equal(1, ActiveRecordIterationJob.on_reenqueue_called)
assert_equal(3, ActiveRecordIterationJob.records_performed.size)
assert_jobs_in_queue(1)

work_one_job
assert_equal(2, ActiveRecordIterationJob.on_reenqueue_called)
assert_equal(6, ActiveRecordIterationJob.records_performed.size)
assert_jobs_in_queue(0) # By throwing abort on the job we avoided the job to be reenqueue, so there should be no jobs in the queue
end

def test_activerecord_batches_complete
push(BatchActiveRecordIterationJob)
processed_records = Product.order(:id).pluck(:id)
Expand Down
32 changes: 27 additions & 5 deletions test/unit/throttle_enumerator_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ class IterationThrottleJob < ActiveJob::Base

cattr_accessor :on_complete_called, instance_accessor: false
self.on_complete_called = 0
cattr_accessor :on_reenqueue_called, instance_accessor: false
self.on_reenqueue_called = 0

cattr_accessor :should_throttle_sequence, instance_accessor: false
self.should_throttle_sequence = []
Expand All @@ -20,6 +22,10 @@ class IterationThrottleJob < ActiveJob::Base
self.class.on_complete_called += 1
end

on_reenqueue do
self.class.on_reenqueue_called += 1
end

def build_enumerator(_params, cursor:)
enumerator_builder.build_throttle_enumerator(
enumerator_builder.build_array_enumerator(
Expand All @@ -36,13 +42,18 @@ def each_iteration(record, _params)
end
end

setup do
IterationThrottleJob.iterations_performed = []
class IterationThrottleJobHaltReenqueue < IterationThrottleJob
on_reenqueue do |_job|
throw(:abort)
end
end

teardown do
IterationThrottleJob.on_complete_called = 0
IterationThrottleJob.should_throttle_sequence = []
setup do
IterationThrottleJob.descendants.each do |klass|
klass.iterations_performed = []
klass.on_complete_called = 0
klass.on_reenqueue_called = 0
end
end

test "throttle enumerator proxies wrapped enumerator" do
Expand Down Expand Up @@ -92,6 +103,17 @@ def each_iteration(record, _params)
assert_equal [1], IterationThrottleJob.iterations_performed
end

test "do not push back to queue if reenqueue callback abort" do
IterationThrottleJobHaltReenqueue.should_throttle_sequence = [false, true, false]

IterationThrottleJobHaltReenqueue.perform_now({})

enqueued = ActiveJob::Base.queue_adapter.enqueued_jobs
assert_equal 0, enqueued.size

assert_equal [1], IterationThrottleJobHaltReenqueue.iterations_performed
end

test "does not pushed back to queue if not throttle" do
assert_predicate ActiveJob::Base.queue_adapter.enqueued_jobs, :empty?

Expand Down

0 comments on commit 55baa66

Please sign in to comment.