Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serialize Cursors #81

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

- [437](https://github.com/Shopify/job-iteration/pull/437) - Use minimum between per-class `job_iteration_max_job_runtime` and `JobIteration.max_job_runtime`, instead of enforcing only setting decreasing values.
Because it is possible to change the global or parent values after setting the value on a class, it is not possible to truly enforce the decreasing value constraint. Instead, we now use the minimum between the global value and per-class value. This is considered a non-breaking change, as it should not break any **existing** code, it only removes the constraint on new classes.
- [81](https://github.com/Shopify/job-iteration/pull/81) - Serialize cursors using `ActiveJob::Arguments` & deprecated unserializable cursors.
Cursor serialization has been dependent on the adapter's serialization method, which typically uses `JSON.dump` and `JSON.load`, meaning only JSON-serializable objects could be used as cursors. Using `ActiveJob::Arguments` to serialize cursors instead allows the use of any object that can be serialized using `ActiveJob::Arguments.serialize` and deserialized using `ActiveJob::Arguments.deserialize`, such as `Time` objects, which would previously be lossily serialized as strings.
This change is backwards compatible, by using a new job argument for the serialized cursor, but continuing to write to the old argument, ensuring that jobs can be processed regardless of if they are enqueued or dequeued with the old or new version of the gem.
In the event that a cursor is not serializable, the gem will fall back to the deprecated old behaviour. In Job Iteration 2.0, this fallback will be removed, and cursors will be required to be serializable, raising otherwise. To opt-in to this behaviour, set `JobIteration.enforce_serializable_cursors = true`. To support gradual migration, a per-class `job_iteration_enforce_serializable_cursors` option is also available, which overrides the global option for that class.

### Bug fixes

Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ class MyJob < ApplicationJob
end
```

See [the guide on Custom Enumerator](guides/custom-enumerator.md) for details.

## Credits

This project would not be possible without these individuals (in alphabetical order):
Expand Down
56 changes: 56 additions & 0 deletions guides/custom-enumerator.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,62 @@ and you initiate the job with
LoadRefundsForChargeJob.perform_later(charge_id = "chrg_345")
```

### Cursor Serialization

Cursors should be of a [type that Active Job can serialize](https://guides.rubyonrails.org/active_job_basics.html#supported-types-for-arguments).

For example, consider:

```ruby
FancyCursor = Struct.new(:wrapped_value) do
def to_s
wrapped_value
end
end
```

```ruby
def build_enumerator(cursor:)
Enumerator.new do |yielder|
# ...something with fancy cursor...
yielder.yield 123, FancyCursor.new(:abc)
end
end
```

If this job was interrupted, Active Job would be unable to serialize
`FancyCursor`, and Job Iteration would fallback to the legacy behavior of not
serializing the cursor. This would typically result in the queue adapter
eventually serializing the cursor as JSON by calling `.to_s` on it. The cursor
would be deserialized as `:abc`, rather than the intended `FancyCursor.new(:abc)`.

To avoid this, job authors should take care to ensure that their cursor is
serializable by Active Job. This can be done in a couple ways, such as:
- [adding `GlobalID` support to the cursor class](https://guides.rubyonrails.org/active_job_basics.html#globalid)
- [implementing a custom Active Job argument serializer for the cursor class](https://guides.rubyonrails.org/active_job_basics.html#serializers)
- handling (de)serialization in the job/enumerator itself
```ruby
def build_enumerator(cursor:)
fancy_cursor = FancyCursor.new(cursor) unless cursor.nil?
Enumerator.new do |yielder|
# ...something with fancy cursor...
yielder.yield 123, FancyCursor(:abc).wrapped_value
end
end
```
Note that starting in 2.0, Job Iteration will stop supporting fallback behavior
and raise when it encounters an unserializable cursor. To opt-in to this behavior early, set
```ruby
JobIteration.enforce_serializable_cursors = true
```
or, to support gradual migration, a per-class option is also available to override the global value, if set:
```ruby
class MyJob < ActiveJob::Base
include JobIteration::Iteration
self.job_iteration_enforce_serializable_cursors = true
```


## Cursorless enumerator

Sometimes you can ignore the cursor. Consider the following custom `Enumerator` that takes items from a Redis list, which
Expand Down
21 changes: 21 additions & 0 deletions lib/job-iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
require_relative "./job-iteration/enumerator_builder"
require_relative "./job-iteration/iteration"
require_relative "./job-iteration/log_subscriber"
require_relative "./job-iteration/railtie"

module JobIteration
IntegrationLoadError = Class.new(StandardError)

INTEGRATIONS = [:resque, :sidekiq]

Deprecation = ActiveSupport::Deprecation.new("2.0", "JobIteration")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally used ActiveSupport::Deprecation.warn directly, that labelled deprecations as coming from Rails. This uses our own object, but does mean that the host app's settings for ActiveSupport::Deprecation do not propagate (e.g. disallowed_deprecations).


extend self

attr_writer :logger
Expand Down Expand Up @@ -57,6 +60,24 @@ def logger
# where the throttle backoff value will take precedence over this setting.
attr_accessor :default_retry_backoff

# Set this to `true` to enforce that cursors be composed of objects capable
# of built-in (de)serialization by Active Job.
#
# JobIteration.enforce_serializable_cursors = true
#
# For more granular control, this can also be configured per job class, and
# is inherited by child classes.
#
# class MyJob < ActiveJob::Base
# include JobIteration::Iteration
# self.job_iteration_enforce_serializable_cursors = false
# # ...
# end
#
# Note that non-enforcement is deprecated and enforcement will be mandatory
# in version 2.0, at which point this config will go away.
attr_accessor :enforce_serializable_cursors

# Used internally for hooking into job processing frameworks like Sidekiq and Resque.
attr_accessor :interruption_adapter

Expand Down
108 changes: 55 additions & 53 deletions lib/job-iteration/iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,6 @@ module Iteration
# The time isn't reset if the job is interrupted.
attr_accessor :total_time

class CursorError < ArgumentError
sambostock marked this conversation as resolved.
Show resolved Hide resolved
attr_reader :cursor

def initialize(message, cursor:)
super(message)
@cursor = cursor
end

def message
"#{super} (#{inspected_cursor})"
end

private

def inspected_cursor
cursor.inspect
rescue NoMethodError
# For those brave enough to try to use BasicObject as cursor. Nice try.
Object.instance_method(:inspect).bind(cursor).call
end
end

included do |_base|
define_callbacks :start
define_callbacks :shutdown
Expand All @@ -50,6 +28,12 @@ def inspected_cursor
instance_accessor: false,
instance_predicate: false,
)

class_attribute(
:job_iteration_enforce_serializable_cursors,
instance_accessor: false,
instance_predicate: false,
)
end

module ClassMethods
Expand Down Expand Up @@ -88,16 +72,25 @@ def initialize(*arguments)
ruby2_keywords(:initialize) if respond_to?(:ruby2_keywords, true)

def serialize # @private
super.merge(
"cursor_position" => cursor_position,
iteration_job_data = {
"cursor_position" => cursor_position, # Backwards compatibility
"times_interrupted" => times_interrupted,
"total_time" => total_time,
)
}

begin
iteration_job_data["serialized_cursor_position"] = serialize_cursor(cursor_position)
rescue ActiveJob::SerializationError
raise if job_iteration_enforce_serializable_cursors?
# No point in duplicating the deprecation warning from assert_valid_cursor!
end

super.merge(iteration_job_data)
end

def deserialize(job_data) # @private
super
self.cursor_position = job_data["cursor_position"]
self.cursor_position = cursor_position_from_job_data(job_data)
self.times_interrupted = Integer(job_data["times_interrupted"] || 0)
self.total_time = Float(job_data["total_time"] || 0.0)
end
Expand Down Expand Up @@ -167,8 +160,7 @@ def iterate_with_enumerator(enumerator, arguments)
@needs_reenqueue = false

enumerator.each do |object_from_enumerator, cursor_from_enumerator|
# Deferred until 2.0.0
# assert_valid_cursor!(cursor_from_enumerator)
assert_valid_cursor!(cursor_from_enumerator)

tags = instrumentation_tags.merge(cursor_position: cursor_from_enumerator)
ActiveSupport::Notifications.instrument("each_iteration.iteration", tags) do
Expand Down Expand Up @@ -222,16 +214,16 @@ def build_enumerator(params, cursor:)
EOS
end

# The adapter must be able to serialize and deserialize the cursor back into an equivalent object.
# https://github.com/mperham/sidekiq/wiki/Best-Practices#1-make-your-job-parameters-small-and-simple
def assert_valid_cursor!(cursor)
return if serializable?(cursor)

raise CursorError.new(
"Cursor must be composed of objects capable of built-in (de)serialization: " \
"Strings, Integers, Floats, Arrays, Hashes, true, false, or nil.",
cursor: cursor,
)
serialize_cursor(cursor)
rescue ActiveJob::SerializationError
raise if job_iteration_enforce_serializable_cursors?

Deprecation.warn(<<~DEPRECATION_MESSAGE, caller_locations(3))
The Enumerator returned by #{self.class.name}#build_enumerator yielded a cursor which is unsafe to serialize.
See https://github.com/Shopify/job-iteration/blob/main/guides/custom-enumerator.md#cursor-types
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to match the heading we added.

Suggested change
See https://github.com/Shopify/job-iteration/blob/main/guides/custom-enumerator.md#cursor-types
See https://github.com/Shopify/job-iteration/blob/main/guides/custom-enumerator.md#cursor-serialization

This will raise starting in version #{Deprecation.deprecation_horizon} of #{Deprecation.gem_name}!"
DEPRECATION_MESSAGE
end

def assert_implements_methods!
Expand Down Expand Up @@ -286,6 +278,13 @@ def job_iteration_max_job_runtime
[global_max, class_max].min
end

def job_iteration_enforce_serializable_cursors? # TODO: Add a test for the edge case of registering it afterwards
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we add this test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch – yes, test_global_max_job_runtime_with_updated_value

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we should remove the TODO 🙂

per_class_setting = self.class.job_iteration_enforce_serializable_cursors
return per_class_setting unless per_class_setting.nil?

!!JobIteration.enforce_serializable_cursors
end

def handle_completed(completed)
case completed
when nil # someone aborted the job but wants to call the on_complete callback
Expand All @@ -305,6 +304,25 @@ def handle_completed(completed)
raise "Unexpected thrown value: #{completed.inspect}"
end

def cursor_position_from_job_data(job_data)
if job_data.key?("serialized_cursor_position")
deserialize_cursor(job_data.fetch("serialized_cursor_position"))
Comment on lines +308 to +309
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated this so if a serialized cursor is found, it should always be deserializable.

else
# Backwards compatibility for
# - jobs interrupted before cursor serialization feature shipped, or
# - jobs whose cursor could not be serialized
job_data.fetch("cursor_position", nil)
end
end

def serialize_cursor(cursor)
ActiveJob::Arguments.serialize([cursor]).first
end

def deserialize_cursor(cursor)
ActiveJob::Arguments.deserialize([cursor]).first
end

def valid_cursor_parameter?(parameters)
# this condition is when people use the splat operator.
# def build_enumerator(*)
Expand All @@ -316,21 +334,5 @@ def valid_cursor_parameter?(parameters)
end
false
end

SIMPLE_SERIALIZABLE_CLASSES = [String, Integer, Float, NilClass, TrueClass, FalseClass].freeze
private_constant :SIMPLE_SERIALIZABLE_CLASSES
def serializable?(object)
# Subclasses must be excluded, hence not using is_a? or ===.
if object.instance_of?(Array)
object.all? { |element| serializable?(element) }
elsif object.instance_of?(Hash)
object.all? { |key, value| serializable?(key) && serializable?(value) }
else
SIMPLE_SERIALIZABLE_CLASSES.any? { |klass| object.instance_of?(klass) }
end
rescue NoMethodError
# BasicObject doesn't respond to instance_of, but we can't serialize it anyway
false
end
end
end
11 changes: 11 additions & 0 deletions lib/job-iteration/railtie.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# frozen_string_literal: true

return unless defined?(Rails::Railtie)

module JobIteration
class Railtie < Rails::Railtie
initializer "job_iteration.register_deprecator" do |app|
app.deprecators[:job_iteration] = JobIteration::Deprecation
end
end
end
9 changes: 2 additions & 7 deletions test/integration/integration_behaviour.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,12 @@ module IntegrationBehaviour

test "unserializable corruption is prevented" do
skip_until_version("2.0.0")
# Cursors are serialized as JSON, but not all objects are serializable.
# time = Time.at(0).utc # => 1970-01-01 00:00:00 UTC
# json = JSON.dump(time) # => "\"1970-01-01 00:00:00 UTC\""
# string = JSON.parse(json) # => "1970-01-01 00:00:00 UTC"
# We serialized a Time, but it was deserialized as a String.
TimeCursorJob.perform_later
UnserializableCursorJob.perform_later
TerminateJob.perform_later
start_worker_and_wait

assert_equal(
JobIteration::Iteration::CursorError.name,
ActiveJob::SerializationError.name,
failed_job_error_class_name,
)
end
Expand Down
5 changes: 3 additions & 2 deletions test/support/jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ def each_iteration(omg)
end
end

class TimeCursorJob < ActiveJob::Base
class UnserializableCursorJob < ActiveJob::Base
include JobIteration::Iteration
UnserializableCursor = Class.new
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're getting all of ActiveJob's serialization now, we have to explicitly create an object that cannot be serialized. BasicObject doesn't work though, because that straight up blows up ActiveJob because it doesn't respond to respond_to? 😅


def build_enumerator(cursor:)
return [["item", Time.now]].to_enum if cursor.nil?
return [["item", UnserializableCursor.new]].to_enum if cursor.nil?

raise "This should never run; cursor is unserializable!"
end
Expand Down
Loading