Class: EventDrivenClient::KafkaClient
- Inherits:
-
Object
- Object
- EventDrivenClient::KafkaClient
- Defined in:
- lib/event_driven_client/kafka_client.rb
Instance Method Summary collapse
-
#initialize(topic) ⇒ KafkaClient
constructor
A new instance of KafkaClient.
-
#send_event(payload) ⇒ Object
ordinary sending event (syncronus).
-
#send_event_async(payload) ⇒ Object
seding event asyncronus.
Constructor Details
#initialize(topic) ⇒ KafkaClient
Returns a new instance of KafkaClient.
3 4 5 6 7 |
# File 'lib/event_driven_client/kafka_client.rb', line 3 def initialize(topic) raise ArgumentError, 'Topic cannot be blank' if topic.blank? @topic = topic end |
Instance Method Details
#send_event(payload) ⇒ Object
ordinary sending event (syncronus)
10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/event_driven_client/kafka_client.rb', line 10 def send_event(payload) Karafka.producer.produce_sync(topic: @topic, payload: payload) rescue Rdkafka::AbstractHandle::WaitTimeoutError => e APMErrorHandler.report "TimeoutError: Failed to send event to topic: #{@topic}", payload: payload, error: e raise e rescue Rdkafka::RdkafkaError => e APMErrorHandler.report "RdkafkaError: Failed to send event to topic: #{@topic}", payload: payload, error: e raise e rescue StandardError => e APMErrorHandler.report 'Failed to send event', error: e raise e end |
#send_event_async(payload) ⇒ Object
seding event asyncronus
24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/event_driven_client/kafka_client.rb', line 24 def send_event_async(payload) Karafka.producer.produce_async(topic: @topic, payload: payload) rescue Rdkafka::AbstractHandle::WaitTimeoutError => e APMErrorHandler.report "TimeoutError: Failed to send event to topic: #{@topic}", payload: payload, error: e raise e rescue Rdkafka::RdkafkaError => e APMErrorHandler.report "RdkafkaError: Failed to send event to topic: #{@topic}", payload: payload, error: e raise e rescue StandardError => e APMErrorHandler.report 'Failed to send event', error: e raise e end |