Compare commits

...

2 Commits

Author SHA1 Message Date
Matt Jankowski
dac81e44a8
Merge 2a81df8270 into 624c024766 2025-09-03 10:07:13 +00:00
Matt Jankowski
2a81df8270 Consistent usage of constants for perform_in 2025-08-28 09:00:34 -04:00
13 changed files with 39 additions and 20 deletions

View File

@ -14,7 +14,7 @@ class Api::V1::Accounts::CredentialsController < Api::BaseController
@account = current_account
UpdateAccountService.new.call(@account, account_params, raise_error: true)
current_user.update(user_params) if user_params
ActivityPub::UpdateDistributionWorker.perform_in(ActivityPub::UpdateDistributionWorker::DEBOUNCE_DELAY, @account.id)
ActivityPub::UpdateDistributionWorker.distribute(@account)
render json: @account, serializer: REST::CredentialAccountSerializer
rescue ActiveRecord::RecordInvalid => e
render json: ValidationErrorFormatter.new(e).as_json, status: 422

View File

@ -7,7 +7,7 @@ class Api::V1::Profile::AvatarsController < Api::BaseController
def destroy
@account = current_account
UpdateAccountService.new.call(@account, { avatar: nil }, raise_error: true)
ActivityPub::UpdateDistributionWorker.perform_in(ActivityPub::UpdateDistributionWorker::DEBOUNCE_DELAY, @account.id)
ActivityPub::UpdateDistributionWorker.distribute(@account)
render json: @account, serializer: REST::CredentialAccountSerializer
end
end

View File

@ -7,7 +7,7 @@ class Api::V1::Profile::HeadersController < Api::BaseController
def destroy
@account = current_account
UpdateAccountService.new.call(@account, { header: nil }, raise_error: true)
ActivityPub::UpdateDistributionWorker.perform_in(ActivityPub::UpdateDistributionWorker::DEBOUNCE_DELAY, @account.id)
ActivityPub::UpdateDistributionWorker.distribute(@account)
render json: @account, serializer: REST::CredentialAccountSerializer
end
end

View File

@ -8,7 +8,7 @@ module Settings
def destroy
if valid_picture?
if UpdateAccountService.new.call(@account, { @picture => nil, "#{@picture}_remote_url" => '' })
ActivityPub::UpdateDistributionWorker.perform_in(ActivityPub::UpdateDistributionWorker::DEBOUNCE_DELAY, @account.id)
ActivityPub::UpdateDistributionWorker.distribute(@account)
redirect_to settings_profile_path, notice: I18n.t('generic.changes_saved_msg'), status: 303
else
redirect_to settings_profile_path

View File

@ -8,7 +8,7 @@ class Settings::PrivacyController < Settings::BaseController
def update
if UpdateAccountService.new.call(@account, account_params.except(:settings))
current_user.update!(settings_attributes: account_params[:settings])
ActivityPub::UpdateDistributionWorker.perform_in(ActivityPub::UpdateDistributionWorker::DEBOUNCE_DELAY, @account.id)
ActivityPub::UpdateDistributionWorker.distribute(@account)
redirect_to settings_privacy_path, notice: I18n.t('generic.changes_saved_msg')
else
render :show

View File

@ -9,7 +9,7 @@ class Settings::ProfilesController < Settings::BaseController
def update
if UpdateAccountService.new.call(@account, account_params)
ActivityPub::UpdateDistributionWorker.perform_in(ActivityPub::UpdateDistributionWorker::DEBOUNCE_DELAY, @account.id)
ActivityPub::UpdateDistributionWorker.distribute(@account)
redirect_to settings_profile_path, notice: I18n.t('generic.changes_saved_msg')
else
@account.build_fields

View File

@ -8,7 +8,7 @@ class Settings::VerificationsController < Settings::BaseController
def update
if UpdateAccountService.new.call(@account, account_params)
ActivityPub::UpdateDistributionWorker.perform_in(ActivityPub::UpdateDistributionWorker::DEBOUNCE_DELAY, @account.id)
ActivityPub::UpdateDistributionWorker.distribute(@account)
redirect_to settings_verification_path, notice: I18n.t('generic.changes_saved_msg')
else
render :show

View File

@ -3,6 +3,9 @@
class ActivityPub::Activity::Create < ActivityPub::Activity
include FormattingHelper
DISTRIBUTE_DELAY = 1.minute
PROCESSING_DELAY = (30.seconds)..(10.minutes)
def perform
@account.schedule_refresh_if_stale!
@ -73,7 +76,7 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
def distribute
# Spread out crawling randomly to avoid DDoSing the link
LinkCrawlWorker.perform_in(rand(1..59).seconds, @status.id)
LinkCrawlWorker.perform_in(rand(DISTRIBUTE_DELAY), @status.id)
# Distribute into home and list feeds and notify mentioned accounts
::DistributionWorker.perform_async(@status.id, { 'silenced_account_ids' => @silenced_account_ids }) if @options[:override_timestamps] || @status.within_realtime_window?
@ -303,7 +306,7 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
media_attachment.download_thumbnail!
media_attachment.save
rescue Mastodon::UnexpectedResponseError, *Mastodon::HTTP_CONNECTION_ERRORS
RedownloadMediaWorker.perform_in(rand(30..600).seconds, media_attachment.id)
RedownloadMediaWorker.perform_in(rand(PROCESSING_DELAY), media_attachment.id)
rescue Seahorse::Client::NetworkingError => e
Rails.logger.warn "Error storing media attachment: #{e}"
RedownloadMediaWorker.perform_async(media_attachment.id)
@ -348,7 +351,7 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
end
increment_voters_count! unless already_voted
ActivityPub::DistributePollUpdateWorker.perform_in(3.minutes, replied_to_status.id) unless replied_to_status.preloadable_poll.hide_totals?
ActivityPub::DistributePollUpdateWorker.distribute(replied_to_status) unless replied_to_status.preloadable_poll.hide_totals?
end
def resolve_thread(status)
@ -359,7 +362,7 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
def resolve_unresolved_mentions(status)
@unresolved_mentions.uniq.each do |uri|
MentionResolveWorker.perform_in(rand(30...600).seconds, status.id, uri, { 'request_id' => @options[:request_id] })
MentionResolveWorker.perform_in(rand(PROCESSING_DELAY), status.id, uri, { 'request_id' => @options[:request_id] })
end
end
@ -382,7 +385,7 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
embedded_quote = safe_prefetched_embed(@account, @status_parser.quoted_object, @json['context'])
ActivityPub::VerifyQuoteService.new.call(@quote, fetchable_quoted_uri: @quote_uri, prefetched_quoted_object: embedded_quote, request_id: @options[:request_id], depth: @options[:depth])
rescue Mastodon::RecursionLimitExceededError, Mastodon::UnexpectedResponseError, *Mastodon::HTTP_CONNECTION_ERRORS
ActivityPub::RefetchAndVerifyQuoteWorker.perform_in(rand(30..600).seconds, @quote.id, @quote_uri, { 'request_id' => @options[:request_id] })
ActivityPub::RefetchAndVerifyQuoteWorker.perform_in(rand(PROCESSING_DELAY), @quote.id, @quote_uri, { 'request_id' => @options[:request_id] })
end
def conversation_from_uri(uri)

View File

@ -9,6 +9,9 @@ class ActivityPub::ProcessAccountService < BaseService
SUBDOMAINS_RATELIMIT = 10
DISCOVERIES_PER_REQUEST = 400
PROCESSING_DELAY = (30.seconds)..(10.minutes)
VERIFY_DELAY = 10.minutes
VALID_URI_SCHEMES = %w(http https).freeze
# Should be called with confirmed valid JSON
@ -142,13 +145,13 @@ class ActivityPub::ProcessAccountService < BaseService
@account.avatar_remote_url = image_url('icon') || '' unless skip_download?
@account.avatar = nil if @account.avatar_remote_url.blank?
rescue Mastodon::UnexpectedResponseError, *Mastodon::HTTP_CONNECTION_ERRORS
RedownloadAvatarWorker.perform_in(rand(30..600).seconds, @account.id)
RedownloadAvatarWorker.perform_in(rand(PROCESSING_DELAY), @account.id)
end
begin
@account.header_remote_url = image_url('image') || '' unless skip_download?
@account.header = nil if @account.header_remote_url.blank?
rescue Mastodon::UnexpectedResponseError, *Mastodon::HTTP_CONNECTION_ERRORS
RedownloadHeaderWorker.perform_in(rand(30..600).seconds, @account.id)
RedownloadHeaderWorker.perform_in(rand(PROCESSING_DELAY), @account.id)
end
@account.statuses_count = outbox_total_items if outbox_total_items.present?
@account.following_count = following_total_items if following_total_items.present?
@ -194,7 +197,7 @@ class ActivityPub::ProcessAccountService < BaseService
end
def check_links!
VerifyAccountLinksWorker.perform_in(rand(10.minutes.to_i), @account.id)
VerifyAccountLinksWorker.perform_in(rand(VERIFY_DELAY), @account.id)
end
def process_duplicate_accounts!

View File

@ -5,6 +5,9 @@ class ActivityPub::ProcessStatusUpdateService < BaseService
include Redisable
include Lockable
CRAWL_DELAY = 1.minute
PROCESSING_DELAY = (30.seconds)..(10.minutes)
def call(status, activity_json, object_json, request_id: nil)
raise ArgumentError, 'Status has unsaved changes' if status.changed?
@ -124,7 +127,7 @@ class ActivityPub::ProcessStatusUpdateService < BaseService
media_attachment.download_thumbnail! if media_attachment.thumbnail_remote_url_previously_changed?
media_attachment.save
rescue Mastodon::UnexpectedResponseError, *Mastodon::HTTP_CONNECTION_ERRORS
RedownloadMediaWorker.perform_in(rand(30..600).seconds, media_attachment.id)
RedownloadMediaWorker.perform_in(rand(PROCESSING_DELAY), media_attachment.id)
rescue Seahorse::Client::NetworkingError => e
Rails.logger.warn "Error storing media attachment: #{e}"
end
@ -249,7 +252,7 @@ class ActivityPub::ProcessStatusUpdateService < BaseService
# Queue unresolved mentions for later
unresolved_mentions.uniq.each do |uri|
MentionResolveWorker.perform_in(rand(30...600).seconds, @status.id, uri, { 'request_id' => @request_id })
MentionResolveWorker.perform_in(rand(PROCESSING_DELAY), @status.id, uri, { 'request_id' => @request_id })
end
end
@ -329,7 +332,7 @@ class ActivityPub::ProcessStatusUpdateService < BaseService
embedded_quote = safe_prefetched_embed(@account, @status_parser.quoted_object, @activity_json['context'])
ActivityPub::VerifyQuoteService.new.call(quote, fetchable_quoted_uri: quote_uri, prefetched_quoted_object: embedded_quote, request_id: @request_id)
rescue Mastodon::UnexpectedResponseError, *Mastodon::HTTP_CONNECTION_ERRORS
ActivityPub::RefetchAndVerifyQuoteWorker.perform_in(rand(30..600).seconds, quote.id, quote_uri, { 'request_id' => @request_id })
ActivityPub::RefetchAndVerifyQuoteWorker.perform_in(rand(PROCESSING_DELAY), quote.id, quote_uri, { 'request_id' => @request_id })
end
def update_counts!
@ -387,7 +390,7 @@ class ActivityPub::ProcessStatusUpdateService < BaseService
def reset_preview_card!
@status.reset_preview_card!
LinkCrawlWorker.perform_in(rand(1..59).seconds, @status.id)
LinkCrawlWorker.perform_in(rand(CRAWL_DELAY), @status.id)
end
def broadcast_updates!

View File

@ -45,7 +45,7 @@ class VoteService < BaseService
def distribute_poll!
return if @poll.hide_totals?
ActivityPub::DistributePollUpdateWorker.perform_in(3.minutes, @poll.status.id)
ActivityPub::DistributePollUpdateWorker.distribute(@poll.status)
end
def queue_final_poll_check!

View File

@ -4,6 +4,8 @@ class ActivityPub::DistributePollUpdateWorker
include Sidekiq::Worker
include Payloadable
PROCESSING_DELAY = 3.minutes
sidekiq_options queue: 'push', lock: :until_executed, retry: 0
def perform(status_id)
@ -21,6 +23,10 @@ class ActivityPub::DistributePollUpdateWorker
true
end
def self.distribute(status)
perform_in(PROCESSING_DELAY, status.id)
end
private
def relayable?

View File

@ -16,6 +16,10 @@ class ActivityPub::UpdateDistributionWorker < ActivityPub::RawDistributionWorker
true
end
def self.distribute(account)
perform_in(DEBOUNCE_DELAY, account.id)
end
protected
def inboxes