Skip to content

Commit

Permalink
Serialize cursor_position
Browse files Browse the repository at this point in the history
Previously, `cursor_position` was handed as-is to the queue adapter. This could
lead to the queue adapter corrupting cursors of certain classes. For example,
if given a `Time` cursor, Sidekiq would save it as JSON by calling `to_s`,
resulting in the deserialized cursor being a `String` instead of a `Time`.

To prevent this, we now leverage `ActiveJob::Arguments` to (de)serialize the
`cursor_position` and ensure it will make the round trip safely.

However, as this is a breaking change (as unsafe cursors would previously be
accepted, but possibly corrupted, whereas they would now be rejected), we begin
by rescuing (de)serialization failures and emitting a deprecation warning.

Starting in Job Iteration version 2.0, the deprecation warning will be removed,
and (de)serialization failure will raise.

Application owners can opt-in to the 2.0 behavior either globally by setting

    JobIteration.enforce_serializable_cursors = true

or on an inheritable per-class basis by setting

    class MyJob < ActiveJob::Base
      include JobIteration::Iteration
      self.job_iteration_enforce_serializable_cursors = true
      # ...
    end
  • Loading branch information
sambostock committed Nov 20, 2023
1 parent f6bd25c commit f39525d
Show file tree
Hide file tree
Showing 8 changed files with 391 additions and 158 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

- [437](https://github.com/Shopify/job-iteration/pull/437) - Use minimum between per-class `job_iteration_max_job_runtime` and `JobIteration.max_job_runtime`, instead of enforcing only setting decreasing values.
Because it is possible to change the global or parent values after setting the value on a class, it is not possible to truly enforce the decreasing value constraint. Instead, we now use the minimum between the global value and per-class value. This is considered a non-breaking change, as it should not break any **existing** code, it only removes the constraint on new classes.
- [81](https://github.com/Shopify/job-iteration/pull/81) - Serialize cursors using `ActiveJob::Arguments` & deprecated unserializable cursors.
Cursor serialization has been dependent on the adapter's serialization method, which typically uses `JSON.dump` and `JSON.load`, meaning only JSON-serializable objects could be used as cursors. Using `ActiveJob::Arguments` to serialize cursors instead allows the use of any object that can be serialized using `ActiveJob::Arguments.serialize` and deserialized using `ActiveJob::Arguments.deserialize`, such as `Time` objects, which would previously be lossily serialized as strings.
This change is backwards compatible, by using a new job argument for the serialized cursor, but continuing to write to the old argument, ensuring that jobs can be processed regardless of if they are enqueued or dequeued with the old or new version of the gem.
In the event that a cursor is not serializable, the gem will fall back to the deprecated old behaviour. In Job Iteration 2.0, this fallback will be removed, and cursors will be required to be serializable, raising otherwise. To opt-in to this behaviour, set `JobIteration.enforce_serializable_cursors = true`. To support gradual migration, a per-class `job_iteration_enforce_serializable_cursors` option is also available, which overrides the global option for that class.

### Bug fixes

Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ class MyJob < ApplicationJob
end
```

See [the guide on Custom Enumerator](guides/custom-enumerator.md) for details.

## Credits

This project would not be possible without these individuals (in alphabetical order):
Expand Down
57 changes: 57 additions & 0 deletions guides/custom-enumerator.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,68 @@ and you initiate the job with
LoadRefundsForChargeJob.perform_later(charge_id = "chrg_345")
```

### Cursor Serialization

Cursors should be of a [type that Active Job can serialize](https://guides.rubyonrails.org/active_job_basics.html#supported-types-for-arguments).

For example, consider:

```ruby
FancyCursor = Struct.new(:wrapped_value) do
def to_s
wrapped_value
end
end
```

```ruby
def build_enumerator(cursor:)
Enumerator.new do |yielder|
# ...something with fancy cursor...
yielder.yield 123, FancyCursor.new(:abc)
end
end
```

If this job was interrupted, Active Job would be unable to serialize
`FancyCursor`, and Job Iteration would fallback to the legacy behavior of not
serializing the cursor. This would typically result in the queue adapter
eventually serializing the cursor as JSON by calling `.to_s` on it. The cursor
would be deserialized as `:abc`, rather than the intended `FancyCursor.new(:abc)`.

To avoid this, job authors should take care to ensure that their cursor is
serializable by Active Job. This can be done in a couple ways, such as:
- [adding `GlobalID` support to the cursor class](https://guides.rubyonrails.org/active_job_basics.html#globalid)
- [implementing a custom Active Job argument serializer for the cursor class](https://guides.rubyonrails.org/active_job_basics.html#serializers)
- handling (de)serialization in the job/enumerator itself
```ruby
def build_enumerator(cursor:)
fancy_cursor = FancyCursor.new(cursor) unless cursor.nil?
Enumerator.new do |yielder|
# ...something with fancy cursor...
yielder.yield 123, FancyCursor(:abc).wrapped_value
end
end
```
Note that starting in 2.0, Job Iteration will stop supporting fallback behavior
and raise when it encounters an unserializable cursor. To opt-in to this behavior early, set
```ruby
JobIteration.enforce_serializable_cursors = true
```
or, to support gradual migration, a per-class option is also available to override the global value, if set:
```ruby
class MyJob < ActiveJob::Base
include JobIteration::Iteration
self.job_iteration_enforce_serializable_cursors = true
```


## Cursorless enumerator

Sometimes you can ignore the cursor. Consider the following custom `Enumerator` that takes items from a Redis list, which
is essentially a queue. Even if this job doesn't need to persist a cursor in order to resume, it can still use
`Iteration`'s signal handling to finish `each_iteration` and gracefully terminate.
`Iteration`'s signal handling to finish `each_iteration` and gracefully terminate.
```ruby
class RedisPopListJob < ActiveJob::Base
Expand Down
20 changes: 20 additions & 0 deletions lib/job-iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ module JobIteration

INTEGRATIONS = [:resque, :sidekiq]

Deprecation = ActiveSupport::Deprecation.new("2.0", "JobIteration")

extend self

attr_writer :logger
Expand Down Expand Up @@ -57,6 +59,24 @@ def logger
# where the throttle backoff value will take precedence over this setting.
attr_accessor :default_retry_backoff

# Set this to `true` to enforce that cursors be composed of objects capable
# of built-in (de)serialization by Active Job.
#
# JobIteration.enforce_serializable_cursors = true
#
# For more granular control, this can also be configured per job class, and
# is inherited by child classes.
#
# class MyJob < ActiveJob::Base
# include JobIteration::Iteration
# self.job_iteration_enforce_serializable_cursors = false
# # ...
# end
#
# Note that non-enforcement is deprecated and enforcement will be mandatory
# in version 2.0, at which point this config will go away.
attr_accessor :enforce_serializable_cursors

# Used internally for hooking into job processing frameworks like Sidekiq and Resque.
attr_accessor :interruption_adapter

Expand Down
109 changes: 57 additions & 52 deletions lib/job-iteration/iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,6 @@ module Iteration
# The time isn't reset if the job is interrupted.
attr_accessor :total_time

class CursorError < ArgumentError
attr_reader :cursor

def initialize(message, cursor:)
super(message)
@cursor = cursor
end

def message
"#{super} (#{inspected_cursor})"
end

private

def inspected_cursor
cursor.inspect
rescue NoMethodError
# For those brave enough to try to use BasicObject as cursor. Nice try.
Object.instance_method(:inspect).bind(cursor).call
end
end

included do |_base|
define_callbacks :start
define_callbacks :shutdown
Expand All @@ -50,6 +28,12 @@ def inspected_cursor
instance_accessor: false,
instance_predicate: false,
)

class_attribute(
:job_iteration_enforce_serializable_cursors,
instance_accessor: false,
instance_predicate: false,
)
end

module ClassMethods
Expand Down Expand Up @@ -88,16 +72,25 @@ def initialize(*arguments)
ruby2_keywords(:initialize) if respond_to?(:ruby2_keywords, true)

def serialize # @private
super.merge(
"cursor_position" => cursor_position,
iteration_job_data = {
"cursor_position" => cursor_position, # Backwards compatibility
"times_interrupted" => times_interrupted,
"total_time" => total_time,
)
}

begin
iteration_job_data["serialized_cursor_position"] = serialize_cursor(cursor_position)
rescue ActiveJob::SerializationError
raise if job_iteration_enforce_serializable_cursors?
# No point in duplicating the deprecation warning from assert_valid_cursor!
end

super.merge(iteration_job_data)
end

def deserialize(job_data) # @private
super
self.cursor_position = job_data["cursor_position"]
self.cursor_position = cursor_position_from_job_data(job_data)
self.times_interrupted = Integer(job_data["times_interrupted"] || 0)
self.total_time = Float(job_data["total_time"] || 0.0)
end
Expand Down Expand Up @@ -167,8 +160,7 @@ def iterate_with_enumerator(enumerator, arguments)
@needs_reenqueue = false

enumerator.each do |object_from_enumerator, cursor_from_enumerator|
# Deferred until 2.0.0
# assert_valid_cursor!(cursor_from_enumerator)
assert_valid_cursor!(cursor_from_enumerator)

tags = instrumentation_tags.merge(cursor_position: cursor_from_enumerator)
ActiveSupport::Notifications.instrument("each_iteration.iteration", tags) do
Expand Down Expand Up @@ -222,16 +214,19 @@ def build_enumerator(params, cursor:)
EOS
end

# The adapter must be able to serialize and deserialize the cursor back into an equivalent object.
# https://github.com/mperham/sidekiq/wiki/Best-Practices#1-make-your-job-parameters-small-and-simple
def assert_valid_cursor!(cursor)
return if serializable?(cursor)
serialize_cursor(cursor)
true
rescue ActiveJob::SerializationError
raise if job_iteration_enforce_serializable_cursors?

raise CursorError.new(
"Cursor must be composed of objects capable of built-in (de)serialization: " \
"Strings, Integers, Floats, Arrays, Hashes, true, false, or nil.",
cursor: cursor,
)
Deprecation.warn(<<~DEPRECATION_MESSAGE, caller_locations(3))
The Enumerator returned by #{self.class.name}#build_enumerator yielded a cursor which is unsafe to serialize.
See https://github.com/Shopify/job-iteration/blob/main/guides/custom-enumerator.md#cursor-types
This will raise starting in version #{Deprecation.deprecation_horizon} of #{Deprecation.gem_name}!"
DEPRECATION_MESSAGE

false
end

def assert_implements_methods!
Expand Down Expand Up @@ -286,6 +281,13 @@ def job_iteration_max_job_runtime
[global_max, class_max].min
end

def job_iteration_enforce_serializable_cursors? # TODO: Add a test for the edge case of registering it afterwards
per_class_setting = self.class.job_iteration_enforce_serializable_cursors
return per_class_setting unless per_class_setting.nil?

!!JobIteration.enforce_serializable_cursors
end

def handle_completed(completed)
case completed
when nil # someone aborted the job but wants to call the on_complete callback
Expand All @@ -305,6 +307,25 @@ def handle_completed(completed)
raise "Unexpected thrown value: #{completed.inspect}"
end

def cursor_position_from_job_data(job_data)
if job_data.key?("serialized_cursor_position")
deserialize_cursor(job_data.fetch("serialized_cursor_position"))
else
# Backwards compatibility for
# - jobs interrupted before cursor serialization feature shipped, or
# - jobs whose cursor could not be serialized
job_data.fetch("cursor_position", nil)
end
end

def serialize_cursor(cursor)
ActiveJob::Arguments.serialize([cursor]).first
end

def deserialize_cursor(cursor)
ActiveJob::Arguments.deserialize([cursor]).first
end

def valid_cursor_parameter?(parameters)
# this condition is when people use the splat operator.
# def build_enumerator(*)
Expand All @@ -316,21 +337,5 @@ def valid_cursor_parameter?(parameters)
end
false
end

SIMPLE_SERIALIZABLE_CLASSES = [String, Integer, Float, NilClass, TrueClass, FalseClass].freeze
private_constant :SIMPLE_SERIALIZABLE_CLASSES
def serializable?(object)
# Subclasses must be excluded, hence not using is_a? or ===.
if object.instance_of?(Array)
object.all? { |element| serializable?(element) }
elsif object.instance_of?(Hash)
object.all? { |key, value| serializable?(key) && serializable?(value) }
else
SIMPLE_SERIALIZABLE_CLASSES.any? { |klass| object.instance_of?(klass) }
end
rescue NoMethodError
# BasicObject doesn't respond to instance_of, but we can't serialize it anyway
false
end
end
end
9 changes: 2 additions & 7 deletions test/integration/integration_behaviour.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,12 @@ module IntegrationBehaviour

test "unserializable corruption is prevented" do
skip_until_version("2.0.0")
# Cursors are serialized as JSON, but not all objects are serializable.
# time = Time.at(0).utc # => 1970-01-01 00:00:00 UTC
# json = JSON.dump(time) # => "\"1970-01-01 00:00:00 UTC\""
# string = JSON.parse(json) # => "1970-01-01 00:00:00 UTC"
# We serialized a Time, but it was deserialized as a String.
TimeCursorJob.perform_later
UnserializableCursorJob.perform_later
TerminateJob.perform_later
start_worker_and_wait

assert_equal(
JobIteration::Iteration::CursorError.name,
ActiveJob::SerializationError.name,
failed_job_error_class_name,
)
end
Expand Down
5 changes: 3 additions & 2 deletions test/support/jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ def each_iteration(omg)
end
end

class TimeCursorJob < ActiveJob::Base
class UnserializableCursorJob < ActiveJob::Base
include JobIteration::Iteration
UnserializableCursor = Class.new

def build_enumerator(cursor:)
return [["item", Time.now]].to_enum if cursor.nil?
return [["item", UnserializableCursor.new]].to_enum if cursor.nil?

raise "This should never run; cursor is unserializable!"
end
Expand Down
Loading

0 comments on commit f39525d

Please sign in to comment.