This commit is contained in:
Jonny Saunders 2025-05-06 15:05:48 +00:00 committed by GitHub
commit 0109593ace
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 35 additions and 9 deletions

View File

@ -22,7 +22,7 @@ class ActivityPub::FetchAllRepliesWorker
@root_status.touch(:fetched_replies_at)
Rails.logger.debug { "FetchAllRepliesWorker - #{@root_status.uri}: Fetching all replies for status: #{@root_status}" }
uris_to_fetch, n_pages = get_replies(@root_status.uri, MAX_PAGES, options)
uris_to_fetch, n_pages = get_root_replies(@root_status.uri, options)
return if uris_to_fetch.nil?
fetched_uris = uris_to_fetch.clone.to_set
@ -49,20 +49,39 @@ class ActivityPub::FetchAllRepliesWorker
private
def get_replies(status_uri, max_pages, options = {})
replies_collection_or_uri = get_replies_uri(status_uri)
# @param status [String, Hash]
# status URI, or the prefetched body of the Note object
def get_replies(status, max_pages, options = {})
replies_collection_or_uri = get_replies_uri(status)
return if replies_collection_or_uri.nil?
ActivityPub::FetchAllRepliesService.new.call(status_uri, replies_collection_or_uri, max_pages: max_pages, **options.deep_symbolize_keys)
ActivityPub::FetchAllRepliesService.new.call(value_or_id(status), replies_collection_or_uri, max_pages: max_pages, **options.deep_symbolize_keys)
end
def get_replies_uri(parent_status_uri)
fetch_resource(parent_status_uri, true)&.fetch('replies', nil)
# Get the URI of the replies collection of a status
#
# @param parent_status [String, Hash]
# status URI, or the prefetched body of the Note object
def get_replies_uri(parent_status)
resource = parent_status.is_a?(Hash) ? parent_status : fetch_resource(parent_status, true)
resource&.fetch('replies', nil)
rescue => e
Rails.logger.info { "FetchAllRepliesWorker - #{@root_status.uri}: Caught exception while resolving replies URI #{parent_status_uri}: #{e} - #{e.message}" }
Rails.logger.info { "FetchAllRepliesWorker - #{@root_status.uri}: Caught exception while resolving replies URI #{parent_status}: #{e} - #{e.message}" }
# Raise if we can't get the collection for top-level status to trigger retry
raise e if parent_status_uri == @root_status.uri
raise e if value_or_id(parent_status) == @root_status.uri
nil
end
# Fetch the root status a single time and reuse the fetched body twice:
# it is handed to FetchReplyWorker as `prefetched_body`, and it is passed to
# get_replies so the replies collection is resolved without a second fetch.
#
# @param root_status_uri [String] URI of the root status
# @param options [Hash] passed through unchanged to get_replies
# @return the value returned by get_replies for the fetched body
# @raise [RuntimeError] when fetch_resource returns nil for the root status
def get_root_replies(root_status_uri, options = {})
  root_status_body = fetch_resource(root_status_uri, true)
  # `raise RuntimeError("...")` would call an undefined method named
  # RuntimeError and surface as NoMethodError; raising with a bare message
  # produces the intended RuntimeError.
  raise "FetchAllRepliesWorker - #{@root_status.uri}: Root status could not be fetched" if root_status_body.nil?

  # NOTE(review): the options hash uses a symbol key; FetchReplyWorker
  # symbolizes top-level keys on receipt — confirm the job-argument
  # serializer in use accepts symbol keys.
  FetchReplyWorker.perform_async(root_status_uri, { prefetched_body: root_status_body })

  get_replies(root_status_body, MAX_PAGES, options)
end
end

View File

@ -7,6 +7,6 @@ class FetchReplyWorker
sidekiq_options queue: 'pull', retry: 3
# Hand a single status URL to FetchRemoteStatusService.
#
# @param child_url [String] passed as the first argument to the service
# @param options [Hash] converted to symbol keys and splatted as keyword
#   arguments to the service call
#
# NOTE(review): the two call lines below look like the before/after pair of
# a diff (deep_symbolize_keys replaced by symbolize_keys, which leaves the
# keys of nested hashes untouched) — confirm that only the symbolize_keys
# call exists in the actual file.
def perform(child_url, options = {})
FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys)
FetchRemoteStatusService.new.call(child_url, **options.symbolize_keys)
end
end

View File

@ -125,6 +125,7 @@ RSpec.describe ActivityPub::FetchAllRepliesWorker do
before do
stub_const('Status::FetchRepliesConcern::FETCH_REPLIES_ENABLED', true)
allow(FetchReplyWorker).to receive(:push_bulk)
allow(FetchReplyWorker).to receive(:perform_async)
all_items.each do |item|
next if [top_note_uri, reply_note_uri].include? item
@ -146,6 +147,12 @@ RSpec.describe ActivityPub::FetchAllRepliesWorker do
got_uris = subject.perform(status.id)
expect(got_uris).to match_array(top_items + top_items_paged)
end
# The worker run must enqueue FetchReplyWorker with the already-fetched body
# of the top note, and the top note URI must be requested exactly once.
it 'fetches the top status only once' do
  subject.perform(status.id)

  expected_options = { prefetched_body: top_object.deep_stringify_keys }
  expect(FetchReplyWorker).to have_received(:perform_async).with(top_note_uri, expected_options)
  expect(a_request(:get, top_note_uri)).to have_been_made.once
end
end
describe 'perform' do