From 018e5e303fd85f05ec7684fb4f1152b1ac6d9794 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Mon, 28 Jul 2025 10:20:12 +0200 Subject: [PATCH] Fix jobs being added to batch after they might already execute (#35496) --- app/models/worker_batch.rb | 40 +++++++++++++++---- .../activitypub/fetch_replies_service.rb | 7 +++- config/initializers/sidekiq.rb | 6 +-- lib/mastodon/worker_batch_middleware.rb | 11 +++++ 4 files changed, 52 insertions(+), 12 deletions(-) create mode 100644 lib/mastodon/worker_batch_middleware.rb diff --git a/app/models/worker_batch.rb b/app/models/worker_batch.rb index f741071ba95..0a22ce61419 100644 --- a/app/models/worker_batch.rb +++ b/app/models/worker_batch.rb @@ -19,17 +19,22 @@ class WorkerBatch redis.hset(key, { 'async_refresh_key' => async_refresh_key, 'threshold' => threshold }) end + def within + raise NoBlockGivenError unless block_given? + + begin + Thread.current[:batch] = self + yield + ensure + Thread.current[:batch] = nil + end + end + # Add jobs to the batch. Usually when the batch is created. # @param [Array] jids def add_jobs(jids) if jids.blank? - async_refresh_key = redis.hget(key, 'async_refresh_key') - - if async_refresh_key.present? - async_refresh = AsyncRefresh.new(async_refresh_key) - async_refresh.finish! - end - + finish! return end @@ -55,8 +60,23 @@ class WorkerBatch if async_refresh_key.present? async_refresh = AsyncRefresh.new(async_refresh_key) async_refresh.increment_result_count(by: 1) - async_refresh.finish! if pending.zero? || processed >= threshold.to_f * (processed + pending) end + + if pending.zero? || processed >= (threshold || 1.0).to_f * (processed + pending) + async_refresh&.finish! + cleanup + end + end + + def finish! + async_refresh_key = redis.hget(key, 'async_refresh_key') + + if async_refresh_key.present? + async_refresh = AsyncRefresh.new(async_refresh_key) + async_refresh.finish! + end + + cleanup end # Get pending jobs. @@ -76,4 +96,8 @@ class WorkerBatch def key(suffix = nil) "worker_batch:#{@id}#{":#{suffix}" if suffix}" end + + def cleanup + redis.del(key, key('jobs')) + end end diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index 25eb275ca5c..239df0ba584 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -17,7 +17,12 @@ class ActivityPub::FetchRepliesService < BaseService batch = WorkerBatch.new batch.connect(async_refresh_key) if async_refresh_key.present? - batch.add_jobs(FetchReplyWorker.push_bulk(@items) { |reply_uri| [reply_uri, { 'request_id' => request_id, 'batch_id' => batch.id }] }) + batch.finish! if @items.empty? + batch.within do + FetchReplyWorker.push_bulk(@items) do |reply_uri| + [reply_uri, { 'request_id' => request_id, 'batch_id' => batch.id }] + end + end [@items, n_pages] end diff --git a/config/initializers/sidekiq.rb b/config/initializers/sidekiq.rb index 3c2f12780c0..7edaf38a60a 100644 --- a/config/initializers/sidekiq.rb +++ b/config/initializers/sidekiq.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require_relative '../../lib/mastodon/sidekiq_middleware' +require_relative '../../lib/mastodon/worker_batch_middleware' Sidekiq.configure_server do |config| config.redis = REDIS_CONFIGURATION.sidekiq @@ -72,14 +73,12 @@ Sidekiq.configure_server do |config| config.server_middleware do |chain| chain.add Mastodon::SidekiqMiddleware - end - - config.server_middleware do |chain| chain.add SidekiqUniqueJobs::Middleware::Server end config.client_middleware do |chain| chain.add SidekiqUniqueJobs::Middleware::Client + chain.add Mastodon::WorkerBatchMiddleware end config.on(:startup) do @@ -105,6 +104,7 @@ Sidekiq.configure_client do |config| config.client_middleware do |chain| chain.add SidekiqUniqueJobs::Middleware::Client + chain.add Mastodon::WorkerBatchMiddleware end config.logger.level = Logger.const_get(ENV.fetch('RAILS_LOG_LEVEL', 'info').upcase.to_s) diff --git a/lib/mastodon/worker_batch_middleware.rb b/lib/mastodon/worker_batch_middleware.rb new file mode 100644 index 00000000000..c4623013327 --- /dev/null +++ b/lib/mastodon/worker_batch_middleware.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +class Mastodon::WorkerBatchMiddleware + def call(_worker, msg, _queue, _redis_pool = nil) + if (batch = Thread.current[:batch]) + batch.add_jobs([msg['jid']]) + end + + yield + end +end