Fix async refresh never being finished when status cannot be fetched (#35500)

This commit is contained in:
Eugen Rochko 2025-07-29 11:23:32 +02:00 committed by GitHub
parent d121007927
commit 8cf7a77808
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 26 additions and 30 deletions

View File

@@ -66,7 +66,11 @@ class Api::V1::StatusesController < Api::BaseController
add_async_refresh_header(async_refresh)
elsif !current_account.nil? && @status.should_fetch_replies?
add_async_refresh_header(AsyncRefresh.create(refresh_key))
ActivityPub::FetchAllRepliesWorker.perform_async(@status.id)
WorkerBatch.new.within do |batch|
batch.connect(refresh_key, threshold: 1.0)
ActivityPub::FetchAllRepliesWorker.perform_async(@status.id, { 'batch_id' => batch.id })
end
end
render json: @context, serializer: REST::ContextSerializer, relationships: StatusRelationshipsPresenter.new(statuses, current_user&.account_id)

View File

@@ -33,7 +33,7 @@ module Status::FetchRepliesConcern
def should_fetch_replies?
# we aren't brand new, and we haven't fetched replies since the debounce window
!local? && created_at <= FETCH_REPLIES_INITIAL_WAIT_MINUTES.ago && (
!local? && distributable? && created_at <= FETCH_REPLIES_INITIAL_WAIT_MINUTES.ago && (
fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_COOLDOWN_MINUTES.ago
)
end

View File

@@ -24,7 +24,7 @@ class WorkerBatch
begin
Thread.current[:batch] = self
yield
yield(self)
ensure
Thread.current[:batch] = nil
end
@@ -33,10 +33,7 @@ class WorkerBatch
# Add jobs to the batch. Usually when the batch is created.
# @param [Array<String>] jids
def add_jobs(jids)
if jids.blank?
finish!
return
end
return if jids.empty?
redis.multi do |pipeline|
pipeline.sadd(key('jobs'), jids)
@@ -48,7 +45,7 @@ class WorkerBatch
# Remove a job from the batch, such as when it's been processed or it has failed.
# @param [String] jid
def remove_job(jid)
def remove_job(jid, increment: false)
_, pending, processed, async_refresh_key, threshold = redis.multi do |pipeline|
pipeline.srem(key('jobs'), jid)
pipeline.hincrby(key, 'pending', -1)
@@ -57,10 +54,8 @@ class WorkerBatch
pipeline.hget(key, 'threshold')
end
if async_refresh_key.present?
async_refresh = AsyncRefresh.new(async_refresh_key)
async_refresh.increment_result_count(by: 1)
end
async_refresh = AsyncRefresh.new(async_refresh_key) if async_refresh_key.present?
async_refresh&.increment_result_count(by: 1) if increment
if pending.zero? || processed >= (threshold || 1.0).to_f * (processed + pending)
async_refresh&.finish!

View File

@@ -6,7 +6,7 @@ class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService
# Limit of replies to fetch per status
MAX_REPLIES = (ENV['FETCH_REPLIES_MAX_SINGLE'] || 500).to_i
def call(status_uri, collection_or_uri, max_pages: 1, async_refresh_key: nil, request_id: nil)
def call(status_uri, collection_or_uri, max_pages: 1, batch_id: nil, request_id: nil)
@status_uri = status_uri
super

View File

@@ -6,7 +6,7 @@ class ActivityPub::FetchRepliesService < BaseService
# Limit of fetched replies
MAX_REPLIES = 5
def call(reference_uri, collection_or_uri, max_pages: 1, allow_synchronous_requests: true, async_refresh_key: nil, request_id: nil)
def call(reference_uri, collection_or_uri, max_pages: 1, allow_synchronous_requests: true, batch_id: nil, request_id: nil)
@reference_uri = reference_uri
@allow_synchronous_requests = allow_synchronous_requests
@@ -15,10 +15,7 @@ class ActivityPub::FetchRepliesService < BaseService
@items = filter_replies(@items)
batch = WorkerBatch.new
batch.connect(async_refresh_key) if async_refresh_key.present?
batch.finish! if @items.empty?
batch.within do
WorkerBatch.new(batch_id).within do |batch|
FetchReplyWorker.push_bulk(@items) do |reply_uri|
[reply_uri, { 'request_id' => request_id, 'batch_id' => batch.id }]
end

View File

@@ -16,7 +16,9 @@ class ActivityPub::FetchAllRepliesWorker
MAX_PAGES = (ENV['FETCH_REPLIES_MAX_PAGES'] || 500).to_i
def perform(root_status_id, options = {})
@batch = WorkerBatch.new(options['batch_id'])
@root_status = Status.remote.find_by(id: root_status_id)
return unless @root_status&.should_fetch_replies?
@root_status.touch(:fetched_replies_at)
@@ -45,6 +47,8 @@ class ActivityPub::FetchAllRepliesWorker
# Workers shouldn't be returning anything, but this is used in tests
fetched_uris
ensure
@batch.remove_job(jid)
end
private
@@ -53,9 +57,10 @@ class ActivityPub::FetchAllRepliesWorker
# status URI, or the prefetched body of the Note object
def get_replies(status, max_pages, options = {})
replies_collection_or_uri = get_replies_uri(status)
return if replies_collection_or_uri.nil?
ActivityPub::FetchAllRepliesService.new.call(value_or_id(status), replies_collection_or_uri, max_pages: max_pages, async_refresh_key: "context:#{@root_status.id}:refresh", **options.deep_symbolize_keys)
ActivityPub::FetchAllRepliesService.new.call(value_or_id(status), replies_collection_or_uri, max_pages: max_pages, **options.deep_symbolize_keys)
end
# Get the URI of the replies collection of a status
@@ -78,9 +83,12 @@ class ActivityPub::FetchAllRepliesWorker
# @param root_status_uri [String]
def get_root_replies(root_status_uri, options = {})
root_status_body = fetch_resource(root_status_uri, true)
return if root_status_body.nil?
FetchReplyWorker.perform_async(root_status_uri, { **options.deep_stringify_keys, 'prefetched_body' => root_status_body })
@batch.within do
FetchReplyWorker.perform_async(root_status_uri, { **options.deep_stringify_keys, 'prefetched_body' => root_status_body })
end
get_replies(root_status_body, MAX_PAGES, options)
end

View File

@@ -10,6 +10,6 @@ class FetchReplyWorker
batch = WorkerBatch.new(options.delete('batch_id')) if options['batch_id']
FetchRemoteStatusService.new.call(child_url, **options.symbolize_keys)
ensure
batch&.remove_job(jid)
batch&.remove_job(jid, increment: true)
end
end

View File

@@ -42,14 +42,6 @@ RSpec.describe WorkerBatch do
it 'does not persist the job IDs' do
expect(subject.jobs).to eq []
end
context 'when async refresh is connected' do
let(:async_refresh) { AsyncRefresh.new(async_refresh_key) }
it 'immediately marks the async refresh as finished' do
expect(async_refresh.reload.finished?).to be true
end
end
end
context 'when called with an array of job IDs' do
@@ -71,7 +63,7 @@ RSpec.describe WorkerBatch do
before do
subject.connect(async_refresh_key, threshold: 0.5) if async_refresh.present?
subject.add_jobs(%w(foo bar baz))
subject.remove_job('foo')
subject.remove_job('foo', increment: true)
end
it 'removes the job from pending jobs' do