From 8cf7a77808f69470b55faa762d172c34b5bbc9ff Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Tue, 29 Jul 2025 11:23:32 +0200 Subject: [PATCH] Fix async refresh never being finished when status cannot be fetched (#35500) --- app/controllers/api/v1/statuses_controller.rb | 6 +++++- .../concerns/status/fetch_replies_concern.rb | 2 +- app/models/worker_batch.rb | 15 +++++---------- .../activitypub/fetch_all_replies_service.rb | 2 +- app/services/activitypub/fetch_replies_service.rb | 7 ++----- .../activitypub/fetch_all_replies_worker.rb | 12 ++++++++++-- app/workers/fetch_reply_worker.rb | 2 +- spec/models/worker_batch_spec.rb | 10 +--------- 8 files changed, 26 insertions(+), 30 deletions(-) diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index f047ba60466..57977e14b8a 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -66,7 +66,11 @@ class Api::V1::StatusesController < Api::BaseController add_async_refresh_header(async_refresh) elsif !current_account.nil? && @status.should_fetch_replies? add_async_refresh_header(AsyncRefresh.create(refresh_key)) - ActivityPub::FetchAllRepliesWorker.perform_async(@status.id) + + WorkerBatch.new.within do |batch| + batch.connect(refresh_key, threshold: 1.0) + ActivityPub::FetchAllRepliesWorker.perform_async(@status.id, { 'batch_id' => batch.id }) + end end render json: @context, serializer: REST::ContextSerializer, relationships: StatusRelationshipsPresenter.new(statuses, current_user&.account_id) diff --git a/app/models/concerns/status/fetch_replies_concern.rb b/app/models/concerns/status/fetch_replies_concern.rb index cc117cb5ac6..7ab46481747 100644 --- a/app/models/concerns/status/fetch_replies_concern.rb +++ b/app/models/concerns/status/fetch_replies_concern.rb @@ -33,7 +33,7 @@ module Status::FetchRepliesConcern def should_fetch_replies? # we aren't brand new, and we haven't fetched replies since the debounce window - !local? && created_at <= FETCH_REPLIES_INITIAL_WAIT_MINUTES.ago && ( + !local? && distributable? && created_at <= FETCH_REPLIES_INITIAL_WAIT_MINUTES.ago && ( fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_COOLDOWN_MINUTES.ago ) end diff --git a/app/models/worker_batch.rb b/app/models/worker_batch.rb index 0a22ce61419..ab9d8d457b2 100644 --- a/app/models/worker_batch.rb +++ b/app/models/worker_batch.rb @@ -24,7 +24,7 @@ class WorkerBatch begin Thread.current[:batch] = self - yield + yield(self) ensure Thread.current[:batch] = nil end @@ -33,10 +33,7 @@ class WorkerBatch # Add jobs to the batch. Usually when the batch is created. # @param [Array] jids def add_jobs(jids) - if jids.blank? - finish! - return - end + return if jids.empty? redis.multi do |pipeline| pipeline.sadd(key('jobs'), jids) @@ -48,7 +45,7 @@ class WorkerBatch # Remove a job from the batch, such as when it's been processed or it has failed. # @param [String] jid - def remove_job(jid) + def remove_job(jid, increment: false) _, pending, processed, async_refresh_key, threshold = redis.multi do |pipeline| pipeline.srem(key('jobs'), jid) pipeline.hincrby(key, 'pending', -1) @@ -57,10 +54,8 @@ class WorkerBatch pipeline.hget(key, 'threshold') end - if async_refresh_key.present? - async_refresh = AsyncRefresh.new(async_refresh_key) - async_refresh.increment_result_count(by: 1) - end + async_refresh = AsyncRefresh.new(async_refresh_key) if async_refresh_key.present? + async_refresh&.increment_result_count(by: 1) if increment if pending.zero? || processed >= (threshold || 1.0).to_f * (processed + pending) async_refresh&.finish! diff --git a/app/services/activitypub/fetch_all_replies_service.rb b/app/services/activitypub/fetch_all_replies_service.rb index e9c1712ed66..b771b845265 100644 --- a/app/services/activitypub/fetch_all_replies_service.rb +++ b/app/services/activitypub/fetch_all_replies_service.rb @@ -6,7 +6,7 @@ class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService # Limit of replies to fetch per status MAX_REPLIES = (ENV['FETCH_REPLIES_MAX_SINGLE'] || 500).to_i - def call(status_uri, collection_or_uri, max_pages: 1, async_refresh_key: nil, request_id: nil) + def call(status_uri, collection_or_uri, max_pages: 1, batch_id: nil, request_id: nil) @status_uri = status_uri super diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index 239df0ba584..327c88d846d 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -6,7 +6,7 @@ class ActivityPub::FetchRepliesService < BaseService # Limit of fetched replies MAX_REPLIES = 5 - def call(reference_uri, collection_or_uri, max_pages: 1, allow_synchronous_requests: true, async_refresh_key: nil, request_id: nil) + def call(reference_uri, collection_or_uri, max_pages: 1, allow_synchronous_requests: true, batch_id: nil, request_id: nil) @reference_uri = reference_uri @allow_synchronous_requests = allow_synchronous_requests @@ -15,10 +15,7 @@ class ActivityPub::FetchRepliesService < BaseService @items = filter_replies(@items) - batch = WorkerBatch.new - batch.connect(async_refresh_key) if async_refresh_key.present? - batch.finish! if @items.empty? - batch.within do + WorkerBatch.new(batch_id).within do |batch| FetchReplyWorker.push_bulk(@items) do |reply_uri| [reply_uri, { 'request_id' => request_id, 'batch_id' => batch.id }] end diff --git a/app/workers/activitypub/fetch_all_replies_worker.rb b/app/workers/activitypub/fetch_all_replies_worker.rb index ab9eebc4ec7..14142b9cd5a 100644 --- a/app/workers/activitypub/fetch_all_replies_worker.rb +++ b/app/workers/activitypub/fetch_all_replies_worker.rb @@ -16,7 +16,9 @@ class ActivityPub::FetchAllRepliesWorker MAX_PAGES = (ENV['FETCH_REPLIES_MAX_PAGES'] || 500).to_i def perform(root_status_id, options = {}) + @batch = WorkerBatch.new(options['batch_id']) @root_status = Status.remote.find_by(id: root_status_id) + return unless @root_status&.should_fetch_replies? @root_status.touch(:fetched_replies_at) @@ -45,6 +47,8 @@ class ActivityPub::FetchAllRepliesWorker # Workers shouldn't be returning anything, but this is used in tests fetched_uris + ensure + @batch.remove_job(jid) end private @@ -53,9 +57,10 @@ class ActivityPub::FetchAllRepliesWorker # status URI, or the prefetched body of the Note object def get_replies(status, max_pages, options = {}) replies_collection_or_uri = get_replies_uri(status) + return if replies_collection_or_uri.nil? - ActivityPub::FetchAllRepliesService.new.call(value_or_id(status), replies_collection_or_uri, max_pages: max_pages, async_refresh_key: "context:#{@root_status.id}:refresh", **options.deep_symbolize_keys) + ActivityPub::FetchAllRepliesService.new.call(value_or_id(status), replies_collection_or_uri, max_pages: max_pages, **options.deep_symbolize_keys) end # Get the URI of the replies collection of a status @@ -78,9 +83,12 @@ class ActivityPub::FetchAllRepliesWorker # @param root_status_uri [String] def get_root_replies(root_status_uri, options = {}) root_status_body = fetch_resource(root_status_uri, true) + return if root_status_body.nil? - FetchReplyWorker.perform_async(root_status_uri, { **options.deep_stringify_keys, 'prefetched_body' => root_status_body }) + @batch.within do + FetchReplyWorker.perform_async(root_status_uri, { **options.deep_stringify_keys, 'prefetched_body' => root_status_body }) + end get_replies(root_status_body, MAX_PAGES, options) end diff --git a/app/workers/fetch_reply_worker.rb b/app/workers/fetch_reply_worker.rb index da3b9a8c131..42e38dc682f 100644 --- a/app/workers/fetch_reply_worker.rb +++ b/app/workers/fetch_reply_worker.rb @@ -10,6 +10,6 @@ class FetchReplyWorker batch = WorkerBatch.new(options.delete('batch_id')) if options['batch_id'] FetchRemoteStatusService.new.call(child_url, **options.symbolize_keys) ensure - batch&.remove_job(jid) + batch&.remove_job(jid, increment: true) end end diff --git a/spec/models/worker_batch_spec.rb b/spec/models/worker_batch_spec.rb index b58dc48618a..7c6b8aa8c6d 100644 --- a/spec/models/worker_batch_spec.rb +++ b/spec/models/worker_batch_spec.rb @@ -42,14 +42,6 @@ RSpec.describe WorkerBatch do it 'does not persist the job IDs' do expect(subject.jobs).to eq [] end - - context 'when async refresh is connected' do - let(:async_refresh) { AsyncRefresh.new(async_refresh_key) } - - it 'immediately marks the async refresh as finished' do - expect(async_refresh.reload.finished?).to be true - end - end end context 'when called with an array of job IDs' do @@ -71,7 +63,7 @@ RSpec.describe WorkerBatch do before do subject.connect(async_refresh_key, threshold: 0.5) if async_refresh.present? subject.add_jobs(%w(foo bar baz)) - subject.remove_job('foo') + subject.remove_job('foo', increment: true) end it 'removes the job from pending jobs' do