Class: Karafka::Instrumentation::Vendors::Kubernetes::LivenessListener
- Inherits: BaseListener
- Inheritance chain: Object > BaseListener > Karafka::Instrumentation::Vendors::Kubernetes::LivenessListener
- Defined in: lib/karafka_ext/k8s.rb
Overview
This listener binds its TCP server only when Karafka actually attempts to start, that is, when it moves from initializing to running. Before that, the TCP server is NOT active. This is done on purpose, to mitigate the case where users subscribe this listener in `karafka.rb` without following the recommendations for conditional assignment.
When Karafka is embedded within Puma, you need to select a different port than the one Puma itself uses.
Please use `Kubernetes::SwarmLivenessListener` when operating in swarm mode.
Kubernetes HTTP listener that not only replies when the process is not fully hanging, but also allows defining the maximum time for processing and for looping.
Processes like the Karafka server can hang while still being reachable. For example, if something hangs inside user code, Karafka could stop polling and no new data would be processed, while the process itself would still be alive. This listener allows defining TTLs (`polling_ttl` and `consuming_ttl`) that get bumped on each poll loop and before and after processing a given message batch.
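The TTL idea can be illustrated with a small standalone sketch. It is not Karafka's implementation: the `TtlTracker` class and its methods are hypothetical, and it assumes millisecond readings from Ruby's monotonic clock. A tick is recorded before and after a unit of work, and the tracker counts as stale once the newest tick is older than the TTL.

```ruby
# Hypothetical illustration of a TTL that is bumped around units of work.
# Not Karafka code; names are made up for this sketch.
class TtlTracker
  def initialize(ttl_ms)
    @ttl_ms = ttl_ms
    @last_tick = nil
  end

  # Record "now" (in milliseconds) as the most recent sign of life.
  def tick(now = monotonic_now)
    @last_tick = now
  end

  # Bump the TTL before and after the work, mirroring the
  # "before and after processing of a batch" behaviour.
  def around_work
    tick
    result = yield
    tick
    result
  end

  # Nothing tracked yet means nothing can be stale; otherwise compare the
  # age of the last tick against the TTL.
  def stale?(now = monotonic_now)
    return false if @last_tick.nil?

    (now - @last_tick) > @ttl_ms
  end

  private

  # Monotonic time is immune to wall-clock jumps (NTP, manual changes).
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
  end
end
```

Passing explicit times, for example `tracker.tick(100)` followed by `tracker.stale?(1_101)` with a 1,000 ms TTL, makes the comparison easy to reason about without waiting.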
Instance Method Summary
- #healthy? ⇒ Boolean
  Checks whether any of the TTLs has been exceeded.
- #initialize(hostname: nil, port: 4000, consuming_ttl: 5 * 60 * 1_000, polling_ttl: 5 * 60 * 1_000, logger: ActiveSupport::Logger.new(STDOUT)) ⇒ LivenessListener (constructor)
  A new instance of LivenessListener.
- #on_app_running(_event) ⇒ Object
- #on_app_stopped(_event) ⇒ Object
  Stop the HTTP server when the process stops.
- #on_connection_listener_fetch_loop(_event) ⇒ Object
  Tick on each fetch.
- #on_connection_listener_stopped(_event) ⇒ Object
- #on_connection_listener_stopping(_event) ⇒ Object
  Deregister the polling tracker for the given listener.
- #on_error_occurred(event) ⇒ Object
Constructor Details
#initialize(hostname: nil, port: 4000, consuming_ttl: 5 * 60 * 1_000, polling_ttl: 5 * 60 * 1_000, logger: ActiveSupport::Logger.new(STDOUT)) ⇒ LivenessListener
The default TTLs (5 * 60 * 1_000 ms, i.e. 300,000 ms or five minutes) match the default `max.poll.interval.ms`.
Returns a new instance of LivenessListener.
    # File 'lib/karafka_ext/k8s.rb', line 128

    def initialize(
      hostname: nil,
      port: 4000,
      consuming_ttl: 5 * 60 * 1_000,
      polling_ttl: 5 * 60 * 1_000,
      logger: ActiveSupport::Logger.new(STDOUT)
    )
      # If this is set to true, it indicates unrecoverable error like fencing
      # While fencing can be partial (for one of the SGs), we still should consider this
      # as an undesired state for the whole process because it halts processing in a
      # non-recoverable manner forever
      @unrecoverable = false

      @polling_ttl = polling_ttl
      @consuming_ttl = consuming_ttl
      @mutex = Mutex.new
      @pollings = {}
      @consumptions = {}

      super(hostname: hostname, port: port, logger: logger)
    end
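Both TTL defaults are expressed in milliseconds. The arithmetic below, using a hypothetical `minutes_to_ms` helper and constant names that are not part of Karafka, confirms that `5 * 60 * 1_000` equals 300,000 ms, the value the constructor documentation ties to the default `max.poll.interval.ms`.

```ruby
# Hypothetical constants; only the arithmetic mirrors the constructor defaults.
MS_PER_SECOND = 1_000
SECONDS_PER_MINUTE = 60

# Convert minutes to milliseconds, the unit used by consuming_ttl and polling_ttl.
def minutes_to_ms(minutes)
  minutes * SECONDS_PER_MINUTE * MS_PER_SECOND
end

DEFAULT_TTL_MS = 5 * 60 * 1_000

puts DEFAULT_TTL_MS                     # prints 300000
puts minutes_to_ms(5) == DEFAULT_TTL_MS # prints true
puts minutes_to_ms(2)                   # prints 120000
```

A helper like this would let a shorter TTL be written as, say, `polling_ttl: minutes_to_ms(2)` instead of a bare millisecond literal.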
Instance Method Details
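#healthy? ⇒ Boolean
Checks whether any of the TTLs has been exceeded. Returns false if an unrecoverable error was recorded or if any tracked polling or consumption tick is older than its TTL; otherwise returns true.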
    # File 'lib/karafka_ext/k8s.rb', line 230

    def healthy?
      time = monotonic_now

      if @unrecoverable
        @logger.error('Karafka process is in an unrecoverable state')
        return false
      end

      if @pollings.values.any? { |tick| (time - tick) > @polling_ttl }
        @logger.error('Karafka process is not polling')
        return false
      end

      if @consumptions.values.any? { |tick| (time - tick) > @consuming_ttl }
        @logger.error('Karafka process is not consuming')
        return false
      end

      @logger.debug('Karafka process is healthy')

      true
    end
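The three checks in `#healthy?` can be restated as a pure function so their order and thresholds are easy to test. This is a sketch, not Karafka code: `healthy_state?` is a hypothetical name, and the current time is passed in explicitly instead of being read from `monotonic_now`.

```ruby
# Standalone restatement of the checks in #healthy?, with the current time
# injected so the outcome is deterministic.
def healthy_state?(now:, unrecoverable:, pollings:, consumptions:, polling_ttl:, consuming_ttl:)
  # A recorded unrecoverable error (for example fencing) always fails the probe.
  return false if unrecoverable
  # Any tracked poll loop that has not ticked within polling_ttl fails the probe.
  return false if pollings.values.any? { |tick| (now - tick) > polling_ttl }
  # Same rule for batches that have been processing longer than consuming_ttl.
  return false if consumptions.values.any? { |tick| (now - tick) > consuming_ttl }

  true
end

ttl = 300_000
# Last poll 200,000 ms ago, within the 300,000 ms TTL.
puts healthy_state?(now: 400_000, unrecoverable: false, pollings: { a: 200_000 },
                    consumptions: {}, polling_ttl: ttl, consuming_ttl: ttl) # prints true
# Last poll 400,000 ms ago, past the TTL.
puts healthy_state?(now: 600_000, unrecoverable: false, pollings: { a: 200_000 },
                    consumptions: {}, polling_ttl: ttl, consuming_ttl: ttl) # prints false
```

Note that an empty `pollings` or `consumptions` hash never fails a check, so untracked work counts as healthy.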
#on_app_running(_event) ⇒ Object
    # File 'lib/karafka_ext/k8s.rb', line 149

    def on_app_running(_event)
      start
    end
#on_app_stopped(_event) ⇒ Object
Stop the HTTP server when the process stops.
    # File 'lib/karafka_ext/k8s.rb', line 155

    def on_app_stopped(_event)
      stop
    end
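`#on_app_running` and `#on_app_stopped` delegate to `start` and `stop`, which live in `BaseListener` and are not shown on this page. The sketch below shows one way such a start/stop pair for an HTTP liveness endpoint could look using Ruby's standard `socket` library. The `ProbeServer` class, its bind address, and the choice of `200 OK` versus `500 Internal Server Error` responses are assumptions for illustration, not BaseListener's actual behaviour.

```ruby
require 'socket'

# Hypothetical minimal HTTP liveness endpoint; BaseListener's real
# start/stop logic is not shown here and may differ.
class ProbeServer
  def initialize(port:, &health_check)
    @port = port
    @health_check = health_check
  end

  # Bind the TCP server and answer each connection from a background thread.
  def start
    @server = TCPServer.new('127.0.0.1', @port)
    @thread = Thread.new do
      loop do
        client = @server.accept
        client.gets # read the request line; the rest of the request is ignored
        status = @health_check.call ? '200 OK' : '500 Internal Server Error'
        client.write("HTTP/1.1 #{status}\r\nContent-Length: 0\r\nConnection: close\r\n\r\n")
        client.close
      end
    rescue IOError, Errno::EBADF
      # Raised from accept once #stop closes the server socket; ends the thread.
    end
    self
  end

  # Close the listening socket so the accept loop ends, then wait for the thread.
  def stop
    @server&.close
    @thread&.join
  end

  # Port actually bound (useful when port: 0 asks the OS for a free port).
  def bound_port
    @server.addr[1]
  end
end
```

For example, `ProbeServer.new(port: 0) { healthy }.start` answers with a 200 status while the local `healthy` is true and a 500 status once it flips to false; binding only on start mirrors the page's point that the TCP server should not be active before Karafka is running.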
#on_connection_listener_fetch_loop(_event) ⇒ Object
Tick on each fetch
    # File 'lib/karafka_ext/k8s.rb', line 162

    def on_connection_listener_fetch_loop(_event)
      mark_polling_tick
    end
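The private helpers `mark_polling_tick` and `clear_polling_tick` are not shown on this page. Given that the constructor creates a `Mutex` and a `@pollings` hash, one plausible shape is a registry keyed by the calling listener thread, so that several listener threads are tracked independently. The `TickRegistry` class below is hypothetical, and keying by `Thread.current` is an assumption rather than something the source confirms.

```ruby
# Hypothetical per-thread tick registry; keying by Thread objects is an
# assumption, not necessarily what Karafka does internally.
class TickRegistry
  def initialize
    @mutex = Mutex.new
    @ticks = {}
  end

  # Record the current monotonic time (ms) for the calling thread.
  def mark
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
    @mutex.synchronize { @ticks[Thread.current] = now }
  end

  # Stop tracking the calling thread, e.g. when its listener stops.
  def clear
    @mutex.synchronize { @ticks.delete(Thread.current) }
  end

  # Copy of the current entries, taken under the lock.
  def snapshot
    @mutex.synchronize { @ticks.dup }
  end
end

registry = TickRegistry.new
2.times.map { Thread.new { registry.mark } }.each(&:join)
registry.mark
puts registry.snapshot.size # prints 3
registry.clear
puts registry.snapshot.size # prints 2
```

Because each thread only writes its own key, a single stuck listener shows up as one stale entry while the others keep ticking.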
#on_connection_listener_stopped(_event) ⇒ Object
    # File 'lib/karafka_ext/k8s.rb', line 222

    def on_connection_listener_stopped(_event)
      return if Karafka::App.done?

      clear_polling_tick
    end
#on_connection_listener_stopping(_event) ⇒ Object
Deregister the polling tracker for the given listener.
    # File 'lib/karafka_ext/k8s.rb', line 208

    def on_connection_listener_stopping(_event)
      # We are interested in disabling tracking for given listener only if it was requested
      # when karafka was running. If we would always clear, it would not catch the shutdown
      # polling requirements. The "running" listener shutdown operations happen only when
      # the manager requests it for downscaling.
      return if Karafka::App.done?

      clear_polling_tick
    end
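Both stopping handlers clear the polling tick only while the application as a whole is not done, so a listener removed during downscaling stops being tracked, while during a full shutdown the tick is kept and a hanging shutdown can still trip the polling TTL. The sketch below models that guard in isolation. `PollingTracker` is hypothetical, and its `app_done` callable stands in for `Karafka::App.done?`.

```ruby
# Hypothetical stand-in for the stopping/stopped handlers; app_done replaces
# Karafka::App.done? and the hash replaces the polling registry.
class PollingTracker
  attr_reader :pollings

  def initialize(app_done:)
    @app_done = app_done # callable returning true once the whole app is shutting down
    @pollings = {}
  end

  def mark(listener_id, now)
    @pollings[listener_id] = now
  end

  # Forget a listener only when it stops while the app keeps running
  # (e.g. downscaling); during a global shutdown the tick is kept so a
  # stuck shutdown is still caught by the polling TTL.
  def listener_stopping(listener_id)
    return if @app_done.call

    @pollings.delete(listener_id)
  end
end

done = false
tracker = PollingTracker.new(app_done: -> { done })
tracker.mark(:listener_a, 0)
tracker.mark(:listener_b, 0)
tracker.listener_stopping(:listener_a) # app still running: entry removed
done = true
tracker.listener_stopping(:listener_b) # app shutting down: entry kept
p tracker.pollings.keys # prints [:listener_b]
```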
#on_error_occurred(event) ⇒ Object
    # File 'lib/karafka_ext/k8s.rb', line 191

    def on_error_occurred(event)
      clear_consumption_tick
      clear_polling_tick

      error = event[:error]

      # We are only interested in the rdkafka errors
      return unless error.is_a?(Rdkafka::RdkafkaError)
      # We mark as unrecoverable only on certain errors that will not be fixed by retrying
      return unless UNRECOVERABLE_RDKAFKA_ERRORS.include?(error.code)

      @unrecoverable = true
    end
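`#on_error_occurred` flips the process into the unrecoverable state only when two filters pass: the error must be an `Rdkafka::RdkafkaError`, and its code must be listed in `UNRECOVERABLE_RDKAFKA_ERRORS`, whose contents are not shown on this page. The sketch below reproduces that two-step classification without the rdkafka gem. `FakeKafkaError` (a plain Struct, not an exception class) and `UNRECOVERABLE_CODES` are hypothetical, and using `:fenced` as the listed code is an assumption based on the constructor's mention of fencing.

```ruby
# Hypothetical stand-ins for Rdkafka::RdkafkaError and UNRECOVERABLE_RDKAFKA_ERRORS.
FakeKafkaError = Struct.new(:code)

UNRECOVERABLE_CODES = %i[fenced].freeze # assumption for illustration only

def unrecoverable?(error)
  # Only librdkafka-style errors are considered at all.
  return false unless error.is_a?(FakeKafkaError)

  # Only specific codes that retrying cannot fix count as unrecoverable.
  UNRECOVERABLE_CODES.include?(error.code)
end

puts unrecoverable?(FakeKafkaError.new(:fenced))        # prints true
puts unrecoverable?(FakeKafkaError.new(:msg_timed_out)) # prints false
puts unrecoverable?(StandardError.new('boom'))          # prints false
```

In the listener itself the result is sticky: none of the methods shown ever resets `@unrecoverable`, so once set, every later `#healthy?` call returns false.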