Add delivery failure handling to FASP jobs (#35723)

This commit is contained in:
David Roetzel 2025-08-08 11:46:09 +02:00 committed by GitHub
parent 1fd147bf2b
commit 868c46bc76
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 373 additions and 106 deletions

View File

@ -9,6 +9,7 @@
# capabilities :jsonb not null
# confirmed :boolean default(FALSE), not null
# contact_email :string
# delivery_last_failed_at :datetime
# fediverse_account :string
# name :string not null
# privacy_policy :jsonb
@ -22,6 +23,8 @@
class Fasp::Provider < ApplicationRecord
include DebugConcern
RETRY_INTERVAL = 1.hour
has_many :fasp_backfill_requests, inverse_of: :fasp_provider, class_name: 'Fasp::BackfillRequest', dependent: :delete_all
has_many :fasp_debug_callbacks, inverse_of: :fasp_provider, class_name: 'Fasp::DebugCallback', dependent: :delete_all
has_many :fasp_subscriptions, inverse_of: :fasp_provider, class_name: 'Fasp::Subscription', dependent: :delete_all
@ -122,6 +125,16 @@ class Fasp::Provider < ApplicationRecord
@delivery_failure_tracker ||= DeliveryFailureTracker.new(base_url, resolution: :minutes)
end
def available?
delivery_failure_tracker.available? || retry_worthwile?
end
def update_availability!
self.delivery_last_failed_at = (Time.current unless delivery_failure_tracker.available?)
save!
end
private
def create_keypair
@ -148,4 +161,8 @@ class Fasp::Provider < ApplicationRecord
Fasp::Request.new(self).delete(path)
end
end
def retry_worthwile?
delivery_last_failed_at && delivery_last_failed_at < RETRY_INTERVAL.ago
end
end

View File

@ -1,9 +1,7 @@
# frozen_string_literal: true
class Fasp::AccountSearchWorker
include Sidekiq::Worker
sidekiq_options queue: 'fasp', retry: 0
class Fasp::AccountSearchWorker < Fasp::BaseWorker
sidekiq_options retry: 0
def perform(query)
return unless Mastodon::Feature.fasp_enabled?
@ -17,11 +15,13 @@ class Fasp::AccountSearchWorker
fetch_service = ActivityPub::FetchRemoteActorService.new
account_search_providers.each do |provider|
Fasp::Request.new(provider).get("/account_search/v0/search?#{params}").each do |uri|
next if Account.where(uri:).any?
with_provider(provider) do
Fasp::Request.new(provider).get("/account_search/v0/search?#{params}").each do |uri|
next if Account.where(uri:).any?
account = fetch_service.call(uri)
async_refresh.increment_result_count(by: 1) if account.present?
account = fetch_service.call(uri)
async_refresh.increment_result_count(by: 1) if account.present?
end
end
end
ensure

View File

@ -1,13 +1,13 @@
# frozen_string_literal: true
class Fasp::AnnounceAccountLifecycleEventWorker
include Sidekiq::Worker
sidekiq_options queue: 'fasp', retry: 5
class Fasp::AnnounceAccountLifecycleEventWorker < Fasp::BaseWorker
sidekiq_options retry: 5
def perform(uri, event_type)
Fasp::Subscription.includes(:fasp_provider).category_account.lifecycle.each do |subscription|
announce(subscription, uri, event_type)
with_provider(subscription.fasp_provider) do
announce(subscription, uri, event_type)
end
end
end

View File

@ -1,13 +1,13 @@
# frozen_string_literal: true
class Fasp::AnnounceContentLifecycleEventWorker
include Sidekiq::Worker
sidekiq_options queue: 'fasp', retry: 5
class Fasp::AnnounceContentLifecycleEventWorker < Fasp::BaseWorker
sidekiq_options retry: 5
def perform(uri, event_type)
Fasp::Subscription.includes(:fasp_provider).category_content.lifecycle.each do |subscription|
announce(subscription, uri, event_type)
with_provider(subscription.fasp_provider) do
announce(subscription, uri, event_type)
end
end
end

View File

@ -1,16 +1,16 @@
# frozen_string_literal: true
class Fasp::AnnounceTrendWorker
include Sidekiq::Worker
sidekiq_options queue: 'fasp', retry: 5
class Fasp::AnnounceTrendWorker < Fasp::BaseWorker
sidekiq_options retry: 5
def perform(status_id, trend_source)
status = ::Status.includes(:account).find(status_id)
return unless status.account.indexable?
Fasp::Subscription.includes(:fasp_provider).category_content.trends.each do |subscription|
announce(subscription, status.uri) if trending?(subscription, status, trend_source)
with_provider(subscription.fasp_provider) do
announce(subscription, status.uri) if trending?(subscription, status, trend_source)
end
end
rescue ActiveRecord::RecordNotFound
# status might not exist anymore, in which case there is nothing to do

View File

@ -1,16 +1,16 @@
# frozen_string_literal: true
class Fasp::BackfillWorker
include Sidekiq::Worker
sidekiq_options queue: 'fasp', retry: 5
class Fasp::BackfillWorker < Fasp::BaseWorker
sidekiq_options retry: 5
def perform(backfill_request_id)
backfill_request = Fasp::BackfillRequest.find(backfill_request_id)
announce(backfill_request)
with_provider(backfill_request.fasp_provider) do
announce(backfill_request)
backfill_request.advance!
backfill_request.advance!
end
rescue ActiveRecord::RecordNotFound
# ignore missing backfill requests
end

View File

@ -0,0 +1,19 @@
# frozen_string_literal: true
class Fasp::BaseWorker
include Sidekiq::Worker
sidekiq_options queue: 'fasp'
private
def with_provider(provider)
return unless provider.available?
yield
rescue *Mastodon::HTTP_CONNECTION_ERRORS
raise if provider.available?
ensure
provider.update_availability!
end
end

View File

@ -1,9 +1,7 @@
# frozen_string_literal: true
class Fasp::FollowRecommendationWorker
include Sidekiq::Worker
sidekiq_options queue: 'fasp', retry: 0
class Fasp::FollowRecommendationWorker < Fasp::BaseWorker
sidekiq_options retry: 0
def perform(account_id)
return unless Mastodon::Feature.fasp_enabled?
@ -20,14 +18,16 @@ class Fasp::FollowRecommendationWorker
fetch_service = ActivityPub::FetchRemoteActorService.new
follow_recommendation_providers.each do |provider|
Fasp::Request.new(provider).get("/follow_recommendation/v0/accounts?#{params}").each do |uri|
next if Account.where(uri:).any?
with_provider(provider) do
Fasp::Request.new(provider).get("/follow_recommendation/v0/accounts?#{params}").each do |uri|
next if Account.where(uri:).any?
new_account = fetch_service.call(uri)
new_account = fetch_service.call(uri)
if new_account.present?
Fasp::FollowRecommendation.find_or_create_by(requesting_account: account, recommended_account: new_account)
async_refresh.increment_result_count(by: 1)
if new_account.present?
Fasp::FollowRecommendation.find_or_create_by(requesting_account: account, recommended_account: new_account)
async_refresh.increment_result_count(by: 1)
end
end
end
end

View File

@ -0,0 +1,7 @@
# frozen_string_literal: true
class AddDeliveryLastFailedAtToFaspProviders < ActiveRecord::Migration[8.0]
def change
add_column :fasp_providers, :delivery_last_failed_at, :datetime
end
end

View File

@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema[8.0].define(version: 2025_07_17_003848) do
ActiveRecord::Schema[8.0].define(version: 2025_08_05_075010) do
# These are extensions that must be enabled in order to support this database
enable_extension "pg_catalog.plpgsql"
@ -488,6 +488,7 @@ ActiveRecord::Schema[8.0].define(version: 2025_07_17_003848) do
t.string "fediverse_account"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.datetime "delivery_last_failed_at"
t.index ["base_url"], name: "index_fasp_providers_on_base_url", unique: true
end
@ -1483,53 +1484,6 @@ ActiveRecord::Schema[8.0].define(version: 2025_07_17_003848) do
add_foreign_key "web_settings", "users", name: "fk_11910667b2", on_delete: :cascade
add_foreign_key "webauthn_credentials", "users", on_delete: :cascade
create_view "instances", materialized: true, sql_definition: <<-SQL
WITH domain_counts(domain, accounts_count) AS (
SELECT accounts.domain,
count(*) AS accounts_count
FROM accounts
WHERE (accounts.domain IS NOT NULL)
GROUP BY accounts.domain
)
SELECT domain_counts.domain,
domain_counts.accounts_count
FROM domain_counts
UNION
SELECT domain_blocks.domain,
COALESCE(domain_counts.accounts_count, (0)::bigint) AS accounts_count
FROM (domain_blocks
LEFT JOIN domain_counts ON (((domain_counts.domain)::text = (domain_blocks.domain)::text)))
UNION
SELECT domain_allows.domain,
COALESCE(domain_counts.accounts_count, (0)::bigint) AS accounts_count
FROM (domain_allows
LEFT JOIN domain_counts ON (((domain_counts.domain)::text = (domain_allows.domain)::text)));
SQL
add_index "instances", "reverse(('.'::text || (domain)::text)), domain", name: "index_instances_on_reverse_domain"
add_index "instances", ["domain"], name: "index_instances_on_domain", unique: true
create_view "user_ips", sql_definition: <<-SQL
SELECT user_id,
ip,
max(used_at) AS used_at
FROM ( SELECT users.id AS user_id,
users.sign_up_ip AS ip,
users.created_at AS used_at
FROM users
WHERE (users.sign_up_ip IS NOT NULL)
UNION ALL
SELECT session_activations.user_id,
session_activations.ip,
session_activations.updated_at
FROM session_activations
UNION ALL
SELECT login_activities.user_id,
login_activities.ip,
login_activities.created_at
FROM login_activities
WHERE (login_activities.success = true)) t0
GROUP BY user_id, ip;
SQL
create_view "account_summaries", materialized: true, sql_definition: <<-SQL
SELECT accounts.id AS account_id,
mode() WITHIN GROUP (ORDER BY t0.language) AS language,
@ -1580,4 +1534,51 @@ ActiveRecord::Schema[8.0].define(version: 2025_07_17_003848) do
SQL
add_index "global_follow_recommendations", ["account_id"], name: "index_global_follow_recommendations_on_account_id", unique: true
create_view "instances", materialized: true, sql_definition: <<-SQL
WITH domain_counts(domain, accounts_count) AS (
SELECT accounts.domain,
count(*) AS accounts_count
FROM accounts
WHERE (accounts.domain IS NOT NULL)
GROUP BY accounts.domain
)
SELECT domain_counts.domain,
domain_counts.accounts_count
FROM domain_counts
UNION
SELECT domain_blocks.domain,
COALESCE(domain_counts.accounts_count, (0)::bigint) AS accounts_count
FROM (domain_blocks
LEFT JOIN domain_counts ON (((domain_counts.domain)::text = (domain_blocks.domain)::text)))
UNION
SELECT domain_allows.domain,
COALESCE(domain_counts.accounts_count, (0)::bigint) AS accounts_count
FROM (domain_allows
LEFT JOIN domain_counts ON (((domain_counts.domain)::text = (domain_allows.domain)::text)));
SQL
add_index "instances", "reverse(('.'::text || (domain)::text)), domain", name: "index_instances_on_reverse_domain"
add_index "instances", ["domain"], name: "index_instances_on_domain", unique: true
create_view "user_ips", sql_definition: <<-SQL
SELECT user_id,
ip,
max(used_at) AS used_at
FROM ( SELECT users.id AS user_id,
users.sign_up_ip AS ip,
users.created_at AS used_at
FROM users
WHERE (users.sign_up_ip IS NOT NULL)
UNION ALL
SELECT session_activations.user_id,
session_activations.ip,
session_activations.updated_at
FROM session_activations
UNION ALL
SELECT login_activities.user_id,
login_activities.ip,
login_activities.created_at
FROM login_activities
WHERE (login_activities.success = true)) t0
GROUP BY user_id, ip;
SQL
end

View File

@ -214,4 +214,102 @@ RSpec.describe Fasp::Provider do
expect(subject.delivery_failure_tracker).to be_a(DeliveryFailureTracker)
end
end
describe '#available?' do
subject { Fabricate(:fasp_provider, delivery_last_failed_at:) }
let(:delivery_last_failed_at) { nil }
before do
allow(subject.delivery_failure_tracker).to receive(:available?).and_return(available)
end
context 'when the delivery failure tracker reports it is available' do
let(:available) { true }
it 'returns true' do
expect(subject.available?).to be true
end
end
context 'when the delivery failure tracker reports it is unavailable' do
let(:available) { false }
context 'when the last failure was more than one hour ago' do
let(:delivery_last_failed_at) { 61.minutes.ago }
it 'returns true' do
expect(subject.available?).to be true
end
end
context 'when the last failure is very recent' do
let(:delivery_last_failed_at) { 5.minutes.ago }
it 'returns false' do
expect(subject.available?).to be false
end
end
end
end
describe '#update_availability!' do
subject { Fabricate(:fasp_provider, delivery_last_failed_at:) }
before do
allow(subject.delivery_failure_tracker).to receive(:available?).and_return(available)
end
context 'when `delivery_last_failed_at` is `nil`' do
let(:delivery_last_failed_at) { nil }
context 'when the delivery failure tracker reports it is available' do
let(:available) { true }
it 'does not update the provider' do
subject.update_availability!
expect(subject.saved_changes?).to be false
end
end
context 'when the delivery failure tracker reports it is unavailable' do
let(:available) { false }
it 'sets `delivery_last_failed_at` to the current time' do
freeze_time
subject.update_availability!
expect(subject.delivery_last_failed_at).to eq Time.zone.now
end
end
end
context 'when `delivery_last_failed_at` is present' do
context 'when the delivery failure tracker reports it is available' do
let(:available) { true }
let(:delivery_last_failed_at) { 5.minutes.ago }
it 'sets `delivery_last_failed_at` to `nil`' do
subject.update_availability!
expect(subject.delivery_last_failed_at).to be_nil
end
end
context 'when the delivery failure tracker reports it is unavailable' do
let(:available) { false }
let(:delivery_last_failed_at) { 5.minutes.ago }
it 'updates `delivery_last_failed_at` to the current time' do
freeze_time
subject.update_availability!
expect(subject.delivery_last_failed_at).to eq Time.zone.now
end
end
end
end
end

View File

@ -0,0 +1,57 @@
# frozen_string_literal: true
RSpec.shared_examples 'worker handling fasp delivery failures' do
context 'when provider is not available' do
before do
provider.update(delivery_last_failed_at: 1.minute.ago)
domain = Addressable::URI.parse(provider.base_url).normalized_host
UnavailableDomain.create!(domain:)
end
it 'does not attempt connecting and does not fail the job' do
expect { subject }.to_not raise_error
expect(stubbed_request).to_not have_been_made
end
end
context 'when connection to provider fails' do
before do
base_stubbed_request
.to_raise(HTTP::ConnectionError)
end
context 'when provider becomes unavailable' do
before do
travel_to 5.minutes.ago
4.times do
provider.delivery_failure_tracker.track_failure!
travel_to 1.minute.since
end
end
it 'updates the provider and does not fail the job, so it will not be retried' do
expect { subject }.to_not raise_error
expect(provider.reload.delivery_last_failed_at).to eq Time.current
end
end
context 'when provider is still marked as available' do
it 'fails the job so it can be retried' do
expect { subject }.to raise_error(HTTP::ConnectionError)
end
end
end
context 'when connection to a previously unavailable provider succeeds' do
before do
provider.update(delivery_last_failed_at: 2.hours.ago)
domain = Addressable::URI.parse(provider.base_url).normalized_host
UnavailableDomain.create!(domain:)
end
it 'marks the provider as being available again' do
expect { subject }.to_not raise_error
expect(provider).to be_available
end
end
end

View File

@ -5,12 +5,14 @@ require 'rails_helper'
RSpec.describe Fasp::AccountSearchWorker, feature: :fasp do
include ProviderRequestHelper
subject { described_class.new.perform('cats') }
let(:provider) { Fabricate(:account_search_fasp) }
let(:account) { Fabricate(:account) }
let(:fetch_service) { instance_double(ActivityPub::FetchRemoteActorService, call: true) }
let(:path) { '/account_search/v0/search?term=cats&limit=10' }
let!(:stubbed_request) do
path = '/account_search/v0/search?term=cats&limit=10'
stub_provider_request(provider,
method: :get,
path:,
@ -25,7 +27,7 @@ RSpec.describe Fasp::AccountSearchWorker, feature: :fasp do
end
it 'requests search results and fetches received account uris' do
described_class.new.perform('cats')
subject
expect(stubbed_request).to have_been_made
expect(fetch_service).to have_received(:call).with('https://fedi.example.com/accounts/2')
@ -35,7 +37,7 @@ RSpec.describe Fasp::AccountSearchWorker, feature: :fasp do
it 'marks a running async refresh as finished' do
async_refresh = AsyncRefresh.create("fasp:account_search:#{Digest::MD5.base64digest('cats')}", count_results: true)
described_class.new.perform('cats')
subject
expect(async_refresh.reload).to be_finished
end
@ -43,8 +45,16 @@ RSpec.describe Fasp::AccountSearchWorker, feature: :fasp do
it 'tracks the number of fetched accounts in the async refresh' do
async_refresh = AsyncRefresh.create("fasp:account_search:#{Digest::MD5.base64digest('cats')}", count_results: true)
described_class.new.perform('cats')
subject
expect(async_refresh.reload.result_count).to eq 2
end
describe 'provider delivery failure handling' do
let(:base_stubbed_request) do
stub_request(:get, provider.url(path))
end
it_behaves_like('worker handling fasp delivery failures')
end
end

View File

@ -5,15 +5,19 @@ require 'rails_helper'
RSpec.describe Fasp::AnnounceAccountLifecycleEventWorker do
include ProviderRequestHelper
subject { described_class.new.perform(account_uri, 'new') }
let(:account_uri) { 'https://masto.example.com/accounts/1' }
let(:subscription) do
Fabricate(:fasp_subscription, category: 'account')
end
let(:provider) { subscription.fasp_provider }
let(:path) { '/data_sharing/v0/announcements' }
let!(:stubbed_request) do
stub_provider_request(provider,
method: :post,
path: '/data_sharing/v0/announcements',
path:,
response_body: {
source: {
subscription: {
@ -27,8 +31,16 @@ RSpec.describe Fasp::AnnounceAccountLifecycleEventWorker do
end
it 'sends the account uri to subscribed providers' do
described_class.new.perform(account_uri, 'new')
subject
expect(stubbed_request).to have_been_made
end
describe 'provider delivery failure handling' do
let(:base_stubbed_request) do
stub_request(:post, provider.url(path))
end
it_behaves_like('worker handling fasp delivery failures')
end
end

View File

@ -5,15 +5,19 @@ require 'rails_helper'
RSpec.describe Fasp::AnnounceContentLifecycleEventWorker do
include ProviderRequestHelper
subject { described_class.new.perform(status_uri, 'new') }
let(:status_uri) { 'https://masto.example.com/status/1' }
let(:subscription) do
Fabricate(:fasp_subscription)
end
let(:provider) { subscription.fasp_provider }
let(:path) { '/data_sharing/v0/announcements' }
let!(:stubbed_request) do
stub_provider_request(provider,
method: :post,
path: '/data_sharing/v0/announcements',
path:,
response_body: {
source: {
subscription: {
@ -27,8 +31,16 @@ RSpec.describe Fasp::AnnounceContentLifecycleEventWorker do
end
it 'sends the status uri to subscribed providers' do
described_class.new.perform(status_uri, 'new')
subject
expect(stubbed_request).to have_been_made
end
describe 'provider delivery failure handling' do
let(:base_stubbed_request) do
stub_request(:post, provider.url(path))
end
it_behaves_like('worker handling fasp delivery failures')
end
end

View File

@ -5,6 +5,8 @@ require 'rails_helper'
RSpec.describe Fasp::AnnounceTrendWorker do
include ProviderRequestHelper
subject { described_class.new.perform(status.id, 'favourite') }
let(:status) { Fabricate(:status) }
let(:subscription) do
Fabricate(:fasp_subscription,
@ -14,10 +16,12 @@ RSpec.describe Fasp::AnnounceTrendWorker do
threshold_likes: 2)
end
let(:provider) { subscription.fasp_provider }
let(:path) { '/data_sharing/v0/announcements' }
let!(:stubbed_request) do
stub_provider_request(provider,
method: :post,
path: '/data_sharing/v0/announcements',
path:,
response_body: {
source: {
subscription: {
@ -36,15 +40,23 @@ RSpec.describe Fasp::AnnounceTrendWorker do
end
it 'sends the account uri to subscribed providers' do
described_class.new.perform(status.id, 'favourite')
subject
expect(stubbed_request).to have_been_made
end
describe 'provider delivery failure handling' do
let(:base_stubbed_request) do
stub_request(:post, provider.url(path))
end
it_behaves_like('worker handling fasp delivery failures')
end
end
context 'when the configured threshold is not met' do
it 'does not notify any provider' do
described_class.new.perform(status.id, 'favourite')
subject
expect(stubbed_request).to_not have_been_made
end

View File

@ -5,13 +5,17 @@ require 'rails_helper'
RSpec.describe Fasp::BackfillWorker do
include ProviderRequestHelper
subject { described_class.new.perform(backfill_request.id) }
let(:backfill_request) { Fabricate(:fasp_backfill_request) }
let(:provider) { backfill_request.fasp_provider }
let(:status) { Fabricate(:status) }
let(:path) { '/data_sharing/v0/announcements' }
let!(:stubbed_request) do
stub_provider_request(provider,
method: :post,
path: '/data_sharing/v0/announcements',
path:,
response_body: {
source: {
backfillRequest: {
@ -25,8 +29,16 @@ RSpec.describe Fasp::BackfillWorker do
end
it 'sends status uri to provider that requested backfill' do
described_class.new.perform(backfill_request.id)
subject
expect(stubbed_request).to have_been_made
end
describe 'provider delivery failure handling' do
let(:base_stubbed_request) do
stub_request(:post, provider.url(path))
end
it_behaves_like('worker handling fasp delivery failures')
end
end

View File

@ -5,13 +5,15 @@ require 'rails_helper'
RSpec.describe Fasp::FollowRecommendationWorker, feature: :fasp do
include ProviderRequestHelper
subject { described_class.new.perform(account.id) }
let(:provider) { Fabricate(:follow_recommendation_fasp) }
let(:account) { Fabricate(:account) }
let(:account_uri) { ActivityPub::TagManager.instance.uri_for(account) }
let(:fetch_service) { instance_double(ActivityPub::FetchRemoteActorService) }
let(:path) { "/follow_recommendation/v0/accounts?accountUri=#{URI.encode_uri_component(account_uri)}" }
let!(:stubbed_request) do
account_uri = ActivityPub::TagManager.instance.uri_for(account)
path = "/follow_recommendation/v0/accounts?accountUri=#{URI.encode_uri_component(account_uri)}"
stub_provider_request(provider,
method: :get,
path:,
@ -28,7 +30,7 @@ RSpec.describe Fasp::FollowRecommendationWorker, feature: :fasp do
end
it "sends the requesting account's uri to provider and fetches received account uris" do
described_class.new.perform(account.id)
subject
expect(stubbed_request).to have_been_made
expect(fetch_service).to have_received(:call).with('https://fedi.example.com/accounts/1')
@ -38,7 +40,7 @@ RSpec.describe Fasp::FollowRecommendationWorker, feature: :fasp do
it 'marks a running async refresh as finished' do
async_refresh = AsyncRefresh.create("fasp:follow_recommendation:#{account.id}", count_results: true)
described_class.new.perform(account.id)
subject
expect(async_refresh.reload).to be_finished
end
@ -46,14 +48,22 @@ RSpec.describe Fasp::FollowRecommendationWorker, feature: :fasp do
it 'tracks the number of fetched accounts in the async refresh' do
async_refresh = AsyncRefresh.create("fasp:follow_recommendation:#{account.id}", count_results: true)
described_class.new.perform(account.id)
subject
expect(async_refresh.reload.result_count).to eq 2
end
it 'persists the results' do
expect do
described_class.new.perform(account.id)
subject
end.to change(Fasp::FollowRecommendation, :count).by(2)
end
describe 'provider delivery failure handling' do
let(:base_stubbed_request) do
stub_request(:get, provider.url(path))
end
it_behaves_like('worker handling fasp delivery failures')
end
end