Skip to content

Commit

Permalink
fix(streams): Add error-handling switch
Browse files Browse the repository at this point in the history
  • Loading branch information
pandomic committed May 2, 2019
1 parent da08f69 commit 318a796
Show file tree
Hide file tree
Showing 11 changed files with 81 additions and 10 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [0.1.3] - 2019-05-02
### Changed
- `Content-type` header for the HTTP streams is set as a string to fix doubled content-type issue

## [0.1.4] - 2019-05-02
### Added
- Error-handler switch for `publish` methods

### Changed
- README to reflect the error-handler switch
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
active_record_streams (0.1.3)
active_record_streams (0.1.4)
activerecord (~> 4.2.10)
aws-sdk (~> 2.11.9)

Expand Down
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,10 @@ class SampleHttpReSender
def perform(table_name, message_json)
message = ActiveRecordStreams::Message.from_json(message_json)
ActiveRecordStreams.config.streams.each do |stream|
stream.publish(table_name, message)
# Use `handle_error: false` to raise exceptions instead of
# calling `error_handler`, this will let he Sidekiq to
# perform retries on it's own
stream.publish(table_name, message, handle_error: false)
end
end
end
Expand Down
13 changes: 11 additions & 2 deletions lib/active_record_streams/publishers/http_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,20 @@ def initialize(
@error_handler = error_handler
end

def publish(table_name, message)
##
# @param [String] table_name
# @param [ActiveRecordStreams::Message] message
# @param [Boolean] handle_error

def publish(table_name, message, handle_error: true)
return unless (any_table? && allowed_table?(table_name)) ||
table_name == @table_name

request.body = message.json
response = http.request(request)
assert_response_code(response)
rescue StandardError => e
raise e unless @error_handler.is_a?(Proc)
raise e unless call_error_handler?(handle_error)

@error_handler.call(self, table_name, message, e)
end
Expand Down Expand Up @@ -82,6 +87,10 @@ def assert_response_code(response)

raise StandardError, response.body
end

def call_error_handler?(handle_error)
@error_handler.is_a?(Proc) && handle_error
end
end
end
end
12 changes: 12 additions & 0 deletions lib/active_record_streams/publishers/http_stream_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@
end
end

context 'error response with error handling off' do
let(:response) { double(body: 'Error', code: 400) }
let(:error_handler) { Proc.new {} }

it 'raises exception' do
expect(error_handler).not_to receive(:call)

expect { subject.publish(actual_table_name, message, handle_error: false) }
.to raise_error(StandardError)
end
end

context 'https target' do
let(:url) { 'https://hello.world' }

Expand Down
9 changes: 7 additions & 2 deletions lib/active_record_streams/publishers/kinesis_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,16 @@ def initialize(
##
# @param [String] table_name
# @param [ActiveRecordStreams::Message] message
# @param [Boolean] handle_error

def publish(table_name, message)
def publish(table_name, message, handle_error: true)
return unless (any_table? && allowed_table?(table_name)) ||
table_name == @table_name

client.publish(@stream_name, partition_key(table_name),
message.json, @overrides)
rescue StandardError => e
raise e unless @error_handler.is_a?(Proc)
raise e unless call_error_handler?(handle_error)

@error_handler.call(self, table_name, message, e)
end
Expand All @@ -55,6 +56,10 @@ def allowed_table?(table_name)
!@ignored_tables.include?(table_name)
end

def call_error_handler?(handle_error)
@error_handler.is_a?(Proc) && handle_error
end

def partition_key(table_name)
"#{table_name}-#{Time.now.utc.strftime(PARTITION_KEY_TIME_FORMAT)}"
end
Expand Down
15 changes: 15 additions & 0 deletions lib/active_record_streams/publishers/kinesis_stream_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,20 @@
subject.publish(actual_table_name, message)
end
end

context 'delivery error error with error handling off' do
let(:error_handler) { Proc.new {} }

it 'raises exception' do
expect(kinesis_client).to receive(:publish) do
raise StandardError, 'Delivery error'
end

expect(error_handler).not_to receive(:call)

expect { subject.publish(actual_table_name, message, handle_error: false) }
.to raise_error(StandardError)
end
end
end
end
9 changes: 7 additions & 2 deletions lib/active_record_streams/publishers/sns_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ def initialize(
##
# @param [String] table_name
# @param [ActiveRecordStreams::Message] message
# @param [Boolean] handle_error

def publish(table_name, message)
def publish(table_name, message, handle_error: true)
return unless (any_table? && allowed_table?(table_name)) ||
table_name == @table_name

client.publish(@topic_arn, message.json, @overrides)
rescue StandardError => e
raise e unless @error_handler.is_a?(Proc)
raise e unless call_error_handler?(handle_error)

@error_handler.call(self, table_name, message, e)
end
Expand All @@ -51,6 +52,10 @@ def allowed_table?(table_name)
!@ignored_tables.include?(table_name)
end

def call_error_handler?(handle_error)
@error_handler.is_a?(Proc) && handle_error
end

def client
@client ||= SnsClient.new
end
Expand Down
15 changes: 15 additions & 0 deletions lib/active_record_streams/publishers/sns_stream_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,20 @@
subject.publish(actual_table_name, message)
end
end

context 'delivery error with error handling off' do
let(:error_handler) { Proc.new {} }

it 'raises exception' do
expect(sns_client).to receive(:publish) do
raise StandardError, 'Delivery error'
end

expect(error_handler).not_to receive(:call)

expect { subject.publish(actual_table_name, message, handle_error: false) }
.to raise_error(StandardError)
end
end
end
end
2 changes: 1 addition & 1 deletion lib/active_record_streams/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module ActiveRecordStreams
VERSION = '0.1.3'
VERSION = '0.1.4'
end
2 changes: 1 addition & 1 deletion lib/active_record_streams/version_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

RSpec.describe ActiveRecordStreams::VERSION do
it 'has a version number' do
expect(subject).to eq('0.1.3')
expect(subject).to eq('0.1.4')
end
end

0 comments on commit 318a796

Please sign in to comment.