diff --git a/.env.production.sample b/.env.production.sample index 4afaf8d756..0ad4b1497a 100644 --- a/.env.production.sample +++ b/.env.production.sample @@ -110,3 +110,17 @@ FETCH_REPLIES_MAX_SINGLE=500 # Max number of replies Collection pages to fetch - total FETCH_REPLIES_MAX_PAGES=500 + + +# Account Backfill Behavior +# -------------------------- +# When the first person from your instance follows a remote account, +# backfill their most recent n statuses. +# (default: true if unset, set explicitly to ``false`` to disable) +ACCOUNT_BACKFILL_ENABLED=true + +# Max statuses to fetch when backfilling a new account +ACCOUNT_BACKFILL_MAX_STATUSES=1000 + +# Max number of outbox Collection pages to fetch when backfilling a new account +ACCOUNT_BACKFILL_MAX_PAGES=200 diff --git a/app/helpers/json_ld_helper.rb b/app/helpers/json_ld_helper.rb index 65daaa5302..c4ba405d2c 100644 --- a/app/helpers/json_ld_helper.rb +++ b/app/helpers/json_ld_helper.rb @@ -203,6 +203,85 @@ module JsonLdHelper end end + # Iterate through the pages of an activitypub collection, + # returning the collected items and the number of pages that were fetched. + # + # @param collection_or_uri [String, Hash] + # either the URI or an already-fetched AP object + # @param max_pages [Integer, nil] + # Max pages to fetch, if nil, fetch until no more pages + # @param max_items [Integer, nil] + # Max items to fetch, if nil, fetch until no more items + # @param reference_uri [String, nil] + # If not nil, a URI to compare to the collection URI. + # If the host of the collection URI does not match the reference URI, + # do not fetch the collection page. 
+ # @param on_behalf_of [Account, nil] + # Sign the request on behalf of the Account, if not nil + # @return [Array(Array, Integer), nil] + # The collection items and the number of pages fetched + def collection_items(collection_or_uri, max_pages: 1, max_items: nil, reference_uri: nil, on_behalf_of: nil) + collection = fetch_collection(collection_or_uri, reference_uri: reference_uri, on_behalf_of: on_behalf_of) + return unless collection.is_a?(Hash) + + collection = fetch_collection(collection['first'], reference_uri: reference_uri, on_behalf_of: on_behalf_of) if collection['first'].present? + return unless collection.is_a?(Hash) + + items = [] + n_pages = 1 + while collection.is_a?(Hash) + items.concat(as_array(collection_page_items(collection))) + + break if !max_items.nil? && items.size >= max_items + break if !max_pages.nil? && n_pages >= max_pages + + collection = collection['next'].present? ? fetch_collection(collection['next'], reference_uri: reference_uri, on_behalf_of: on_behalf_of) : nil + n_pages += 1 + end + + [items, n_pages] + end + + def collection_page_items(collection) + case collection['type'] + when 'Collection', 'CollectionPage' + collection['items'] + when 'OrderedCollection', 'OrderedCollectionPage' + collection['orderedItems'] + end + end + + # Fetch a single collection page + # To get the whole collection, use collection_items + # + # @param collection_or_uri [String, Hash] + # @param reference_uri [String, nil] + # If not nil, a URI to compare to the collection URI. + # If the host of the collection URI does not match the reference URI, + # do not fetch the collection page. + # @param on_behalf_of [Account, nil] + # Sign the request on behalf of the Account, if not nil + # @return [Hash, nil] + def fetch_collection(collection_or_uri, reference_uri: nil, on_behalf_of: nil) + return collection_or_uri if collection_or_uri.is_a?(Hash) + return if !reference_uri.nil? 
&& non_matching_uri_hosts?(reference_uri, collection_or_uri) + + # NOTE: For backward compatibility reasons, Mastodon signs outgoing + # queries incorrectly by default. + # + # While this is relevant for all URLs with query strings, this is + # the only code path where this happens in practice. + # + # Therefore, retry with correct signatures if this fails. + begin + fetch_resource_without_id_validation(collection_or_uri, on_behalf_of, raise_on_error: :temporary) + rescue Mastodon::UnexpectedResponseError => e + raise unless e.response && e.response.code == 401 && Addressable::URI.parse(collection_or_uri).query.present? + + fetch_resource_without_id_validation(collection_or_uri, on_behalf_of, raise_on_error: :temporary, request_options: { omit_query_string: false }) + end + end + def valid_activitypub_content_type?(response) return true if response.mime_type == 'application/activity+json' diff --git a/app/models/follow_request.rb b/app/models/follow_request.rb index 964d4e279a..41403ded1c 100644 --- a/app/models/follow_request.rb +++ b/app/models/follow_request.rb @@ -32,6 +32,7 @@ class FollowRequest < ApplicationRecord validates :languages, language: true def authorize! + is_first_follow = first_follow? follow = account.follow!(target_account, reblogs: show_reblogs, notify: notify, languages: languages, uri: uri, bypass_limit: true) if account.local? @@ -40,6 +41,7 @@ class FollowRequest < ApplicationRecord MergeWorker.push_bulk(List.where(account: account).joins(:list_accounts).where(list_accounts: { account_id: target_account.id }).pluck(:id)) do |list_id| [target_account.id, list_id, 'list'] end + ActivityPub::AccountBackfillWorker.perform_async(target_account.id) if is_first_follow && ActivityPub::AccountBackfillService::ENABLED end destroy! @@ -51,6 +53,10 @@ class FollowRequest < ApplicationRecord false # Force uri_for to use uri attribute end + def first_follow? + !target_account.followers.local.exists? 
+ end + before_validation :set_uri, only: :create after_commit :invalidate_follow_recommendations_cache diff --git a/app/services/activitypub/account_backfill_service.rb b/app/services/activitypub/account_backfill_service.rb new file mode 100644 index 0000000000..f17c86c33e --- /dev/null +++ b/app/services/activitypub/account_backfill_service.rb @@ -0,0 +1,53 @@ +# frozen_string_literal: true + +class ActivityPub::AccountBackfillService < BaseService + include JsonLdHelper + + ENABLED = ENV['ACCOUNT_BACKFILL_ENABLED'].nil? || ENV['ACCOUNT_BACKFILL_ENABLED'] == 'true' + MAX_STATUSES = (ENV['ACCOUNT_BACKFILL_MAX_STATUSES'] || 1000).to_i + MAX_PAGES = (ENV['ACCOUNT_BACKFILL_MAX_PAGES'] || 200).to_i + + def call(account, on_behalf_of: nil, request_id: nil) + return unless ENABLED + + @account = account + return if @account.nil? || @account.outbox_url.nil? + + @items, = collection_items(@account.outbox_url, max_items: MAX_STATUSES, max_pages: MAX_PAGES, on_behalf_of: on_behalf_of) + @items = filter_items(@items) + return if @items.nil? + + on_behalf_of_id = on_behalf_of&.id + + FetchReplyWorker.push_bulk(@items) do |status_uri_or_body| + if status_uri_or_body.is_a?(Hash) && status_uri_or_body.key?('object') && status_uri_or_body.key?('id') + # Re-add the minimally-acceptable @context, which gets stripped because this object comes inside a collection + status_uri_or_body['@context'] = ActivityPub::TagManager::CONTEXT unless status_uri_or_body.key?('@context') + [status_uri_or_body['id'], { prefetched_body: status_uri_or_body, request_id: request_id, on_behalf_of: on_behalf_of_id }] + else + [status_uri_or_body, { request_id: request_id, on_behalf_of: on_behalf_of_id }] + end + end + + @items + end + + private + + # Reject any non-public statuses. + # Since our request may have been signed on behalf of the follower, + # we may have received followers-only statuses. + # + # Formally, a followers-only status is addressed to the account's followers collection. 
+ # We were not in that collection at the time that the post was made, + # so followers-only statuses fetched by backfilling are not addressed to us. + # Public and unlisted statuses are sent to the ActivityStreams "Public" entity. + # We are part of the public, so those posts *are* addressed to us. + # + # @param items [Array<String, Hash>] + # @return [Array<String, Hash>] + def filter_items(items) + allowed = [:public, :unlisted] + items.filter { |item| item.is_a?(String) || allowed.include?(ActivityPub::Parser::StatusParser.new(item).visibility) } + end +end diff --git a/app/services/activitypub/fetch_featured_collection_service.rb b/app/services/activitypub/fetch_featured_collection_service.rb index 25c62f3be6..9d0b1a3717 100644 --- a/app/services/activitypub/fetch_featured_collection_service.rb +++ b/app/services/activitypub/fetch_featured_collection_service.rb @@ -12,30 +12,12 @@ class ActivityPub::FetchFeaturedCollectionService < BaseService return unless supported_context?(@json) - process_items(collection_items(@json)) + @items, = collection_items(@json, max_pages: 1, reference_uri: @account.uri, on_behalf_of: local_follower) + process_items(@items) end private - def collection_items(collection) - collection = fetch_collection(collection['first']) if collection['first'].present? - return unless collection.is_a?(Hash) - - case collection['type'] - when 'Collection', 'CollectionPage' - as_array(collection['items']) - when 'OrderedCollection', 'OrderedCollectionPage' - as_array(collection['orderedItems']) - end - end - - def fetch_collection(collection_or_uri) - return collection_or_uri if collection_or_uri.is_a?(Hash) - return if non_matching_uri_hosts?(@account.uri, collection_or_uri) - - fetch_resource_without_id_validation(collection_or_uri, local_follower, raise_on_error: :temporary) - end - def process_items(items) return if items.nil? 
diff --git a/app/services/activitypub/fetch_featured_tags_collection_service.rb b/app/services/activitypub/fetch_featured_tags_collection_service.rb index ec2422a075..696b4894d4 100644 --- a/app/services/activitypub/fetch_featured_tags_collection_service.rb +++ b/app/services/activitypub/fetch_featured_tags_collection_service.rb @@ -11,43 +11,13 @@ class ActivityPub::FetchFeaturedTagsCollectionService < BaseService return unless supported_context?(@json) - process_items(collection_items(@json)) + # collection_items defaults to a single page and returns nil when the collection cannot be fetched + @items, = collection_items(@json, max_pages: FeaturedTag::LIMIT, max_items: FeaturedTag::LIMIT, reference_uri: @account.uri, on_behalf_of: local_follower) + process_items(@items || []) end private - def collection_items(collection) - all_items = [] - - collection = fetch_collection(collection['first']) if collection['first'].present? - - while collection.is_a?(Hash) - items = case collection['type'] - when 'Collection', 'CollectionPage' - collection['items'] - when 'OrderedCollection', 'OrderedCollectionPage' - collection['orderedItems'] - end - - break if items.blank? - - all_items.concat(items) - - break if all_items.size >= FeaturedTag::LIMIT - - collection = collection['next'].present? ? 
fetch_collection(collection['next']) : nil - end - - all_items - end - - def fetch_collection(collection_or_uri) - return collection_or_uri if collection_or_uri.is_a?(Hash) - return if non_matching_uri_hosts?(@account.uri, collection_or_uri) - - fetch_resource_without_id_validation(collection_or_uri, local_follower, raise_on_error: :temporary) - end - def process_items(items) names = items.filter_map { |item| item['type'] == 'Hashtag' && item['name']&.delete_prefix('#') }.take(FeaturedTag::LIMIT) tags = names.index_by { |name| HashtagNormalizer.new.normalize(name) } diff --git a/app/services/activitypub/fetch_remote_status_service.rb b/app/services/activitypub/fetch_remote_status_service.rb index 7173746f2d..0aac679191 100644 --- a/app/services/activitypub/fetch_remote_status_service.rb +++ b/app/services/activitypub/fetch_remote_status_service.rb @@ -11,6 +11,9 @@ class ActivityPub::FetchRemoteStatusService < BaseService def call(uri, prefetched_body: nil, on_behalf_of: nil, expected_actor_uri: nil, request_id: nil) return if domain_not_allowed?(uri) + # load the account if given as an ID + on_behalf_of = Account.find(on_behalf_of) unless on_behalf_of.nil? || on_behalf_of.is_a?(Account) + @request_id = request_id || "#{Time.now.utc.to_i}-status-#{uri}" @json = if prefetched_body.nil? 
fetch_status(uri, true, on_behalf_of) diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index f2e4f45104..d89ad9cd5e 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -8,9 +8,13 @@ class ActivityPub::FetchRepliesService < BaseService def call(reference_uri, collection_or_uri, max_pages: 1, allow_synchronous_requests: true, request_id: nil) @reference_uri = reference_uri - @allow_synchronous_requests = allow_synchronous_requests + return if !allow_synchronous_requests && !collection_or_uri.is_a?(Hash) - @items, n_pages = collection_items(collection_or_uri, max_pages: max_pages) + # if given a prefetched collection while forbidding synchronous requests, + # process it and return without fetching additional pages + max_pages = 1 if !allow_synchronous_requests && collection_or_uri.is_a?(Hash) + + @items, n_pages = collection_items(collection_or_uri, max_pages: max_pages, max_items: MAX_REPLIES, reference_uri: @reference_uri) return if @items.nil? @items = filter_replies(@items) @@ -21,58 +25,6 @@ class ActivityPub::FetchRepliesService < BaseService private - def collection_items(collection_or_uri, max_pages: 1) - collection = fetch_collection(collection_or_uri) - return unless collection.is_a?(Hash) - - collection = fetch_collection(collection['first']) if collection['first'].present? - return unless collection.is_a?(Hash) - - items = [] - n_pages = 1 - while collection.is_a?(Hash) - items.concat(as_array(collection_page_items(collection))) - - break if items.size >= MAX_REPLIES - break if n_pages >= max_pages - - collection = collection['next'].present? ? 
fetch_collection(collection['next']) : nil - n_pages += 1 - end - - [items, n_pages] - end - - def collection_page_items(collection) - case collection['type'] - when 'Collection', 'CollectionPage' - collection['items'] - when 'OrderedCollection', 'OrderedCollectionPage' - collection['orderedItems'] - end - end - - def fetch_collection(collection_or_uri) - return collection_or_uri if collection_or_uri.is_a?(Hash) - return unless @allow_synchronous_requests - return if non_matching_uri_hosts?(@reference_uri, collection_or_uri) - - # NOTE: For backward compatibility reasons, Mastodon signs outgoing - # queries incorrectly by default. - # - # While this is relevant for all URLs with query strings, this is - # the only code path where this happens in practice. - # - # Therefore, retry with correct signatures if this fails. - begin - fetch_resource_without_id_validation(collection_or_uri, nil, raise_on_error: :temporary) - rescue Mastodon::UnexpectedResponseError => e - raise unless e.response && e.response.code == 401 && Addressable::URI.parse(collection_or_uri).query.present? - - fetch_resource_without_id_validation(collection_or_uri, nil, raise_on_error: :temporary, request_options: { omit_query_string: false }) - end - end - def filter_replies(items) # Only fetch replies to the same server as the original status to avoid # amplification attacks. 
diff --git a/app/services/activitypub/synchronize_followers_service.rb b/app/services/activitypub/synchronize_followers_service.rb index fd6fd1b899..e9ac13ef64 100644 --- a/app/services/activitypub/synchronize_followers_service.rb +++ b/app/services/activitypub/synchronize_followers_service.rb @@ -63,10 +63,10 @@ class ActivityPub::SynchronizeFollowersService < BaseService # Only returns true if the whole collection has been processed def process_collection!(collection_uri, max_pages: MAX_COLLECTION_PAGES) - collection = fetch_collection(collection_uri) + collection = fetch_collection(collection_uri, reference_uri: @account.uri) return false unless collection.is_a?(Hash) - collection = fetch_collection(collection['first']) if collection['first'].present? + collection = fetch_collection(collection['first'], reference_uri: @account.uri) if collection['first'].present? while collection.is_a?(Hash) process_page!(as_array(collection_page_items(collection))) @@ -81,20 +81,4 @@ class ActivityPub::SynchronizeFollowersService < BaseService false end - - def collection_page_items(collection) - case collection['type'] - when 'Collection', 'CollectionPage' - collection['items'] - when 'OrderedCollection', 'OrderedCollectionPage' - collection['orderedItems'] - end - end - - def fetch_collection(collection_or_uri) - return collection_or_uri if collection_or_uri.is_a?(Hash) - return if non_matching_uri_hosts?(@account.uri, collection_or_uri) - - fetch_resource_without_id_validation(collection_or_uri, nil, raise_on_error: :temporary) - end end diff --git a/app/workers/activitypub/account_backfill_worker.rb b/app/workers/activitypub/account_backfill_worker.rb new file mode 100644 index 0000000000..a4271690e4 --- /dev/null +++ b/app/workers/activitypub/account_backfill_worker.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +class ActivityPub::AccountBackfillWorker + include Sidekiq::Worker + include ExponentialBackoff + + def perform(account_id, options = {}) + account = 
Account.find(account_id) + return if account.local? + + ActivityPub::AccountBackfillService.new.call(account, **options.deep_symbolize_keys) + end +end diff --git a/app/workers/fetch_reply_worker.rb b/app/workers/fetch_reply_worker.rb index 68a7414beb..ecb232bbbb 100644 --- a/app/workers/fetch_reply_worker.rb +++ b/app/workers/fetch_reply_worker.rb @@ -7,6 +7,6 @@ class FetchReplyWorker sidekiq_options queue: 'pull', retry: 3 def perform(child_url, options = {}) - FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys) + FetchRemoteStatusService.new.call(child_url, **options.symbolize_keys) end end diff --git a/spec/models/follow_request_spec.rb b/spec/models/follow_request_spec.rb index 237875deab..c31b19cf52 100644 --- a/spec/models/follow_request_spec.rb +++ b/spec/models/follow_request_spec.rb @@ -20,17 +20,19 @@ RSpec.describe FollowRequest do end end - it 'calls Account#follow!, MergeWorker.perform_async, and #destroy!' do + it 'calls Account#follow!, MergeWorker.perform_async, ActivityPub::AccountBackfillWorker, and #destroy!' do allow(account).to receive(:follow!) do account.active_relationships.create!(target_account: target_account) end allow(MergeWorker).to receive(:perform_async) + allow(ActivityPub::AccountBackfillWorker).to receive(:perform_async) allow(follow_request).to receive(:destroy!) follow_request.authorize! expect(account).to have_received(:follow!).with(target_account, reblogs: true, notify: false, uri: follow_request.uri, languages: nil, bypass_limit: true) expect(MergeWorker).to have_received(:perform_async).with(target_account.id, account.id, 'home') + expect(ActivityPub::AccountBackfillWorker).to have_received(:perform_async).with(target_account.id) expect(follow_request).to have_received(:destroy!) 
end @@ -47,6 +49,21 @@ RSpec.describe FollowRequest do target = follow_request.target_account expect(follow_request.account.muting_reblogs?(target)).to be true end + + context 'when subsequent follow requests are made' do + before do + second_account = Fabricate(:account) + second_account.follow!(target_account) + end + + it 'doesnt call ActivityPub::AccountBackfillWorker' do + allow(ActivityPub::AccountBackfillWorker).to receive(:perform_async) + + follow_request.authorize! + + expect(ActivityPub::AccountBackfillWorker).to_not have_received(:perform_async) + end + end end describe '#reject!' do diff --git a/spec/services/activitypub/account_backfill_service_spec.rb b/spec/services/activitypub/account_backfill_service_spec.rb new file mode 100644 index 0000000000..ec98b960d3 --- /dev/null +++ b/spec/services/activitypub/account_backfill_service_spec.rb @@ -0,0 +1,112 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe ActivityPub::AccountBackfillService do + subject { described_class.new } + + before do + stub_const('ActivityPub::AccountBackfillService::ENABLED', true) + end + + let!(:account) { Fabricate(:account, domain: 'other.com', outbox_url: 'http://other.com/alice/outbox') } + + let!(:outbox) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + + id: 'http://other.com/alice/outbox', + type: 'OrderedCollection', + first: 'http://other.com/alice/outbox?page=true', + }.with_indifferent_access + end + + let!(:items) do + [ + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: 'https://other.com/alice/1234', + to: ['https://www.w3.org/ns/activitystreams#Public'], + cc: ['https://other.com/alice/followers'], + type: 'Note', + content: 'Lorem ipsum', + attributedTo: 'http://other.com/alice', + }, + 'https://other.com/alice/5678', + ] + end + + let!(:outbox_page) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: 'http://example.com/alice/outbox?page=true', + type: 'OrderedCollectionPage', + 
orderedItems: items, + } + end + + describe '#call' do + before do + stub_request(:get, 'http://other.com/alice/outbox').to_return(status: 200, body: Oj.dump(outbox), headers: { 'Content-Type': 'application/activity+json' }) + stub_request(:get, 'http://other.com/alice/outbox?page=true').to_return(status: 200, body: Oj.dump(outbox_page), headers: { 'Content-Type': 'application/activity+json' }) + end + + it 'fetches the items in the outbox' do + allow(FetchReplyWorker).to receive(:push_bulk) + got_items = subject.call(account) + expect(got_items[0].deep_symbolize_keys).to eq(items[0]) + expect(got_items[1]).to eq(items[1]) + expect(FetchReplyWorker).to have_received(:push_bulk).with([items[0].stringify_keys, items[1]]) + end + + context 'with followers-only and private statuses' do + let!(:items) do + [ + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: 'https://other.com/alice/public', + type: 'Note', + to: ['https://www.w3.org/ns/activitystreams#Public'], + cc: ['https://other.com/alice/followers'], + content: 'Lorem ipsum', + attributedTo: 'http://other.com/alice', + }, + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: 'https://other.com/alice/unlisted', + to: ['https://other.com/alice/followers'], + cc: ['https://www.w3.org/ns/activitystreams#Public'], + type: 'Note', + content: 'Lorem ipsum', + attributedTo: 'http://other.com/alice', + }, + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: 'https://other.com/alice/followers-only', + to: ['https://other.com/alice/followers'], + type: 'Note', + content: 'Lorem ipsum', + attributedTo: 'http://other.com/alice', + }, + { + '@context': 'https://www.w3.org/ns/activitystreams', + id: 'https://other.com/alice/dm', + to: ['https://other.com/alice/followers'], + type: 'Note', + content: 'Lorem ipsum', + attributedTo: 'http://other.com/alice', + }, + ] + end + + it 'only processes public and unlisted statuses' do + allow(FetchReplyWorker).to receive(:push_bulk) + got_items = 
subject.call(account) + expect(got_items.length).to eq(2) + expect(got_items[0].deep_symbolize_keys).to eq(items[0]) + expect(got_items[1].deep_symbolize_keys).to eq(items[1]) + expect(FetchReplyWorker).to have_received(:push_bulk).with([items[0].stringify_keys, items[1].stringify_keys]) + end + end + end +end