Class: EventDrivenClient::KafkaClient

Inherits:
Object
  • Object
show all
Defined in:
lib/event_driven_client/kafka_client.rb

Instance Method Summary collapse

Constructor Details

#initialize(topic) ⇒ KafkaClient

Returns a new instance of KafkaClient.

Raises:

  • (ArgumentError)


3
4
5
6
7
# File 'lib/event_driven_client/kafka_client.rb', line 3

def initialize(topic)
  raise ArgumentError, 'Topic cannot be blank' if topic.blank?

  @topic = topic
end

Instance Method Details

#send_event(payload) ⇒ Object

ordinary sending event (syncronus)



10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/event_driven_client/kafka_client.rb', line 10

def send_event(payload)
  Karafka.producer.produce_sync(topic: @topic, payload: payload)
rescue Rdkafka::AbstractHandle::WaitTimeoutError => e
  APMErrorHandler.report "TimeoutError: Failed to send event to topic: #{@topic}", payload: payload, error: e
  raise e
rescue Rdkafka::RdkafkaError => e
  APMErrorHandler.report "RdkafkaError: Failed to send event to topic: #{@topic}", payload: payload, error: e
  raise e
rescue StandardError => e
  APMErrorHandler.report 'Failed to send event', error: e
  raise e
end

#send_event_async(payload) ⇒ Object

seding event asyncronus



24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/event_driven_client/kafka_client.rb', line 24

def send_event_async(payload)
  Karafka.producer.produce_async(topic: @topic, payload: payload)
rescue Rdkafka::AbstractHandle::WaitTimeoutError => e
  APMErrorHandler.report "TimeoutError: Failed to send event to topic: #{@topic}", payload: payload, error: e
  raise e
rescue Rdkafka::RdkafkaError => e
  APMErrorHandler.report "RdkafkaError: Failed to send event to topic: #{@topic}", payload: payload, error: e
  raise e
rescue StandardError => e
  APMErrorHandler.report 'Failed to send event', error: e
  raise e
end