Class: PtOnlineSchemaChange::Monitor

Inherits: Object
Includes:
ElasticAPM::SpanHelpers
Defined in:
app/my_lib/pt_online_schema_change/monitor.rb

Constant Summary

PROGRESS_LOG_FILE =
  Rails.root.join('log/pt_osc_progress.log')

DEFAULT_POLL_INTERVAL =
  30 # seconds

DEFAULT_LAG_THRESHOLD =
  10 # seconds

Class Method Summary

  • .check_replication_lag(table_name, threshold) ⇒ Object
  • .monitor_table(table_name, poll_interval, lag_threshold, log_to_file) ⇒ Object
  • .pt_osc_running?(table_name) ⇒ Boolean
  • .replication_lag ⇒ Integer?
  • .start_monitoring(table_name, options = {}) ⇒ Thread
  • .stop_monitoring(monitor_thread, timeout = 30) ⇒ Boolean

Class Method Details

.check_replication_lag(table_name, threshold) ⇒ Object

Checks current replication lag and, when it exceeds the threshold, logs a warning and reports the condition to APM.



# File 'app/my_lib/pt_online_schema_change/monitor.rb', line 155

def self.check_replication_lag(table_name, threshold)
  lag = replication_lag
  return unless lag && lag > threshold

  BUSINESS_LOGGER.warn('High replication lag detected during PT-OSC', {
                         table_name: table_name,
                         lag_seconds: lag,
                         threshold_seconds: threshold,
                       })

  APMErrorHandler.report('High replication lag during PT-OSC', context: {
                           table_name: table_name,
                           lag: lag,
                           threshold: threshold,
                         })
end
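
Illustrative usage (a minimal sketch; the 10-second threshold here is an arbitrary example, not a recommendation):

# Warn and report to APM if the replica is more than 10 seconds behind
PtOnlineSchemaChange::Monitor.check_replication_lag('users', 10)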

.monitor_table(table_name, poll_interval, lag_threshold, log_to_file) ⇒ Object

Internal monitoring loop: polls until PT-OSC finishes on the table or a stop is requested.



# File 'app/my_lib/pt_online_schema_change/monitor.rb', line 117

def self.monitor_table(table_name, poll_interval, lag_threshold, log_to_file)
  start_time = Time.current

  while pt_osc_running?(table_name)
    break if Thread.current[:stop_requested]

    # Check replication lag
    check_replication_lag(table_name, lag_threshold)

    # Log progress
    elapsed = Time.current - start_time
    progress_message = "PT-OSC running on #{table_name} for #{elapsed.round}s"

    BUSINESS_LOGGER.info(progress_message, {
                           table_name: table_name,
                           elapsed_seconds: elapsed.round,
                           operation: 'pt_osc_progress',
                         })

    # Log to file if requested
    if log_to_file
      File.open(PROGRESS_LOG_FILE, 'a') do |f|
        f.puts "#{Time.current.iso8601} - #{progress_message}"
      end
    end

    sleep poll_interval
  end

  total_time = Time.current - start_time
  BUSINESS_LOGGER.info('PT-OSC monitoring completed', {
                         table_name: table_name,
                         total_duration_seconds: total_time.round,
                       })
end
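
With log_to_file enabled, each poll appends one line to PROGRESS_LOG_FILE in the format built above (timestamp and duration below are illustrative):

# Read back the most recent progress entry, e.g.
# "2024-05-01T12:34:56+00:00 - PT-OSC running on users for 90s"
File.readlines(PtOnlineSchemaChange::Monitor::PROGRESS_LOG_FILE).last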

.pt_osc_running?(table_name) ⇒ Boolean

Check if PT-OSC is currently running on a table

Parameters:

  • table_name (String)

    The table name to check

Returns:

  • (Boolean)

    True if PT-OSC is running on the table



# File 'app/my_lib/pt_online_schema_change/monitor.rb', line 81

def self.pt_osc_running?(table_name)
  # PT-OSC copies rows into a new table named "_<table>_new"; its presence
  # indicates an operation is in progress on the original table
  trigger_table = "_#{table_name}_new"

  result = ActiveRecord::Base.connection.execute(
    "SHOW TABLES LIKE '#{trigger_table}'",
  )

  result.count > 0
rescue StandardError => e
  BUSINESS_LOGGER.error('Failed to check PT-OSC status', {
                          table_name: table_name,
                          error: e.message,
                        })
  false
end
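
Illustrative check (assumes an ActiveRecord connection to the database being altered):

# True only while the "_users_new" copy table exists
PtOnlineSchemaChange::Monitor.pt_osc_running?('users')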

.replication_lag ⇒ Integer?

Get current replication lag in seconds

Returns:

  • (Integer, nil)

    Lag in seconds, or nil if not available



# File 'app/my_lib/pt_online_schema_change/monitor.rb', line 103

def self.replication_lag
  result = ActiveRecord::Base.connection.execute('SHOW SLAVE STATUS')
  row = result.first

  return nil unless row && row['Seconds_Behind_Master']

  row['Seconds_Behind_Master'].to_i
rescue StandardError => e
  BUSINESS_LOGGER.error('Failed to check replication lag', { error: e.message })
  nil
end
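
A minimal pre-flight sketch using this reader (the 10-second limit is an arbitrary example, not a recommendation):

lag = PtOnlineSchemaChange::Monitor.replication_lag
raise 'Replica too far behind to start PT-OSC' if lag && lag > 10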

.start_monitoring(table_name, options = {}) ⇒ Thread

Starts a background thread that monitors a PT-OSC operation; intended for use in migrations.

Examples:

In a migration

monitor_thread = PtOnlineSchemaChange::Monitor.start_monitoring('users')
# ... run pt-osc command ...
PtOnlineSchemaChange::Monitor.stop_monitoring(monitor_thread)

Parameters:

  • table_name (String)

    The table being altered

  • options (Hash) (defaults to: {})

    Monitoring options

Options Hash (options):

  • :poll_interval (Integer) — default: 30

    How often to check progress in seconds

  • :lag_threshold (Integer) — default: 10

    Replication lag threshold in seconds

  • :log_to_file (Boolean) — default: true

    Whether to log progress to file

Returns:

  • (Thread)

    The monitoring thread



# File 'app/my_lib/pt_online_schema_change/monitor.rb', line 23

def self.start_monitoring(table_name, options = {})
  poll_interval = options[:poll_interval] || DEFAULT_POLL_INTERVAL
  lag_threshold = options[:lag_threshold] || DEFAULT_LAG_THRESHOLD
  log_to_file = options.fetch(:log_to_file, true)

  BUSINESS_LOGGER.set_business_context({ table_name: table_name, operation: 'pt_osc_monitoring' })

  BUSINESS_LOGGER.info('Starting PT-OSC monitoring', {
                         table_name: table_name,
                         poll_interval: poll_interval,
                         lag_threshold: lag_threshold,
                       })

  Thread.new do
    Thread.current[:name] = "pt_osc_monitor_#{table_name}"
    monitor_table(table_name, poll_interval, lag_threshold, log_to_file)
  rescue StandardError => e
    BUSINESS_LOGGER.error('PT-OSC monitoring failed', {
                            table_name: table_name,
                            error: e.message,
                          })
    APMErrorHandler.report(e, context: { table_name: table_name, operation: 'pt_osc_monitoring' })
  end
end
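
A sketch with explicit options (the values are illustrative; the defaults are a 30s poll interval, 10s lag threshold, and file logging enabled):

monitor_thread = PtOnlineSchemaChange::Monitor.start_monitoring(
  'users',
  poll_interval: 60,  # check progress once a minute
  lag_threshold: 5,   # warn earlier than the default
  log_to_file: false, # skip log/pt_osc_progress.log
)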

.stop_monitoring(monitor_thread, timeout = 30) ⇒ Boolean

Stops monitoring and waits for thread completion

Parameters:

  • monitor_thread (Thread)

    The monitoring thread to stop

  • timeout (Integer) (defaults to: 30)

    Maximum time to wait for thread completion in seconds

Returns:

  • (Boolean)

    True if thread stopped cleanly, false if timed out



# File 'app/my_lib/pt_online_schema_change/monitor.rb', line 55

def self.stop_monitoring(monitor_thread, timeout = 30)
  return true unless monitor_thread&.alive?

  monitor_thread[:stop_requested] = true
  monitor_thread.join(timeout)

  if monitor_thread.alive?
    BUSINESS_LOGGER.warn('PT-OSC monitoring thread did not stop gracefully', { timeout: timeout })
    monitor_thread.kill
    false
  else
    BUSINESS_LOGGER.info('PT-OSC monitoring stopped successfully')
    true
  end
rescue StandardError => e
  BUSINESS_LOGGER.error('Error stopping PT-OSC monitoring', { error: e.message })
  APMErrorHandler.report(e, context: { operation: 'stop_pt_osc_monitoring' })
  false
end
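
An end-to-end sketch for a migration; the ensure block guarantees the monitor is stopped even if the pt-osc command fails (the command itself is elided, as in the example above):

monitor_thread = PtOnlineSchemaChange::Monitor.start_monitoring('users')
begin
  # ... run pt-osc command ...
ensure
  # Returns false if the thread had to be killed after the default 30s timeout
  PtOnlineSchemaChange::Monitor.stop_monitoring(monitor_thread)
end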