diff --git a/CHANGELOG.md b/CHANGELOG.md index de2d002..1da7010 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.9.6] + +### Added +- Enhanced Sentry integration + +### Deprecated +- `Eventboss::ErrorHandlers::Sentry` is now deprecated in favor of `Eventboss::Sentry::ErrorHandler` + ## [1.9.2] - 2025-01-21 - Fix typo in instance var during shut down diff --git a/README.md b/README.md index 23b6aee..282e981 100644 --- a/README.md +++ b/README.md @@ -157,6 +157,17 @@ Eventboss.configure do |config| end ``` +### Sentry Integration + +Eventboss provides built-in integration with [Sentry](https://sentry.io/) for error monitoring and performance tracking. The simplest way to enable Sentry integration is to require the configuration module: + +```ruby +require 'eventboss/sentry/configure' +``` + +For more advanced configuration options, you can manually configure the integration. Please inspect [lib/eventboss/sentry/configure.rb](lib/eventboss/sentry/configure.rb) to see options. +``` + ### Middlewares Server middlewares intercept the execution of your `Listeners`. You can use to extract and run common functions on every message received. diff --git a/lib/eventboss/error_handlers/sentry.rb b/lib/eventboss/error_handlers/sentry.rb index 5672eb3..aa71be9 100644 --- a/lib/eventboss/error_handlers/sentry.rb +++ b/lib/eventboss/error_handlers/sentry.rb @@ -1,6 +1,14 @@ module Eventboss module ErrorHandlers class Sentry + def initialize + warn "[DEPRECATED] Eventboss::ErrorHandlers::Sentry is deprecated. " \ + "Use Eventboss::Sentry::ErrorHandler instead. " \ + "For automatic configuration, require 'eventboss/sentry/configure'. " \ + "This class will be removed in a future version." + super + end + def call(exception, context = {}) eventboss_context = { component: 'eventboss' } eventboss_context[:action] = context[:processor].class.to_s if context[:processor] diff --git a/lib/eventboss/long_poller.rb b/lib/eventboss/long_poller.rb index 67845c5..0f684b9 100644 --- a/lib/eventboss/long_poller.rb +++ b/lib/eventboss/long_poller.rb @@ -73,7 +73,9 @@ def fetch_messages @client.receive_message( queue_url: queue.url, max_number_of_messages: 10, - wait_time_seconds: TIME_WAIT + wait_time_seconds: TIME_WAIT, + attribute_names: ['SentTimestamp', 'ApproximateReceiveCount'], + message_attribute_names: ['sentry-trace', 'baggage', 'sentry_user'] ).messages end end diff --git a/lib/eventboss/publisher.rb b/lib/eventboss/publisher.rb index c62aa22..d539618 100644 --- a/lib/eventboss/publisher.rb +++ b/lib/eventboss/publisher.rb @@ -10,19 +10,48 @@ def initialize(event_name, sns_client, configuration, opts = {}) end def publish(payload) - topic_arn = Topic.build_arn(event_name: event_name, source_app: source) - sns_client.publish( - topic_arn: topic_arn, - message: json_payload(payload) - ) + with_sentry_span do + sns_client.publish(**build_sns_params(payload)) + end end private attr_reader :event_name, :sns_client, :configuration, :source + def sentry_enabled? + defined?(::Eventboss::Sentry::Integration) && ::Sentry.initialized? + end + + def build_sns_params(payload) + { + topic_arn: Topic.build_arn(event_name: event_name, source_app: source), + message: json_payload(payload), + message_attributes: sentry_enabled? ? build_sns_message_attributes : {} + } + end + + def with_sentry_span + return yield unless sentry_enabled? + + queue_name = Queue.build_name(destination: source, event_name: event_name, env: Eventboss.env, source_app: source) + + ::Sentry.with_child_span(op: 'queue.publish', description: "Eventboss push #{source}/#{event_name}") do |span| + span.set_data(::Sentry::Span::DataConventions::MESSAGING_DESTINATION_NAME, ::Eventboss::Sentry::Context.queue_name_for_sentry(queue_name)) + + message = yield + + span.set_data(::Sentry::Span::DataConventions::MESSAGING_MESSAGE_ID, message.message_id) + message + end + end + def json_payload(payload) payload.is_a?(String) ? payload : payload.to_json end + + def build_sns_message_attributes + ::Eventboss::Sentry::Context.build_sns_message_attributes + end end -end +end \ No newline at end of file diff --git a/lib/eventboss/sentry/configure.rb b/lib/eventboss/sentry/configure.rb new file mode 100644 index 0000000..b3e6e82 --- /dev/null +++ b/lib/eventboss/sentry/configure.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +require_relative 'integration' + +# Auto configure eventboss to use sentry + +Eventboss.configure do |config| + config.server_middleware.add Eventboss::Sentry::ServerMiddleware + config.error_handlers << Eventboss::Sentry::ErrorHandler.new +end + diff --git a/lib/eventboss/sentry/context.rb b/lib/eventboss/sentry/context.rb new file mode 100644 index 0000000..05bc9bc --- /dev/null +++ b/lib/eventboss/sentry/context.rb @@ -0,0 +1,35 @@ +module Eventboss + module Sentry + class Context + # since sentry has env selector, we can remove it from queue names + QUEUES_WITHOUT_ENV = Hash.new do |hash, key| + hash[key] = key + .gsub(/-#{Eventboss.env}-deadletter$/, '-ENV-deadletter') + .gsub(/-#{Eventboss.env}$/, '-ENV') + end + + def self.queue_name_for_sentry(queue_name) + QUEUES_WITHOUT_ENV[queue_name] + end + + # Constructs SNS message attributes for Sentry trace propagation. + def self.build_sns_message_attributes + attributes = ::Sentry.get_trace_propagation_headers + .slice('sentry-trace', 'baggage') + .transform_values do |header_value| + { string_value: header_value, data_type: 'String' } + end + + user = ::Sentry.get_current_scope&.user + if user && !user.empty? + attributes['sentry_user'] = { + string_value: user.to_json, + data_type: 'String' + } + end + + attributes + end + end + end +end \ No newline at end of file diff --git a/lib/eventboss/sentry/error_handler.rb b/lib/eventboss/sentry/error_handler.rb new file mode 100644 index 0000000..2c3a694 --- /dev/null +++ b/lib/eventboss/sentry/error_handler.rb @@ -0,0 +1,15 @@ +module Eventboss + module Sentry + class ErrorHandler + def call(exception, _context = {}) + return unless ::Sentry.initialized? + + Eventboss::Sentry::Integration.capture_exception( + exception, + contexts: { eventboss: { } }, + hint: { background: false } + ) + end + end + end +end diff --git a/lib/eventboss/sentry/integration.rb b/lib/eventboss/sentry/integration.rb new file mode 100644 index 0000000..3401d54 --- /dev/null +++ b/lib/eventboss/sentry/integration.rb @@ -0,0 +1,15 @@ +require 'sentry-ruby' +require 'sentry/integrable' +require_relative 'error_handler' +require_relative 'context' +require_relative 'server_middleware' + +module Eventboss + module Sentry + class Integration + extend ::Sentry::Integrable + + register_integration name: "eventboss", version: Eventboss::VERSION + end + end +end diff --git a/lib/eventboss/sentry/server_middleware.rb b/lib/eventboss/sentry/server_middleware.rb new file mode 100644 index 0000000..003d66a --- /dev/null +++ b/lib/eventboss/sentry/server_middleware.rb @@ -0,0 +1,95 @@ +module Eventboss + module Sentry + class ServerMiddleware < Eventboss::Middleware::Base + OP_NAME = 'queue.process' + SPAN_ORIGIN = 'auto.queue.eventboss' + + def call(work) + return yield unless ::Sentry.initialized? + + ::Sentry.clone_hub_to_current_thread + scope = ::Sentry.get_current_scope + scope.clear + if (user = extract_sentry_user(work)) + scope.set_user(user) + end + scope.set_tags(queue: extract_queue_name(work), message_id: work.message.message_id) + scope.set_transaction_name(extract_transaction_name(work), source: :task) + transaction = start_transaction(scope, work) + + if transaction + scope.set_span(transaction) + transaction.set_data(::Sentry::Span::DataConventions::MESSAGING_MESSAGE_ID, work.message.message_id) + transaction.set_data(::Sentry::Span::DataConventions::MESSAGING_DESTINATION_NAME, extract_queue_name(work)) + + if (latency = extract_latency(work.message)) + transaction.set_data(::Sentry::Span::DataConventions::MESSAGING_MESSAGE_RECEIVE_LATENCY, latency) + end + + if (retry_count = extract_receive_count(work.message)) + transaction.set_data(::Sentry::Span::DataConventions::MESSAGING_MESSAGE_RETRY_COUNT, retry_count) + end + end + + begin + yield + rescue StandardError + finish_transaction(transaction, 500) + raise + end + + finish_transaction(transaction, 200) + end + + def start_transaction(scope, work) + options = { + name: scope.transaction_name, + source: scope.transaction_source, + op: OP_NAME, + origin: SPAN_ORIGIN + } + + env = { + 'sentry-trace' => work.message.message_attributes['sentry-trace']&.string_value, + 'baggage' => work.message.message_attributes['baggage']&.string_value + } + + transaction = ::Sentry.continue_trace(env, **options) + ::Sentry.start_transaction(transaction: transaction, **options) + end + + def finish_transaction(transaction, status) + return unless transaction + + transaction.set_http_status(status) + transaction.finish + end + + def extract_sentry_user(work) + if (value = work.message.message_attributes["sentry_user"]&.string_value) + JSON.parse(value) + end + end + + def extract_transaction_name(work) + "Eventboss/#{work.listener.to_s}" + end + + def extract_queue_name(work) + ::Eventboss::Sentry::Context.queue_name_for_sentry(work.queue.name) + end + + def extract_latency(message) + if sent_timestamp = message.attributes.fetch('SentTimestamp', nil) + Time.now - Time.at(sent_timestamp.to_i / 1000.0) + end + end + + def extract_receive_count(message) + if receive_count = message.attributes.fetch('ApproximateReceiveCount', nil) + receive_count.to_i - 1 + end + end + end + end +end \ No newline at end of file diff --git a/lib/eventboss/version.rb b/lib/eventboss/version.rb index a11dd83..ec1f59d 100644 --- a/lib/eventboss/version.rb +++ b/lib/eventboss/version.rb @@ -1,3 +1,3 @@ module Eventboss - VERSION = "1.9.5" + VERSION = "1.9.6" end diff --git a/spec/eventboss/long_poller_spec.rb b/spec/eventboss/long_poller_spec.rb index b10f7cb..236fbc9 100644 --- a/spec/eventboss/long_poller_spec.rb +++ b/spec/eventboss/long_poller_spec.rb @@ -31,7 +31,9 @@ it 'calls client with proper attributes' do expect(client).to receive(:receive_message) - .with(queue_url: 'url', max_number_of_messages: 10, wait_time_seconds: 10) + .with(queue_url: 'url', max_number_of_messages: 10, wait_time_seconds: 10, + attribute_names: ["SentTimestamp", "ApproximateReceiveCount"], + message_attribute_names: ["sentry-trace", "baggage", "sentry_user"]) subject.fetch_and_dispatch end diff --git a/spec/eventboss/publisher_spec.rb b/spec/eventboss/publisher_spec.rb index 1d56de8..75ad99f 100644 --- a/spec/eventboss/publisher_spec.rb +++ b/spec/eventboss/publisher_spec.rb @@ -26,7 +26,8 @@ it 'publishes to sns with source app name by default' do expect(sns_client).to receive(:publish).with( topic_arn: "arn:aws:sns:::eventboss-app_name1-event_name-ping", - message: "{}" + message: "{}", + message_attributes: {} ) expect(subject).to eq(sns_response) end @@ -53,7 +54,8 @@ it 'publishes to sns without app name' do expect(sns_client).to receive(:publish).with( topic_arn: "arn:aws:sns:::eventboss-event_name-ping", - message: "{}" + message: "{}", + message_attributes: {} ) subject end