Class: Tagging::RestaurantTagToRestaurantsWorker

Inherits:
ApplicationWorker show all
Includes:
DistanceHelper
Defined in:
app/workers/tagging/restaurant_tag_to_restaurants_worker.rb

Instance Method Summary collapse

Methods included from DistanceHelper

#decide_primary_tag, #find_distance_between

Methods inherited from ApplicationWorker

unlimited_retry

Instance Method Details

#perform(restaurant_tag_id) ⇒ Object

This worker will tag all restaurants within defined distance from RestaurantTag Coordinates



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'app/workers/tagging/restaurant_tag_to_restaurants_worker.rb', line 9

def perform(restaurant_tag_id)
  restaurant_tag = RestaurantTag.find(restaurant_tag_id)

  max_distance = restaurant_tag.radius
  affected_restaurants = Set.new

  ActiveRecord::Base.transaction do
    restaurant_tag.lock!

    # Capture current tag associations
    current_restaurant_ids = restaurant_tag.restaurant_tags_restaurants.pluck(:restaurant_id)

    # Step 1: find distance to each restaurant and tag if within radius
    restaurant_data = Restaurant.active.not_expired.where(city_id: restaurant_tag.city_id,
                                                          is_skip_auto_tag_location: false).find_each.map do |restaurant|
                        distance = find_distance_between(restaurant: restaurant, restaurant_tag: restaurant_tag)
                        {
                          restaurant: restaurant,
                          distance: distance,
                        }
                      end.select { |restaurant| restaurant[:distance].present? }

    valid_restaurants = restaurant_data.select do |data|
      distance = data[:distance]
      distance <= max_distance
    end

    if valid_restaurants.present?
      restaurant_tags = valid_restaurants.map do |data|
        RestaurantTagsRestaurant.new(
          restaurant_tag_id: restaurant_tag.id, restaurant_id: data[:restaurant].id, distance_to_restaurant: data[:distance],
        )
      end

      # Step 2: store the restaurant data that are getting affected by this operation
      restaurant_tag.restaurants.each do |restaurant|
        affected_restaurants.add restaurant.id
      end

      # Step 3: delete old primary tag and secondary tags
      PrimaryTag.where(restaurant_tag_id: restaurant_tag.id).delete_all
      RestaurantTagsRestaurant.where(restaurant_tag_id: restaurant_tag.id).delete_all

      # Determine added and removed tags
      new_restaurant_ids = restaurant_tags.map(&:restaurant_id)
      added_restaurant_ids = new_restaurant_ids - current_restaurant_ids
      removed_restaurant_ids = current_restaurant_ids - new_restaurant_ids

      # Step 4: Insert the new data
      RestaurantTagsRestaurant.import! restaurant_tags,
                                       on_duplicate_key_update: %i[restaurant_id restaurant_tag_id], raise_error: true

      # Step 5: Send update events for added and removed tags to Kafka
      EventDrivenWorkers::HhSearch::ProducerWorker.perform_async(
        EventDrivenClient::Constants::RESTAURANTS_TAGS_TOPIC,
        EventDrivenClient::Constants::UPDATE_EVENT,
        restaurant_tag.id,
        { add: added_restaurant_ids, remove: removed_restaurant_ids },
      )
    end
  end

  restaurant_tag.restaurants.reload
  restaurant_tag.restaurants.each do |restaurant|
    affected_restaurants.add restaurant.id
  end

  # Decide primary location for each affected restaurant
  affected_restaurants.to_a.each do |restaurant_id|
    decide_primary_tag(restaurant_id)
    CleanCompactRestaurantCacheWorker.perform_async(restaurant_id)
  end
  Tagging::SyncRestaurantTagsTotalsTableWorker.perform_async([], [restaurant_tag_id])
end