Add scheduled job to clean up old notifications.

This is a first step to have something concrete to discuss.

Needs tuning and tests.
This commit is contained in:
David Roetzel 2024-08-01 13:56:51 +02:00
parent 6c76502527
commit 634fc631b3
No known key found for this signature in database
6 changed files with 154 additions and 21 deletions

View File

@ -100,6 +100,8 @@ class Notification < ApplicationRecord
validates :type, inclusion: { in: TYPES } validates :type, inclusion: { in: TYPES }
scope :filtered, -> { where(filtered: true) }
scope :unfiltered, -> { where(filtered: false) }
scope :without_suspended, -> { joins(:from_account).merge(Account.without_suspended) } scope :without_suspended, -> { joins(:from_account).merge(Account.without_suspended) }
def type def type

View File

@ -0,0 +1,103 @@
# frozen_string_literal: true
# Deletes old notifications, a few accounts at a time, so that the
# cleanup can be scheduled frequently without long-running queries.
class NotificationsCleanupService
  # This can be expensive, so instead of cleaning up everything
  # at once, we limit the number of accounts per run and run
  # this more often.
  ACCOUNTS_PER_RUN = 5

  # Unfiltered notifications do not need any additional
  # processing and can be deleted via SQL. This means we
  # can safely delete a large number in one run.
  UNFILTERED_DELETES_PER_ACCOUNT = 100_000

  # Filtered notifications need to update their associated
  # notification request. So we need to call #destroy on them
  # which means we can only delete a comparatively small number
  # in one run.
  FILTERED_DESTROYS_PER_ACCOUNT = 1_000

  # Different types of notifications may have different
  # policies of how much of them / how long to keep them around.
  POLICY_BY_TYPE = {
    default: {
      keep_at_least: 20_000,
      months_to_keep: 6,
    }.freeze,
  }.freeze

  # Cleans up old notifications of the given type.
  #
  # Only accounts that have BOTH more than `keep_at_least`
  # notifications of this type AND notifications older than
  # `months_to_keep` are touched, and at most ACCOUNTS_PER_RUN of
  # them per invocation.
  #
  # @param notification_type [Symbol] one of Notification::TYPES
  def call(notification_type)
    @notification_type = notification_type
    # Reset the memoized policy: without this, a service instance that
    # is reused for a second notification type would silently keep the
    # policy looked up for the first type.
    @policy = nil

    affected_accounts =
      fetch_accounts_with_old_notifications & fetch_accounts_with_many_notifications

    affected_accounts.take(ACCOUNTS_PER_RUN).each do |account_id|
      base_query = construct_base_query(account_id)

      # Delete unfiltered notifications via SQL
      base_query
        .unfiltered
        .limit(UNFILTERED_DELETES_PER_ACCOUNT)
        .delete_all

      # Delete filtered notifications with '#destroy' to
      # update notification requests.
      base_query
        .filtered
        .limit(FILTERED_DESTROYS_PER_ACCOUNT)
        .destroy_all
    end
  end

  private

  # Cleanup policy for the current notification type, falling back to
  # the default policy when no type-specific one is configured.
  def policy
    @policy ||= POLICY_BY_TYPE.fetch(@notification_type, POLICY_BY_TYPE[:default])
  end

  # IDs of accounts that have at least one notification of this type
  # older than the retention window.
  def fetch_accounts_with_old_notifications
    Notification
      .where(type: @notification_type)
      .where(created_at: ...policy[:months_to_keep].months.ago)
      .distinct
      .pluck(:account_id)
  end

  # IDs of accounts that have more notifications of this type than the
  # policy wants to keep.
  def fetch_accounts_with_many_notifications
    Notification
      .from(
        Notification
          .select('account_id, COUNT(*) AS total')
          .where(type: @notification_type)
          .group(:account_id)
          .arel.as('totals')
      )
      .where('totals.total > ?', policy[:keep_at_least])
      .pluck(:account_id)
  end

  # Timestamp of the oldest notification among the `keep_at_least`
  # newest ones for this account — everything strictly older than this
  # may be deleted without dropping below the keep threshold.
  def find_min_created_at_to_keep(account_id)
    Notification
      .from(
        Notification
          .where(type: @notification_type)
          .where(account_id: account_id)
          .limit(policy[:keep_at_least])
          .order(created_at: :desc)
      )
      .group(:account_id)
      .minimum(:created_at)[account_id]
  end

  # Scope of deletable notifications: older than both the keep-count
  # cutoff and the retention window.
  def construct_base_query(account_id)
    min_created_at_to_keep = find_min_created_at_to_keep(account_id)

    Notification
      .where(account_id: account_id)
      .where(type: @notification_type)
      .where(notifications: { created_at: ...min_created_at_to_keep })
      .where(notifications: { created_at: ...policy[:months_to_keep].months.ago })
  end
end

View File

@ -0,0 +1,26 @@
# frozen_string_literal: true
# Shared helper for schedulers that should skip their run entirely
# while the Sidekiq queues are lagging behind.
module LowPriorityScheduler
  # These are latency limits on various queues above which a server is
  # considered to be under load, causing the auto-deletion to be entirely
  # skipped for that run.
  LOAD_LATENCY_THRESHOLDS = {
    default: 5,
    push: 10,
    # The `pull` queue has lower priority jobs, and it's unlikely that
    # pushing deletes would cause much issues with this queue if it didn't
    # cause issues with `default` and `push`. Yet, do not enqueue deletes
    # if the instance is lagging behind too much.
    pull: 5.minutes.to_i,
  }.freeze

  # True when any watched queue exceeds its latency threshold.
  def under_load?
    LOAD_LATENCY_THRESHOLDS.each_pair.any? do |queue_name, threshold|
      queue_under_load?(queue_name, threshold)
    end
  end

  private

  # Compares the current latency of a single queue to its limit.
  def queue_under_load?(name, max_latency)
    current_latency = Sidekiq::Queue.new(name).latency
    current_latency > max_latency
  end
end

View File

@ -3,6 +3,7 @@
class Scheduler::AccountsStatusesCleanupScheduler class Scheduler::AccountsStatusesCleanupScheduler
include Sidekiq::Worker include Sidekiq::Worker
include Redisable include Redisable
include LowPriorityScheduler
# This limit is mostly to be nice to the fediverse at large and not # This limit is mostly to be nice to the fediverse at large and not
# generate too much traffic. # generate too much traffic.
@ -19,19 +20,6 @@ class Scheduler::AccountsStatusesCleanupScheduler
# jobs to something the particular server can handle. # jobs to something the particular server can handle.
PER_THREAD_BUDGET = 5 PER_THREAD_BUDGET = 5
# These are latency limits on various queues above which a server is
# considered to be under load, causing the auto-deletion to be entirely
# skipped for that run.
LOAD_LATENCY_THRESHOLDS = {
default: 5,
push: 10,
# The `pull` queue has lower priority jobs, and it's unlikely that
# pushing deletes would cause much issues with this queue if it didn't
# cause issues with `default` and `push`. Yet, do not enqueue deletes
# if the instance is lagging behind too much.
pull: 5.minutes.to_i,
}.freeze
sidekiq_options retry: 0, lock: :until_executed, lock_ttl: 1.day.to_i sidekiq_options retry: 0, lock: :until_executed, lock_ttl: 1.day.to_i
def perform def perform
@ -91,10 +79,6 @@ class Scheduler::AccountsStatusesCleanupScheduler
[PER_THREAD_BUDGET * threads, MAX_BUDGET].min [PER_THREAD_BUDGET * threads, MAX_BUDGET].min
end end
def under_load?
LOAD_LATENCY_THRESHOLDS.any? { |queue, max_latency| queue_under_load?(queue, max_latency) }
end
private private
def cleanup_policies(first_policy_id, affected_policies, first_iteration, full_iteration) def cleanup_policies(first_policy_id, affected_policies, first_iteration, full_iteration)
@ -113,10 +97,6 @@ class Scheduler::AccountsStatusesCleanupScheduler
end end
end end
def queue_under_load?(name, max_latency)
Sidekiq::Queue.new(name).latency > max_latency
end
def last_processed_id def last_processed_id
redis.get('account_statuses_cleanup_scheduler:last_policy_id')&.to_i redis.get('account_statuses_cleanup_scheduler:last_policy_id')&.to_i
end end

View File

@ -0,0 +1,18 @@
# frozen_string_literal: true
# Periodically removes old notifications, one notification type at a
# time, skipping work while the server is under load.
class Scheduler::NotificationsCleanupScheduler
  include Sidekiq::Worker
  include LowPriorityScheduler

  TYPES_TO_CLEAN_UP = Notification::TYPES

  sidekiq_options retry: 0, lock: :until_executed, lock_ttl: 1.day.to_i

  def perform
    TYPES_TO_CLEAN_UP.each do |type|
      # Re-check queue latencies before every type instead of only once:
      # cleaning up a single type can take a while, and we want to back
      # off as soon as the server becomes loaded mid-run.
      return if under_load?

      NotificationsCleanupService.new.call(type)
    end
  end
end

View File

@ -67,3 +67,7 @@
interval: 1 hour interval: 1 hour
class: Scheduler::AutoCloseRegistrationsScheduler class: Scheduler::AutoCloseRegistrationsScheduler
queue: scheduler queue: scheduler
# Periodically delete old notifications; the scheduler skips the run
# when the server is under load (see LowPriorityScheduler).
# NOTE(review): the diff view appears to have stripped the indentation
# of the nested keys — confirm against the real sidekiq.yml.
notifications_cleanup_scheduler:
interval: 4 hours
class: Scheduler::NotificationsCleanupScheduler
queue: scheduler