-
Notifications
You must be signed in to change notification settings - Fork 46
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Serialize Cursors #81
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -18,28 +18,6 @@ module Iteration | |||||
# The time isn't reset if the job is interrupted. | ||||||
attr_accessor :total_time | ||||||
|
||||||
class CursorError < ArgumentError | ||||||
sambostock marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
attr_reader :cursor | ||||||
|
||||||
def initialize(message, cursor:) | ||||||
super(message) | ||||||
@cursor = cursor | ||||||
end | ||||||
|
||||||
def message | ||||||
"#{super} (#{inspected_cursor})" | ||||||
end | ||||||
|
||||||
private | ||||||
|
||||||
def inspected_cursor | ||||||
cursor.inspect | ||||||
rescue NoMethodError | ||||||
# For those brave enough to try to use BasicObject as cursor. Nice try. | ||||||
Object.instance_method(:inspect).bind(cursor).call | ||||||
end | ||||||
end | ||||||
|
||||||
included do |_base| | ||||||
define_callbacks :start | ||||||
define_callbacks :shutdown | ||||||
|
@@ -50,6 +28,12 @@ def inspected_cursor | |||||
instance_accessor: false, | ||||||
instance_predicate: false, | ||||||
) | ||||||
|
||||||
class_attribute( | ||||||
:job_iteration_enforce_serializable_cursors, | ||||||
instance_accessor: false, | ||||||
instance_predicate: false, | ||||||
) | ||||||
end | ||||||
|
||||||
module ClassMethods | ||||||
|
@@ -88,16 +72,25 @@ def initialize(*arguments) | |||||
ruby2_keywords(:initialize) if respond_to?(:ruby2_keywords, true) | ||||||
|
||||||
def serialize # @private | ||||||
super.merge( | ||||||
"cursor_position" => cursor_position, | ||||||
iteration_job_data = { | ||||||
"cursor_position" => cursor_position, # Backwards compatibility | ||||||
"times_interrupted" => times_interrupted, | ||||||
"total_time" => total_time, | ||||||
) | ||||||
} | ||||||
|
||||||
begin | ||||||
iteration_job_data["serialized_cursor_position"] = serialize_cursor(cursor_position) | ||||||
rescue ActiveJob::SerializationError | ||||||
raise if job_iteration_enforce_serializable_cursors? | ||||||
# No point in duplicating the deprecation warning from assert_valid_cursor! | ||||||
end | ||||||
|
||||||
super.merge(iteration_job_data) | ||||||
end | ||||||
|
||||||
def deserialize(job_data) # @private | ||||||
super | ||||||
self.cursor_position = job_data["cursor_position"] | ||||||
self.cursor_position = cursor_position_from_job_data(job_data) | ||||||
self.times_interrupted = Integer(job_data["times_interrupted"] || 0) | ||||||
self.total_time = Float(job_data["total_time"] || 0.0) | ||||||
end | ||||||
|
@@ -167,8 +160,7 @@ def iterate_with_enumerator(enumerator, arguments) | |||||
@needs_reenqueue = false | ||||||
|
||||||
enumerator.each do |object_from_enumerator, cursor_from_enumerator| | ||||||
# Deferred until 2.0.0 | ||||||
# assert_valid_cursor!(cursor_from_enumerator) | ||||||
assert_valid_cursor!(cursor_from_enumerator) | ||||||
|
||||||
tags = instrumentation_tags.merge(cursor_position: cursor_from_enumerator) | ||||||
ActiveSupport::Notifications.instrument("each_iteration.iteration", tags) do | ||||||
|
@@ -222,16 +214,16 @@ def build_enumerator(params, cursor:) | |||||
EOS | ||||||
end | ||||||
|
||||||
# The adapter must be able to serialize and deserialize the cursor back into an equivalent object. | ||||||
# https://github.com/mperham/sidekiq/wiki/Best-Practices#1-make-your-job-parameters-small-and-simple | ||||||
def assert_valid_cursor!(cursor) | ||||||
return if serializable?(cursor) | ||||||
|
||||||
raise CursorError.new( | ||||||
"Cursor must be composed of objects capable of built-in (de)serialization: " \ | ||||||
"Strings, Integers, Floats, Arrays, Hashes, true, false, or nil.", | ||||||
cursor: cursor, | ||||||
) | ||||||
serialize_cursor(cursor) | ||||||
rescue ActiveJob::SerializationError | ||||||
raise if job_iteration_enforce_serializable_cursors? | ||||||
|
||||||
Deprecation.warn(<<~DEPRECATION_MESSAGE, caller_locations(3)) | ||||||
The Enumerator returned by #{self.class.name}#build_enumerator yielded a cursor which is unsafe to serialize. | ||||||
See https://github.com/Shopify/job-iteration/blob/main/guides/custom-enumerator.md#cursor-types | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This needs to match the heading we added.
Suggested change
|
||||||
This will raise starting in version #{Deprecation.deprecation_horizon} of #{Deprecation.gem_name}!" | ||||||
DEPRECATION_MESSAGE | ||||||
end | ||||||
|
||||||
def assert_implements_methods! | ||||||
|
@@ -286,6 +278,13 @@ def job_iteration_max_job_runtime | |||||
[global_max, class_max].min | ||||||
end | ||||||
|
||||||
def job_iteration_enforce_serializable_cursors? # TODO: Add a test for the edge case of registering it afterwards | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did we add this test? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch – yes, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Then we should remove the TODO 🙂 |
||||||
per_class_setting = self.class.job_iteration_enforce_serializable_cursors | ||||||
return per_class_setting unless per_class_setting.nil? | ||||||
|
||||||
!!JobIteration.enforce_serializable_cursors | ||||||
end | ||||||
|
||||||
def handle_completed(completed) | ||||||
case completed | ||||||
when nil # someone aborted the job but wants to call the on_complete callback | ||||||
|
@@ -305,6 +304,25 @@ def handle_completed(completed) | |||||
raise "Unexpected thrown value: #{completed.inspect}" | ||||||
end | ||||||
|
||||||
def cursor_position_from_job_data(job_data) | ||||||
if job_data.key?("serialized_cursor_position") | ||||||
deserialize_cursor(job_data.fetch("serialized_cursor_position")) | ||||||
Comment on lines
+308
to
+309
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've updated this so if a serialized cursor is found, it should always be deserializable. |
||||||
else | ||||||
# Backwards compatibility for | ||||||
# - jobs interrupted before cursor serialization feature shipped, or | ||||||
# - jobs whose cursor could not be serialized | ||||||
job_data.fetch("cursor_position", nil) | ||||||
end | ||||||
end | ||||||
|
||||||
def serialize_cursor(cursor) | ||||||
ActiveJob::Arguments.serialize([cursor]).first | ||||||
end | ||||||
|
||||||
def deserialize_cursor(cursor) | ||||||
ActiveJob::Arguments.deserialize([cursor]).first | ||||||
end | ||||||
|
||||||
def valid_cursor_parameter?(parameters) | ||||||
# this condition is when people use the splat operator. | ||||||
# def build_enumerator(*) | ||||||
|
@@ -316,21 +334,5 @@ def valid_cursor_parameter?(parameters) | |||||
end | ||||||
false | ||||||
end | ||||||
|
||||||
SIMPLE_SERIALIZABLE_CLASSES = [String, Integer, Float, NilClass, TrueClass, FalseClass].freeze | ||||||
private_constant :SIMPLE_SERIALIZABLE_CLASSES | ||||||
def serializable?(object) | ||||||
# Subclasses must be excluded, hence not using is_a? or ===. | ||||||
if object.instance_of?(Array) | ||||||
object.all? { |element| serializable?(element) } | ||||||
elsif object.instance_of?(Hash) | ||||||
object.all? { |key, value| serializable?(key) && serializable?(value) } | ||||||
else | ||||||
SIMPLE_SERIALIZABLE_CLASSES.any? { |klass| object.instance_of?(klass) } | ||||||
end | ||||||
rescue NoMethodError | ||||||
# BasicObject doesn't respond to instance_of, but we can't serialize it anyway | ||||||
false | ||||||
end | ||||||
end | ||||||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
# frozen_string_literal: true | ||
|
||
return unless defined?(Rails::Railtie) | ||
|
||
module JobIteration | ||
class Railtie < Rails::Railtie | ||
initializer "job_iteration.register_deprecator" do |app| | ||
app.deprecators[:job_iteration] = JobIteration::Deprecation | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,11 +15,12 @@ def each_iteration(omg) | |
end | ||
end | ||
|
||
class TimeCursorJob < ActiveJob::Base | ||
class UnserializableCursorJob < ActiveJob::Base | ||
include JobIteration::Iteration | ||
UnserializableCursor = Class.new | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we're getting all of |
||
|
||
def build_enumerator(cursor:) | ||
return [["item", Time.now]].to_enum if cursor.nil? | ||
return [["item", UnserializableCursor.new]].to_enum if cursor.nil? | ||
|
||
raise "This should never run; cursor is unserializable!" | ||
end | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I originally used
ActiveSupport::Deprecation.warn
directly, that labelled deprecations as coming from Rails. This uses our own object, but does mean that the host app's settings forActiveSupport::Deprecation
do not propagate (e.g.disallowed_deprecations
).