Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add interruption hook #104

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions lib/job-iteration/iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ def on_complete(*filters, &blk)
set_callback(:complete, :after, *filters, &blk)
end

def stop_iterating_on(interrupt_proc)
interruptors << interrupt_proc
end

def interruptors
@interruptors ||= [
->(job) { JobIteration.max_job_runtime && job.start_time && (Time.now.utc - job.start_time) > JobIteration.max_job_runtime },
->(_) { JobIteration.interruption_adapter.call },
]
@interruptors
end

private

def ban_perform_definition
Expand Down Expand Up @@ -256,11 +268,8 @@ def output_interrupt_summary
end

def job_should_exit?
if ::JobIteration.max_job_runtime && start_time && (Time.now.utc - start_time) > ::JobIteration.max_job_runtime
return true
end

JobIteration.interruption_adapter.call || (defined?(super) && super)
self.class.interruptors.any? { |interrupt_proc| interrupt_proc.call(self) } ||
(defined?(super) && super)
end

def run_complete_callbacks?(completed)
Expand Down
27 changes: 27 additions & 0 deletions test/unit/iteration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,27 @@ def each_iteration(*)
end
end

class JobThatInterruptsItself < ActiveJob::Base
include JobIteration::Iteration

cattr_accessor :iterations_performed, instance_accessor: false
self.iterations_performed = []

stop_iterating_on(->(job) { job.class.iterations_performed.size >= 1 })

def build_enumerator(_params, cursor:)
enumerator_builder.build_array_enumerator([1, 2, 3], cursor: cursor)
end

def each_iteration(record, _params)
self.class.iterations_performed << record
end
end

setup do
JobThatInterruptsItself.iterations_performed = []
end

def test_jobs_that_define_build_enumerator_and_each_iteration_will_not_raise
push(JobWithRightMethods, "walrus" => "best")
work_one_job
Expand Down Expand Up @@ -242,6 +263,12 @@ def test_jobs_using_on_complete_have_accurate_total_time
end
end

def test_jobs_can_define_custom_interruption_triggers
JobThatInterruptsItself.perform_now({})

assert_equal([1], JobThatInterruptsItself.iterations_performed)
end

private

def assert_raises_cursor_error(&block)
Expand Down