Class: Karafka::Instrumentation::Vendors::Kubernetes::LivenessListener

Inherits:
BaseListener
  • Object
Defined in:
lib/karafka_ext/k8s.rb

Overview

Note:

This listener binds itself only when Karafka actually attempts to start and moves from initializing to running. Before that, the TCP server will NOT be active. This is done on purpose to mitigate the case where users subscribe this listener in `karafka.rb` without following the recommendation of conditional assignment.

Note:

When used within an embedding with Puma, you need to select a different port than the one used by Puma itself.

Note:

Please use `Kubernetes::SwarmLivenessListener` when operating in swarm mode.

Kubernetes HTTP listener that not only replies while the process is not fully hanging, but also allows defining a maximum time for processing and polling.

Processes like the Karafka server can hang while still being reachable. For example, if something hangs inside user code, Karafka could stop polling and no new data would be processed, yet the process itself would remain active. This listener allows defining TTLs that get bumped on each poll loop as well as before and after processing of a given messages batch.
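
A minimal subscription sketch for `karafka.rb`; the require path and port value are illustrative assumptions and should be adjusted to your setup:

# karafka.rb
# Hypothetical require path - point it at wherever this listener class lives in your project
require 'karafka_ext/k8s'

listener = Karafka::Instrumentation::Vendors::Kubernetes::LivenessListener.new(
  # Illustrative port - when embedding with Puma, pick one that Puma does not use
  port: 9005,
  # Both TTLs default to 5 minutes, matching the default max.poll.interval.ms
  consuming_ttl: 5 * 60 * 1_000,
  polling_ttl: 5 * 60 * 1_000
)

# Subscribing here is safe for non-server processes as well: the TCP server is only
# started once the application moves from initializing to running (see the note above)
Karafka.monitor.subscribe(listener)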

Instance Method Summary

Constructor Details

#initialize(hostname: nil, port: 4000, consuming_ttl: 5 * 60 * 1_000, polling_ttl: 5 * 60 * 1_000, logger: ActiveSupport::Logger.new(STDOUT)) ⇒ LivenessListener

Note:

The default TTL matches the default `max.poll.interval.ms`

Returns a new instance of LivenessListener.

Parameters:

  • hostname (String, nil) (defaults to: nil)

    hostname or nil to bind on all interfaces

  • port (Integer) (defaults to: 4000)

    TCP port on which we want to run our HTTP status server

  • consuming_ttl (Integer) (defaults to: 5 * 60 * 1_000)

    time in ms after which we consider consumption to be hanging. It allows us to define a max consumption time after which Kubernetes should consider the given process as hanging

  • polling_ttl (Integer) (defaults to: 5 * 60 * 1_000)

    max time in ms between polls. If no polling happens that often, the process should be considered dead.

  • logger (Logger) (defaults to: ActiveSupport::Logger.new(STDOUT))

    logger instance that we want to use for logging



# File 'lib/karafka_ext/k8s.rb', line 128

def initialize(
  hostname: nil,
  port: 4000,
  consuming_ttl: 5 * 60 * 1_000,
  polling_ttl: 5 * 60 * 1_000,
  logger: ActiveSupport::Logger.new(STDOUT)
)
  # If this is set to true, it indicates an unrecoverable error like fencing.
  # While fencing can be partial (affecting only one of the subscription groups), we
  # should still consider this an undesired state for the whole process because it
  # halts processing in a non-recoverable manner forever
  @unrecoverable = false
  @polling_ttl = polling_ttl
  @consuming_ttl = consuming_ttl
  @mutex = Mutex.new
  @pollings = {}
  @consumptions = {}
  super(hostname: hostname, port: port, logger: logger)
end
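
Since the default TTLs match the default `max.poll.interval.ms`, it may make sense to raise them together when that setting is increased. A sketch with an illustrative 10-minute value:

# Illustrative value - use whatever max.poll.interval.ms your consumers are configured with
max_poll_interval_ms = 10 * 60 * 1_000

Karafka::Instrumentation::Vendors::Kubernetes::LivenessListener.new(
  # Keep the TTLs at least as large as max.poll.interval.ms so Kubernetes does not
  # restart a process that is merely working through a long-running batch
  polling_ttl: max_poll_interval_ms,
  consuming_ttl: max_poll_interval_ms
)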

Instance Method Details

#healthy? ⇒ Boolean

Did we exceed any of the TTLs?

Returns:

  • (Boolean)

    true when all TTLs are within limits and no unrecoverable error occurred, false otherwise



# File 'lib/karafka_ext/k8s.rb', line 230

def healthy?
  time = monotonic_now

  if @unrecoverable
    @logger.error('Karafka process is in an unrecoverable state')
    return false
  end
  if @pollings.values.any? { |tick| (time - tick) > @polling_ttl }
    @logger.error('Karafka process is not polling')
    return false
  end

  if @consumptions.values.any? { |tick| (time - tick) > @consuming_ttl }
    @logger.error('Karafka process is not consuming')
    return false
  end

  @logger.debug('Karafka process is healthy')
  true
end
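
For reference, a hedged sketch of probing the status endpoint from Ruby; the host, port, path, and success criterion are assumptions (Kubernetes itself treats any 2xx/3xx response as a passing liveness probe):

require 'net/http'

# Assumed defaults: listener bound on all interfaces, port 4000, any path served
response = Net::HTTP.get_response(URI('http://127.0.0.1:4000/'))

# Treat anything below 400 as healthy, mirroring how an httpGet liveness probe behaves
puts(response.code.to_i < 400 ? 'process healthy' : 'process hanging or fenced')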

#on_app_running(_event) ⇒ Object

Parameters:

  • _event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka_ext/k8s.rb', line 149

def on_app_running(_event)
  start
end

#on_app_stopped(_event) ⇒ Object

Stop the HTTP server when we stop the process.

Parameters:

  • _event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka_ext/k8s.rb', line 155

def on_app_stopped(_event)
  stop
end

#on_connection_listener_fetch_loop(_event) ⇒ Object

Tick on each fetch

Parameters:

  • _event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka_ext/k8s.rb', line 162

def on_connection_listener_fetch_loop(_event)
  mark_polling_tick
end

#on_connection_listener_stopped(_event) ⇒ Object

Parameters:

  • _event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka_ext/k8s.rb', line 222

def on_connection_listener_stopped(_event)
  return if Karafka::App.done?

  clear_polling_tick
end

#on_connection_listener_stopping(_event) ⇒ Object

Deregister the polling tracker for a given listener.

Parameters:

  • _event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka_ext/k8s.rb', line 208

def on_connection_listener_stopping(_event)
  # We are interested in disabling tracking for a given listener only if it was requested
  # while Karafka was running. If we always cleared, it would not catch the shutdown
  # polling requirements. The "running" listener shutdown operations happen only when
  # the manager requests it for downscaling.
  return if Karafka::App.done?

  clear_polling_tick
end

#on_error_occurred(event) ⇒ Object

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka_ext/k8s.rb', line 191

def on_error_occurred(event)
  clear_consumption_tick
  clear_polling_tick

  error = event[:error]

  # We are only interested in the rdkafka errors
  return unless error.is_a?(Rdkafka::RdkafkaError)

  # We mark as unrecoverable only on certain errors that will not be fixed by retrying
  return unless UNRECOVERABLE_RDKAFKA_ERRORS.include?(error.code)

  @unrecoverable = true
end