Module: VendorsService::GoogleReserve::Concerns::FeedUploader
- Extended by:
- ActiveSupport::Concern
- Included in:
- AvailabilityFeeds::ProcessorService, AvailabilityFeeds::SchedulerService, AvailabilityFeeds::UploaderService, MerchantFeedsService, ServiceFeedsService
- Defined in:
- app/services/vendors_service/google_reserve/concerns/feed_uploader.rb
Overview
Shared helpers for building Google Reserve (RwG) feed files: generating JSON locally, compressing it to gzip, uploading to S3 cloud storage and to the Google Reserve SFTP server, and cleaning up temporary files. (The source file disables the Metrics/ModuleLength, Metrics/MethodLength, and Metrics/AbcSize RuboCop cops.)
Constant Summary
Cloud storage paths for upload operations:
- UPLOAD_PATH_FILES = 'vendors/rwg_feeds/'
- UPLOAD_PATH_FRAGMENTS = 'vendors/rwg_feeds/availability_fragments/'

Base filename patterns:
- AVAILABILITY_FILE_NAME = 'google-reserve-availability-feed'
- AVAILABILITY_FRAGMENT_NAME = 'rwg-availability-fragment'
- MERCHANT_FILE_NAME = 'google-reserve-merchant-feed'
- SERVICE_FILE_NAME = 'google-reserve-service-feed'

Local temporary directory for file processing:
- LOCAL_PATH = 'tmp/rwg_feeds'

Feed type identifier for SFTP uploads:
- AVAILABILITY_FEED_TYPE = :availability_feed
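Taken together, these constants determine where a feed lands locally and in cloud storage. A short sketch of how the availability-feed paths are assembled from them, mirroring the path building in #final_feed_paths documented below; the timestamp value is illustrative:

timestamp = 1700000000 # illustrative epoch-seconds timestamp
filename  = "#{AVAILABILITY_FILE_NAME}-#{timestamp}.json"

File.join(LOCAL_PATH, filename)          # => "tmp/rwg_feeds/google-reserve-availability-feed-1700000000.json"
File.join(LOCAL_PATH, "#{filename}.gz")  # => "tmp/rwg_feeds/google-reserve-availability-feed-1700000000.json.gz"
"#{UPLOAD_PATH_FILES}#{filename}.gz"     # => "vendors/rwg_feeds/google-reserve-availability-feed-1700000000.json.gz"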
Instance Method Summary
- #cleanup_file(file_path) ⇒ void
- #compress_file(json_path, gz_path) ⇒ void
  Compresses JSON to gzip with integrity validation.
- #compress_file_streaming(json_path, gz_path) ⇒ void
  Compresses large JSON using 1MB chunk streaming with proper error handling.
- #decompress_file(gz_path) ⇒ String
  Decompresses gzip file with adaptive streaming for large files.
- #download_gzip_file_from_cloud(download_file_path, local_file_path) ⇒ void
  Downloads gzip from S3 with retry logic for rate limiting (critical at 2000+ scale).
- #final_feed_paths(timestamp, feed_type = :availability_feed) ⇒ Hash{Symbol => String}
  Generates all file paths for final aggregated feed.
- #fragment_paths(restaurant_id) ⇒ Hash{Symbol => String}
  Generates all file paths for a restaurant fragment.
- #generate_and_upload_feed_file(json_data, feed_type, timestamp = nil) ⇒ void
  Complete workflow: generate file, compress, upload to S3 and SFTP, cleanup.
- #setup_sftp(file_path, feed_type) ⇒ void
  Uploads feed to Google Reserve SFTP server.
- #upload_gzip_file_to_cloud(local_gzip_path, cloud_path) ⇒ void
  Uploads gzip to S3 with retry logic for transient errors.
- #verify_gzip_integrity(gz_path) ⇒ void
  Verifies GZIP file integrity by attempting full decompression.
Instance Method Details
#cleanup_file(file_path) ⇒ void
This method returns an undefined value.
# File 'app/services/vendors_service/google_reserve/concerns/feed_uploader.rb', line 113

def cleanup_file(file_path)
  File.delete(file_path) if File.exist?(file_path)
end
#compress_file(json_path, gz_path) ⇒ void
This method returns an undefined value.
Compresses JSON to gzip with integrity validation. Auto-switches to streaming for files >100MB.
# File 'app/services/vendors_service/google_reserve/concerns/feed_uploader.rb', line 125

def compress_file(json_path, gz_path)
  unless File.exist?(json_path) && File.size(json_path) > 0
    raise ArgumentError, "Source file does not exist or is empty: #{json_path}"
  end

  file_size_mb = File.size(json_path).to_f / (1024.0 * 1024.0)
  if file_size_mb > 100
    BUSINESS_LOGGER.info('Using streaming compression for large file', file_size_mb: file_size_mb.round(2))
    return compress_file_streaming(json_path, gz_path)
  end

  File.open(json_path, 'rb') do |src|
    Zlib::GzipWriter.open(gz_path) do |gz|
      IO.copy_stream(src, gz)
    end
  end

  # Verify compressed file was created successfully
  unless File.exist?(gz_path) && File.size(gz_path) > 0
    raise StandardError, "Failed to create compressed file: #{gz_path}"
  end

  begin
    Zlib::GzipReader.open(gz_path) do |gz|
      chunk_size = 1024 * 1024 # 1MB
      # Read chunks and discard; use the return value rather than
      # passing an outbuf because some Zlib::GzipReader#read
      # implementations accept only one argument.
      while gz.read(chunk_size)
        # no-op: reading and discarding verifies integrity while keeping
        # memory usage bounded
      end
    end
  rescue Zlib::GzipFile::Error => e
    raise StandardError, "Compression integrity check failed: #{e.message}"
  end
end
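A hedged usage sketch: compressing a freshly written fragment file inside an object that includes this concern. payload_json is an assumed JSON string and the restaurant id is illustrative:

paths = fragment_paths(42)                    # 42 is an illustrative restaurant id
File.write(paths[:local_json], payload_json)  # payload_json: assumed JSON string built elsewhere
compress_file(paths[:local_json], paths[:local_gz])
# Files up to ~100MB are gzipped in a single pass; anything larger is
# delegated to #compress_file_streaming before upload.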
#compress_file_streaming(json_path, gz_path) ⇒ void
This method returns an undefined value.
Compresses large JSON using 1MB chunk streaming with proper error handling. Uses explicit file handles instead of the block form to ensure proper GZIP finalization, which prevents corrupted GZIP files from being created when the worker is killed.
# File 'app/services/vendors_service/google_reserve/concerns/feed_uploader.rb', line 171

def compress_file_streaming(json_path, gz_path)
  chunk_size = 1024 * 1024
  src_file = nil
  gz_file = nil
  gz_writer = nil

  begin
    # Open files explicitly to ensure proper closure order
    src_file = File.open(json_path, 'rb')
    gz_file = File.open(gz_path, 'wb')
    gz_writer = Zlib::GzipWriter.new(gz_file)

    # Stream data in chunks
    while (chunk = src_file.read(chunk_size))
      gz_writer.write(chunk)
    end

    # CRITICAL: Explicitly finalize GZIP stream
    # This ensures the proper trailer is written even if the worker is killed
    gz_writer.flush
    gz_writer.finish
  rescue StandardError => e
    # Cleanup corrupted file on any error
    File.delete(gz_path) if gz_path && File.exist?(gz_path)
    raise StandardError, "Compression failed: #{e.message}"
  ensure
    # Critical: Close in the correct order - gzip writer first, then file handle
    begin
      gz_writer&.close unless gz_writer&.closed?
    rescue StandardError => e
      BUSINESS_LOGGER.warn('Error closing gzip writer', error: e.message)
    end

    begin
      gz_file&.close unless gz_file&.closed?
    rescue StandardError
      nil
    end

    begin
      src_file&.close unless src_file&.closed?
    rescue StandardError
      nil
    end
  end

  # Verify compressed file was created
  unless File.exist?(gz_path) && File.size(gz_path) > 0
    raise StandardError, "Failed to create compressed file: #{gz_path}"
  end

  # CRITICAL: Verify GZIP integrity before allowing upload
  verify_gzip_integrity(gz_path)
end
#decompress_file(gz_path) ⇒ String
Decompresses a gzip file with adaptive streaming for large files. Uses a simple read for compressed files under 10MB and streaming for larger ones.
# File 'app/services/vendors_service/google_reserve/concerns/feed_uploader.rb', line 257

def decompress_file(gz_path)
  compressed_size_mb = File.size(gz_path).to_f / (1024.0 * 1024.0)

  if compressed_size_mb < 10
    return Zlib::GzipReader.open(gz_path, &:read)
  end

  BUSINESS_LOGGER.debug('Using streaming decompression for large file',
                        compressed_size_mb: compressed_size_mb.round(2))

  decompressed_content = String.new
  chunk_size = 1024 * 1024

  Zlib::GzipReader.open(gz_path) do |gz|
    while (chunk = gz.read(chunk_size))
      decompressed_content << chunk
    end
  end

  decompressed_content
end
#download_gzip_file_from_cloud(download_file_path, local_file_path) ⇒ void
This method returns an undefined value.
Downloads gzip from S3 with retry logic for rate limiting (critical at 2000+ scale).
# File 'app/services/vendors_service/google_reserve/concerns/feed_uploader.rb', line 366

def download_gzip_file_from_cloud(download_file_path, local_file_path)
  FileUtils.mkdir_p(File.dirname(local_file_path))

  Retriable.retriable(
    tries: 3,
    base_interval: 1,
    multiplier: 2,
    on: [
      Aws::S3::Errors::ServiceError,
      Aws::S3::Errors::InternalError,
      Aws::S3::Errors::ServiceUnavailable,
      Aws::S3::Errors::SlowDown,
      Net::OpenTimeout,
      Net::ReadTimeout,
    ],
  ) do
    s3 = Aws::S3::Resource.new(region: Figaro.env.aws_region!)
    source_obj = s3.bucket(Figaro.env.AWS_DOCS_BUCKET!).object(download_file_path)
    source_obj.download_file(local_file_path)
  end
rescue StandardError => e
  APMErrorHandler.report('Failed to download gzip file from cloud storage',
                         exception: e,
                         context: { download_path: download_file_path, local_path: local_file_path })
  raise e
end
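A sketch of how a downloaded fragment might be read back, pairing this method with #decompress_file and #cleanup_file; the cloud key and local path are illustrative:

require 'json'

cloud_key  = "#{UPLOAD_PATH_FRAGMENTS}rwg-availability-fragment-restaurant-42-1700000000.json.gz" # illustrative
local_path = File.join(LOCAL_PATH, 'downloaded-fragment.json.gz')                                 # illustrative

download_gzip_file_from_cloud(cloud_key, local_path)
fragment = JSON.parse(decompress_file(local_path))
cleanup_file(local_path)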
#final_feed_paths(timestamp, feed_type = :availability_feed) ⇒ Hash{Symbol => String}
Generates all file paths for final aggregated feed.
# File 'app/services/vendors_service/google_reserve/concerns/feed_uploader.rb', line 89

def final_feed_paths(timestamp, feed_type = :availability_feed)
  base_name = case feed_type
              when :merchant_feed
                MERCHANT_FILE_NAME
              when :service_feed
                SERVICE_FILE_NAME
              when :availability_feed
                AVAILABILITY_FILE_NAME
              else
                raise ArgumentError, "Unknown feed type: #{feed_type}"
              end

  filename = "#{base_name}-#{timestamp}.json"
  FileUtils.mkdir_p(LOCAL_PATH) unless Dir.exist?(LOCAL_PATH)

  {
    local_json: File.join(LOCAL_PATH, filename),
    local_gz: File.join(LOCAL_PATH, "#{filename}.gz"),
    cloud_gz: "#{UPLOAD_PATH_FILES}#{filename}.gz",
  }
end
#fragment_paths(restaurant_id) ⇒ Hash{Symbol => String}
Generates all file paths for a restaurant fragment. Uses a consistent timestamp across multiple calls to avoid path mismatches.
# File 'app/services/vendors_service/google_reserve/concerns/feed_uploader.rb', line 72

def fragment_paths(restaurant_id)
  @fragment_timestamp ||= Time.current.strftime('%s').to_i
  filename = "#{AVAILABILITY_FRAGMENT_NAME}-restaurant-#{restaurant_id}-#{@fragment_timestamp}.json"
  FileUtils.mkdir_p(LOCAL_PATH) unless Dir.exist?(LOCAL_PATH)

  {
    local_json: File.join(LOCAL_PATH, filename),
    local_gz: File.join(LOCAL_PATH, "#{filename}.gz"),
    cloud_gz: "#{UPLOAD_PATH_FRAGMENTS}#{filename}.gz",
  }
end
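For illustration, repeated calls on the same instance reuse the memoized timestamp, so the JSON and gzip paths for a restaurant stay aligned; the restaurant id and timestamp below are illustrative:

fragment_paths(42)
# => {
#      local_json: "tmp/rwg_feeds/rwg-availability-fragment-restaurant-42-1700000000.json",
#      local_gz:   "tmp/rwg_feeds/rwg-availability-fragment-restaurant-42-1700000000.json.gz",
#      cloud_gz:   "vendors/rwg_feeds/availability_fragments/rwg-availability-fragment-restaurant-42-1700000000.json.gz"
#    }

# A later call on the same object reuses @fragment_timestamp, so the
# filenames match the ones generated above for the same restaurant.
fragment_paths(42)[:local_gz]
# => "tmp/rwg_feeds/rwg-availability-fragment-restaurant-42-1700000000.json.gz"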
#generate_and_upload_feed_file(json_data, feed_type, timestamp = nil) ⇒ void
This method returns an undefined value.
Complete workflow: generate file, compress, upload to S3 and SFTP, cleanup.
# File 'app/services/vendors_service/google_reserve/concerns/feed_uploader.rb', line 285

def generate_and_upload_feed_file(json_data, feed_type, timestamp = nil)
  timestamp ||= Time.current.strftime('%s').to_i
  paths = final_feed_paths(timestamp, feed_type)

  cleanup_old_feed_files(feed_type)
  File.write(paths[:local_json], json_data)

  BUSINESS_LOGGER.info(
    "#{self.class}: Generated JSON file for Google Reserve #{feed_type}",
    {
      feed_type: feed_type,
      file_path: paths[:local_json],
      timestamp: timestamp,
    },
  )

  compress_file(paths[:local_json], paths[:local_gz])
  upload_gzip_file_to_cloud(paths[:local_gz], paths[:cloud_gz])
  setup_sftp(paths[:local_json], feed_type)

  cleanup_file(paths[:local_json])
  cleanup_file(paths[:local_gz])
end
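A minimal sketch of how a service that includes this concern might drive the full workflow; the class name and the feed-building method are hypothetical, the real includers are listed at the top of this page:

# Hypothetical includer for illustration only.
class ExampleMerchantFeedService
  include VendorsService::GoogleReserve::Concerns::FeedUploader

  def call
    json_data = build_merchant_feed_json # assumed to return the feed as a JSON string
    # Writes the JSON locally, gzips it, uploads the .gz to S3, pushes the
    # feed to the Google Reserve SFTP server, then deletes the temp files.
    generate_and_upload_feed_file(json_data, :merchant_feed)
  end
end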
#setup_sftp(file_path, feed_type) ⇒ void
This method returns an undefined value.
Uploads feed to Google Reserve SFTP server.
# File 'app/services/vendors_service/google_reserve/concerns/feed_uploader.rb', line 398

def setup_sftp(file_path, feed_type)
  sftp_config = build_sftp_config(feed_type)
  perform_sftp_upload(file_path, sftp_config[:uri], sftp_config[:user], sftp_config[:port], sftp_config[:key])
  log_upload_success(file_path, sftp_config[:uri], sftp_config[:user], feed_type)
rescue StandardError => e
  if sftp_config
    log_upload_failure(e, file_path, sftp_config[:uri], sftp_config[:user], feed_type)
  end
  raise e
end
#upload_gzip_file_to_cloud(local_gzip_path, cloud_path) ⇒ void
This method returns an undefined value.
Uploads gzip to S3 with retry logic for transient errors.
# File 'app/services/vendors_service/google_reserve/concerns/feed_uploader.rb', line 313

def upload_gzip_file_to_cloud(local_gzip_path, cloud_path)
  Retriable.retriable(
    tries: 3,
    base_interval: 1,
    multiplier: 2,
    on: [
      Aws::S3::Errors::XAmzContentSHA256Mismatch,
      Aws::S3::Errors::InternalError,
      Aws::S3::Errors::ServiceUnavailable,
      Aws::S3::Errors::SlowDown,
    ],
  ) do
    unless File.exist?(local_gzip_path) && File.size(local_gzip_path) > 0
      raise ArgumentError, "File does not exist or is empty: #{local_gzip_path}"
    end

    s3 = Aws::S3::Resource.new(region: Figaro.env.aws_region!)
    target_obj = s3.bucket(Figaro.env.AWS_DOCS_BUCKET!).object(cloud_path)

    file_size = File.size(local_gzip_path)
    BUSINESS_LOGGER.debug('Uploading gzip file to cloud storage',
                          local_path: local_gzip_path, cloud_path: cloud_path, file_size: file_size)

    target_obj.upload_file(local_gzip_path)

    BUSINESS_LOGGER.info('Successfully uploaded gzip file to cloud storage',
                         local_path: local_gzip_path, cloud_path: cloud_path, file_size: file_size)
  end
rescue StandardError => e
  error_context = {
    local_path: local_gzip_path,
    cloud_path: cloud_path,
    file_exists: File.exist?(local_gzip_path),
    file_size: File.exist?(local_gzip_path) ? File.size(local_gzip_path) : 0,
    error_class: e.class.name,
    error_message: e.message,
  }

  APMErrorHandler.report('Failed to upload gzip file to cloud storage', exception: e, context: error_context)
  raise e
end
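A sketch of uploading a hand-built feed without the SFTP step, run inside an includer of this concern; service_feed_json is an assumed JSON string:

timestamp = Time.current.strftime('%s').to_i
paths = final_feed_paths(timestamp, :service_feed)

File.write(paths[:local_json], service_feed_json)   # service_feed_json: assumed JSON string
compress_file(paths[:local_json], paths[:local_gz])
upload_gzip_file_to_cloud(paths[:local_gz], paths[:cloud_gz])

cleanup_file(paths[:local_json])
cleanup_file(paths[:local_gz])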
#verify_gzip_integrity(gz_path) ⇒ void
This method returns an undefined value.
Verifies GZIP file integrity by attempting full decompression. This catches corrupted files before they're uploaded to S3.
# File 'app/services/vendors_service/google_reserve/concerns/feed_uploader.rb', line 230

def verify_gzip_integrity(gz_path)
  BUSINESS_LOGGER.debug('Verifying GZIP integrity', file: gz_path)

  Zlib::GzipReader.open(gz_path) do |gz|
    # Read entire file in chunks to verify integrity
    # Discard data to keep memory usage low
    while gz.read(1024 * 1024)
      # Just reading verifies the GZIP stream is valid
    end
  end

  BUSINESS_LOGGER.debug('GZIP integrity verified', file: gz_path)
rescue Zlib::GzipFile::Error => e
  # Delete corrupted file
  File.delete(gz_path) if File.exist?(gz_path)
  raise StandardError, "GZIP integrity check failed - file corrupted: #{e.message}"
rescue StandardError => e
  # Delete corrupted file
  File.delete(gz_path) if File.exist?(gz_path)
  raise StandardError, "GZIP verification failed: #{e.message}"
end
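A small demonstration sketch, run inside an includer of this concern, showing how a file that is not valid gzip is rejected and removed; the filename and content are illustrative:

FileUtils.mkdir_p(LOCAL_PATH)
bad_path = File.join(LOCAL_PATH, 'corrupt-example.json.gz') # illustrative filename
File.binwrite(bad_path, 'not a gzip stream')                # deliberately invalid content

begin
  verify_gzip_integrity(bad_path)
rescue StandardError => e
  e.message            # => e.g. "GZIP integrity check failed - file corrupted: not in gzip format"
  File.exist?(bad_path) # => false (the corrupted file was deleted before re-raising)
end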