Module: VendorsService::GoogleReserve::Concerns::FeedUploader

Extended by:
ActiveSupport::Concern
Included in:
AvailabilityFeeds::ProcessorService, AvailabilityFeeds::SchedulerService, AvailabilityFeeds::UploaderService, MerchantFeedsService, ServiceFeedsService
Defined in:
app/services/vendors_service/google_reserve/concerns/feed_uploader.rb

Overview

Shared helpers for generating, compressing, verifying, and uploading Google Reserve feed files (availability, merchant, and service feeds) to cloud storage and to the Google Reserve SFTP server.

Constant Summary

  # Cloud storage paths for upload operations
  UPLOAD_PATH_FILES = 'vendors/rwg_feeds/'
  UPLOAD_PATH_FRAGMENTS = 'vendors/rwg_feeds/availability_fragments/'

  # Base filename patterns
  AVAILABILITY_FILE_NAME = 'google-reserve-availability-feed'
  AVAILABILITY_FRAGMENT_NAME = 'rwg-availability-fragment'
  MERCHANT_FILE_NAME = 'google-reserve-merchant-feed'
  SERVICE_FILE_NAME = 'google-reserve-service-feed'

  # Local temporary directory for file processing
  LOCAL_PATH = 'tmp/rwg_feeds'

  # Feed type identifier for SFTP uploads
  AVAILABILITY_FEED_TYPE = :availability_feed

Instance Method Summary

  #cleanup_file(file_path) ⇒ void
  #compress_file(json_path, gz_path) ⇒ void
  #compress_file_streaming(json_path, gz_path) ⇒ void
  #decompress_file(gz_path) ⇒ String
  #download_gzip_file_from_cloud(download_file_path, local_file_path) ⇒ void
  #final_feed_paths(timestamp, feed_type = :availability_feed) ⇒ Hash{Symbol => String}
  #fragment_paths(restaurant_id) ⇒ Hash{Symbol => String}
  #generate_and_upload_feed_file(json_data, feed_type, timestamp = nil) ⇒ void
  #setup_sftp(file_path, feed_type) ⇒ void
  #upload_gzip_file_to_cloud(local_gzip_path, cloud_path) ⇒ void
  #verify_gzip_integrity(gz_path) ⇒ void

Instance Method Details

#cleanup_file(file_path) ⇒ void

This method returns an undefined value.

Parameters:

  • file_path (String)

    File path to delete



# File 'app/services/vendors_service/google_reserve/concerns/feed_uploader.rb', line 113

def cleanup_file(file_path)
  File.delete(file_path) if File.exist?(file_path)
end

#compress_file(json_path, gz_path) ⇒ void

This method returns an undefined value.

Compresses a JSON file to gzip with integrity validation. Automatically switches to streaming compression for files larger than 100 MB.

Parameters:

  • json_path (String)

    Source JSON file path

  • gz_path (String)

    Destination gzip file path

Raises:

  • (ArgumentError)

    If source file doesn't exist or is empty

  • (StandardError)

    If compression or validation fails



# File 'app/services/vendors_service/google_reserve/concerns/feed_uploader.rb', line 125

def compress_file(json_path, gz_path)
  unless File.exist?(json_path) && File.size(json_path) > 0
    raise ArgumentError, "Source file does not exist or is empty: #{json_path}"
  end

  file_size_mb = File.size(json_path).to_f / (1024.0 * 1024.0)
  if file_size_mb > 100
    BUSINESS_LOGGER.info('Using streaming compression for large file', file_size_mb: file_size_mb.round(2))
    return compress_file_streaming(json_path, gz_path)
  end

  File.open(json_path, 'rb') do |src|
    Zlib::GzipWriter.open(gz_path) do |gz|
      IO.copy_stream(src, gz)
    end
  end

  # Verify compressed file was created successfully
  unless File.exist?(gz_path) && File.size(gz_path) > 0
    raise StandardError, "Failed to create compressed file: #{gz_path}"
  end

  begin
    Zlib::GzipReader.open(gz_path) do |gz|
      chunk_size = 1024 * 1024 # 1MB
      # Read chunks and discard; use the return value rather than
      # passing an outbuf because some Zlib::GzipReader#read
      # implementations accept only one argument.
      while gz.read(chunk_size)
        # no-op: reading and discarding verifies integrity while keeping
        # memory usage bounded
      end
    end
  rescue Zlib::GzipFile::Error => e
    raise StandardError, "Compression integrity check failed: #{e.message}"
  end
end
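
A minimal usage sketch, assuming a Rails environment where the concern and application-level constants such as BUSINESS_LOGGER are loaded. ExampleFeedBuilder is a hypothetical includer used only for illustration:

class ExampleFeedBuilder
  include VendorsService::GoogleReserve::Concerns::FeedUploader
end

builder = ExampleFeedBuilder.new
json_path = 'tmp/rwg_feeds/example.json'
gz_path = "#{json_path}.gz"

FileUtils.mkdir_p(File.dirname(json_path))
File.write(json_path, { example: true }.to_json)

builder.compress_file(json_path, gz_path) # small file, so the in-memory gzip path is used
File.size(gz_path).positive?              # => true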

#compress_file_streaming(json_path, gz_path) ⇒ void

This method returns an undefined value.

Compresses large JSON files using 1 MB chunked streaming with proper error handling. Uses explicit file handles instead of the block form so the GZIP stream is finalized deliberately, which prevents corrupted GZIP files from being left behind when the worker is killed.

Parameters:

  • json_path (String)

    Source JSON file path

  • gz_path (String)

    Destination gzip file path

Raises:

  • (StandardError)

    If compression fails or integrity check fails



# File 'app/services/vendors_service/google_reserve/concerns/feed_uploader.rb', line 171

def compress_file_streaming(json_path, gz_path)
  chunk_size = 1024 * 1024
  src_file = nil
  gz_file = nil
  gz_writer = nil

  begin
    # Open files explicitly to ensure proper closure order
    src_file = File.open(json_path, 'rb')
    gz_file = File.open(gz_path, 'wb')
    gz_writer = Zlib::GzipWriter.new(gz_file)

    # Stream data in chunks
    while (chunk = src_file.read(chunk_size))
      gz_writer.write(chunk)
    end

    # CRITICAL: Explicitly finalize GZIP stream
    # This ensures proper trailer is written even if worker is killed
    gz_writer.flush
    gz_writer.finish
  rescue StandardError => e
    # Cleanup corrupted file on any error
    File.delete(gz_path) if gz_path && File.exist?(gz_path)
    raise StandardError, "Compression failed: #{e.message}"
  ensure
    # Critical: Close in the correct order - gzip writer first, then file handle
    begin
      gz_writer&.close unless gz_writer&.closed?
    rescue StandardError => e
      BUSINESS_LOGGER.warn('Error closing gzip writer', error: e.message)
    end
    begin
      gz_file&.close unless gz_file&.closed?
    rescue StandardError
      nil
    end
    begin
      src_file&.close unless src_file&.closed?
    rescue StandardError
      nil
    end
  end

  # Verify compressed file was created
  unless File.exist?(gz_path) && File.size(gz_path) > 0
    raise StandardError, "Failed to create compressed file: #{gz_path}"
  end

  # CRITICAL: Verify GZIP integrity before allowing upload
  verify_gzip_integrity(gz_path)
end
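
A standalone sketch of the finalization concern described above, using only Ruby's zlib and StringIO: GzipWriter writes the gzip footer only when #finish (or #close) is called, which is why this method finalizes the stream explicitly rather than relying on a block being allowed to run to completion.

require 'zlib'
require 'stringio'

io = StringIO.new
gz = Zlib::GzipWriter.new(io)
gz.write('{"example":true}')

gz.finish # writes the gzip footer (CRC32 + length); without it the stream reads as truncated
io.rewind

Zlib::GzipReader.new(io).read # => '{"example":true}'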

#decompress_file(gz_path) ⇒ String

Decompresses a gzip file with adaptive streaming for large files: a simple whole-file read for compressed files under 10 MB, chunked streaming for anything larger.

Parameters:

  • gz_path (String)

    Compressed file path

Returns:

  • (String)

    Decompressed content



# File 'app/services/vendors_service/google_reserve/concerns/feed_uploader.rb', line 257

def decompress_file(gz_path)
  compressed_size_mb = File.size(gz_path).to_f / (1024.0 * 1024.0)

  if compressed_size_mb < 10
    return Zlib::GzipReader.open(gz_path, &:read)
  end

  BUSINESS_LOGGER.debug('Using streaming decompression for large file',
                        compressed_size_mb: compressed_size_mb.round(2))

  decompressed_content = String.new
  chunk_size = 1024 * 1024

  Zlib::GzipReader.open(gz_path) do |gz|
    while (chunk = gz.read(chunk_size))
      decompressed_content << chunk
    end
  end

  decompressed_content
end
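
A round-trip sketch using the same hypothetical ExampleFeedBuilder from the compress_file example:

builder = ExampleFeedBuilder.new
builder.compress_file('tmp/rwg_feeds/example.json', 'tmp/rwg_feeds/example.json.gz')
content = builder.decompress_file('tmp/rwg_feeds/example.json.gz')
JSON.parse(content) # => {"example"=>true}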

#download_gzip_file_from_cloud(download_file_path, local_file_path) ⇒ void

This method returns an undefined value.

Downloads a gzip file from S3 with retry logic for rate limiting (critical at 2000+ scale).

Parameters:

  • download_file_path (String)

    S3 object key

  • local_file_path (String)

    Local destination path

Raises:

  • (StandardError)

    After retries are exhausted



# File 'app/services/vendors_service/google_reserve/concerns/feed_uploader.rb', line 366

def download_gzip_file_from_cloud(download_file_path, local_file_path)
  FileUtils.mkdir_p(File.dirname(local_file_path))

  Retriable.retriable(
    tries: 3,
    base_interval: 1,
    multiplier: 2,
    on: [
      Aws::S3::Errors::ServiceError,
      Aws::S3::Errors::InternalError,
      Aws::S3::Errors::ServiceUnavailable,
      Aws::S3::Errors::SlowDown,
      Net::OpenTimeout,
      Net::ReadTimeout,
    ],
  ) do
    s3 = Aws::S3::Resource.new(region: Figaro.env.aws_region!)
    source_obj = s3.bucket(Figaro.env.AWS_DOCS_BUCKET!).object(download_file_path)
    source_obj.download_file(local_file_path)
  end
rescue StandardError => e
  APMErrorHandler.report('Failed to download gzip file from cloud storage',
                         exception: e,
                         context: { download_path: download_file_path, local_path: local_file_path })
  raise e
end
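
A hedged usage sketch; it assumes AWS credentials plus Figaro's aws_region! and AWS_DOCS_BUCKET! are configured, and the object key shown is a made-up example, not a real fragment:

builder = ExampleFeedBuilder.new
cloud_key = 'vendors/rwg_feeds/availability_fragments/rwg-availability-fragment-restaurant-123-1700000000.json.gz'
local_path = 'tmp/rwg_feeds/fragment-123.json.gz'

builder.download_gzip_file_from_cloud(cloud_key, local_path)
fragment_json = builder.decompress_file(local_path)
builder.cleanup_file(local_path)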

#final_feed_paths(timestamp, feed_type = :availability_feed) ⇒ Hash{Symbol => String}

Generates all file paths for final aggregated feed.

Parameters:

  • timestamp (Integer)

    Unix timestamp for filename

  • feed_type (Symbol) (defaults to: :availability_feed)

    :availability_feed, :merchant_feed, or :service_feed

Returns:

  • (Hash{Symbol => String})

    Paths hash with :local_json, :local_gz, :cloud_gz



# File 'app/services/vendors_service/google_reserve/concerns/feed_uploader.rb', line 89

def final_feed_paths(timestamp, feed_type = :availability_feed)
  base_name = case feed_type
              when :merchant_feed
                MERCHANT_FILE_NAME
              when :service_feed
                SERVICE_FILE_NAME
              when :availability_feed
                AVAILABILITY_FILE_NAME
              else
                raise ArgumentError, "Unknown feed type: #{feed_type}"
              end

  filename = "#{base_name}-#{timestamp}.json"
  FileUtils.mkdir_p(LOCAL_PATH) unless Dir.exist?(LOCAL_PATH)

  {
    local_json: File.join(LOCAL_PATH, filename),
    local_gz: File.join(LOCAL_PATH, "#{filename}.gz"),
    cloud_gz: "#{UPLOAD_PATH_FILES}#{filename}.gz",
  }
end
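
A sketch of the returned hash for a merchant feed (hypothetical timestamp, same hypothetical includer as above):

builder = ExampleFeedBuilder.new
builder.final_feed_paths(1_700_000_000, :merchant_feed)
# => {
#      local_json: "tmp/rwg_feeds/google-reserve-merchant-feed-1700000000.json",
#      local_gz:   "tmp/rwg_feeds/google-reserve-merchant-feed-1700000000.json.gz",
#      cloud_gz:   "vendors/rwg_feeds/google-reserve-merchant-feed-1700000000.json.gz"
#    }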

#fragment_paths(restaurant_id) ⇒ Hash{Symbol => String}

Generates all file paths for a restaurant fragment. Uses consistent timestamp across multiple calls to avoid path mismatches.

Examples:

paths = fragment_paths(123)
paths[:local_json] # => "tmp/rwg_feeds/rwg-availability-fragment-restaurant-123-1234567890.json"

Parameters:

  • restaurant_id (Integer)

    Restaurant ID for filename

Returns:

  • (Hash{Symbol => String})

    Paths hash with :local_json, :local_gz, :cloud_gz



# File 'app/services/vendors_service/google_reserve/concerns/feed_uploader.rb', line 72

def fragment_paths(restaurant_id)
  @fragment_timestamp ||= Time.current.strftime('%s').to_i
  filename = "#{AVAILABILITY_FRAGMENT_NAME}-restaurant-#{restaurant_id}-#{@fragment_timestamp}.json"
  FileUtils.mkdir_p(LOCAL_PATH) unless Dir.exist?(LOCAL_PATH)

  {
    local_json: File.join(LOCAL_PATH, filename),
    local_gz: File.join(LOCAL_PATH, "#{filename}.gz"),
    cloud_gz: "#{UPLOAD_PATH_FRAGMENTS}#{filename}.gz",
  }
end

#generate_and_upload_feed_file(json_data, feed_type, timestamp = nil) ⇒ void

This method returns an undefined value.

Complete workflow: generate the file, compress it, upload to S3 and SFTP, then clean up local files.

Parameters:

  • json_data (String)

    JSON data to write

  • feed_type (Symbol)

    :availability_feed, :merchant_feed, or :service_feed

  • timestamp (Integer, nil) (defaults to: nil)

    Unix timestamp (default: current time)



# File 'app/services/vendors_service/google_reserve/concerns/feed_uploader.rb', line 285

def generate_and_upload_feed_file(json_data, feed_type, timestamp = nil)
  timestamp ||= Time.current.strftime('%s').to_i
  paths = final_feed_paths(timestamp, feed_type)
  cleanup_old_feed_files(feed_type)
  File.write(paths[:local_json], json_data)

  BUSINESS_LOGGER.info(
    "#{self.class}: Generated JSON file for Google Reserve #{feed_type}",
    {
      feed_type: feed_type,
      file_path: paths[:local_json],
      timestamp: timestamp,
    },
  )

  compress_file(paths[:local_json], paths[:local_gz])
  upload_gzip_file_to_cloud(paths[:local_gz], paths[:cloud_gz])
  setup_sftp(paths[:local_json], feed_type)
  cleanup_file(paths[:local_json])
  cleanup_file(paths[:local_gz])
end
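
A hedged end-to-end sketch; it assumes S3 and SFTP credentials are configured, and the payload shown is a placeholder rather than the actual Google Reserve feed schema:

builder = ExampleFeedBuilder.new
json_data = { example: 'placeholder feed payload' }.to_json
builder.generate_and_upload_feed_file(json_data, :merchant_feed)
# writes and compresses the file locally, uploads the .gz to S3 and the .json to SFTP,
# then removes both local copies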

#setup_sftp(file_path, feed_type) ⇒ void

This method returns an undefined value.

Uploads the feed to the Google Reserve SFTP server.

Parameters:

  • file_path (String, Pathname)

    Local file path

  • feed_type (Symbol)

    :availability_feed, :merchant_feed, or :service_feed



# File 'app/services/vendors_service/google_reserve/concerns/feed_uploader.rb', line 398

def setup_sftp(file_path, feed_type)
  sftp_config = build_sftp_config(feed_type)

  perform_sftp_upload(file_path, sftp_config[:uri], sftp_config[:user], sftp_config[:port], sftp_config[:key])
  log_upload_success(file_path, sftp_config[:uri], sftp_config[:user], feed_type)
rescue StandardError => e
  if sftp_config
    log_upload_failure(e, file_path, sftp_config[:uri], sftp_config[:user], feed_type)
  end
  raise e
end

#upload_gzip_file_to_cloud(local_gzip_path, cloud_path) ⇒ void

This method returns an undefined value.

Uploads gzip to S3 with retry logic for transient errors.

Parameters:

  • local_gzip_path (String)

    Local gzip file path

  • cloud_path (String)

    S3 object key

Raises:

  • (StandardError)

    After retries are exhausted



# File 'app/services/vendors_service/google_reserve/concerns/feed_uploader.rb', line 313

def upload_gzip_file_to_cloud(local_gzip_path, cloud_path)
  Retriable.retriable(
    tries: 3,
    base_interval: 1,
    multiplier: 2,
    on: [
      Aws::S3::Errors::XAmzContentSHA256Mismatch,
      Aws::S3::Errors::InternalError,
      Aws::S3::Errors::ServiceUnavailable,
      Aws::S3::Errors::SlowDown,
    ],
  ) do
    unless File.exist?(local_gzip_path) && File.size(local_gzip_path) > 0
      raise ArgumentError, "File does not exist or is empty: #{local_gzip_path}"
    end

    s3 = Aws::S3::Resource.new(region: Figaro.env.aws_region!)
    target_obj = s3.bucket(Figaro.env.AWS_DOCS_BUCKET!).object(cloud_path)
    file_size = File.size(local_gzip_path)
    BUSINESS_LOGGER.debug('Uploading gzip file to cloud storage',
                          local_path: local_gzip_path,
                          cloud_path: cloud_path,
                          file_size: file_size)

    target_obj.upload_file(local_gzip_path)

    BUSINESS_LOGGER.info('Successfully uploaded gzip file to cloud storage',
                         local_path: local_gzip_path,
                         cloud_path: cloud_path,
                         file_size: file_size)
  end
rescue StandardError => e
  error_context = {
    local_path: local_gzip_path,
    cloud_path: cloud_path,
    file_exists: File.exist?(local_gzip_path),
    file_size: File.exist?(local_gzip_path) ? File.size(local_gzip_path) : 0,
    error_class: e.class.name,
    error_message: e.message,
  }

  APMErrorHandler.report('Failed to upload gzip file to cloud storage',
                         exception: e,
                         context: error_context)
  raise e
end
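
A short sketch of uploading an already-compressed file on its own (assumes the same AWS configuration as above; the cloud key is illustrative):

builder = ExampleFeedBuilder.new
builder.compress_file('tmp/rwg_feeds/example.json', 'tmp/rwg_feeds/example.json.gz')
builder.upload_gzip_file_to_cloud('tmp/rwg_feeds/example.json.gz', 'vendors/rwg_feeds/example.json.gz')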

#verify_gzip_integrity(gz_path) ⇒ void

This method returns an undefined value.

Verifies GZIP file integrity by attempting full decompression. This catches corrupted files before they're uploaded to S3.

Parameters:

  • gz_path (String)

    Path to GZIP file to verify

Raises:

  • (StandardError)

    If GZIP file is corrupted or incomplete



# File 'app/services/vendors_service/google_reserve/concerns/feed_uploader.rb', line 230

def verify_gzip_integrity(gz_path)
  BUSINESS_LOGGER.debug('Verifying GZIP integrity', file: gz_path)

  Zlib::GzipReader.open(gz_path) do |gz|
    # Read entire file in chunks to verify integrity
    # Discard data to keep memory usage low
    while gz.read(1024 * 1024)
      # Just reading verifies the GZIP stream is valid
    end
  end

  BUSINESS_LOGGER.debug('GZIP integrity verified', file: gz_path)
rescue Zlib::GzipFile::Error => e
  # Delete corrupted file
  File.delete(gz_path) if File.exist?(gz_path)
  raise StandardError, "GZIP integrity check failed - file corrupted: #{e.message}"
rescue StandardError => e
  # Delete corrupted file
  File.delete(gz_path) if File.exist?(gz_path)
  raise StandardError, "GZIP verification failed: #{e.message}"
end
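
An illustration of the failure mode this guards against, using the hypothetical ExampleFeedBuilder: a gzip file truncated mid-trailer (for example, by a killed worker) fails verification and is deleted.

builder = ExampleFeedBuilder.new
gz_path = 'tmp/rwg_feeds/truncated.json.gz'

builder.compress_file('tmp/rwg_feeds/example.json', gz_path)
File.truncate(gz_path, File.size(gz_path) - 4) # chop part of the gzip trailer off

builder.verify_gzip_integrity(gz_path)
# => raises StandardError ("GZIP integrity check failed - file corrupted: ...")
#    and deletes the corrupted file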