From 319fbbbfac4bcbcb718d059cb35a67eb9a2c5450 Mon Sep 17 00:00:00 2001 From: David Roetzel Date: Thu, 12 Jun 2025 16:54:00 +0200 Subject: [PATCH] Experimental Async Refreshes API (#34918) --- .../api/v1/timelines/home_controller.rb | 4 + .../v1_alpha/async_refreshes_controller.rb | 16 ++ .../concerns/async_refreshes_concern.rb | 11 ++ app/models/async_refresh.rb | 76 ++++++++ app/models/home_feed.rb | 30 ++- config/routes/api.rb | 5 + .../concerns/user_tracking_concern_spec.rb | 4 +- spec/models/async_refresh_spec.rb | 174 ++++++++++++++++++ spec/models/home_feed_spec.rb | 56 +++++- spec/requests/api/v1/timelines/home_spec.rb | 4 +- .../api/v1_alpha/async_refreshes_spec.rb | 70 +++++++ 11 files changed, 437 insertions(+), 13 deletions(-) create mode 100644 app/controllers/api/v1_alpha/async_refreshes_controller.rb create mode 100644 app/controllers/concerns/async_refreshes_concern.rb create mode 100644 app/models/async_refresh.rb create mode 100644 spec/models/async_refresh_spec.rb create mode 100644 spec/requests/api/v1_alpha/async_refreshes_spec.rb diff --git a/app/controllers/api/v1/timelines/home_controller.rb b/app/controllers/api/v1/timelines/home_controller.rb index d5d1828666..b8384a1368 100644 --- a/app/controllers/api/v1/timelines/home_controller.rb +++ b/app/controllers/api/v1/timelines/home_controller.rb @@ -1,6 +1,8 @@ # frozen_string_literal: true class Api::V1::Timelines::HomeController < Api::V1::Timelines::BaseController + include AsyncRefreshesConcern + before_action -> { doorkeeper_authorize! :read, :'read:statuses' }, only: [:show] before_action :require_user!, only: [:show] @@ -12,6 +14,8 @@ class Api::V1::Timelines::HomeController < Api::V1::Timelines::BaseController @relationships = StatusRelationshipsPresenter.new(@statuses, current_user&.account_id) end + add_async_refresh_header(account_home_feed.async_refresh, retry_seconds: 5) + render json: @statuses, each_serializer: REST::StatusSerializer, relationships: @relationships, diff --git a/app/controllers/api/v1_alpha/async_refreshes_controller.rb b/app/controllers/api/v1_alpha/async_refreshes_controller.rb new file mode 100644 index 0000000000..a34935c0dd --- /dev/null +++ b/app/controllers/api/v1_alpha/async_refreshes_controller.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +class Api::V1Alpha::AsyncRefreshesController < Api::BaseController + before_action -> { doorkeeper_authorize! :read } + before_action :require_user! + + def show + async_refresh = AsyncRefresh.find(params[:id]) + + if async_refresh + render json: async_refresh + else + not_found + end + end +end diff --git a/app/controllers/concerns/async_refreshes_concern.rb b/app/controllers/concerns/async_refreshes_concern.rb new file mode 100644 index 0000000000..29122e16b5 --- /dev/null +++ b/app/controllers/concerns/async_refreshes_concern.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +module AsyncRefreshesConcern + private + + def add_async_refresh_header(async_refresh, retry_seconds: 3) + return unless async_refresh.running? + + response.headers['Mastodon-Async-Refresh'] = "id=\"#{async_refresh.id}\", retry=#{retry_seconds}" + end +end diff --git a/app/models/async_refresh.rb b/app/models/async_refresh.rb new file mode 100644 index 0000000000..9ca6985750 --- /dev/null +++ b/app/models/async_refresh.rb @@ -0,0 +1,76 @@ +# frozen_string_literal: true + +class AsyncRefresh + extend Redisable + include Redisable + + NEW_REFRESH_EXPIRATION = 1.day + FINISHED_REFRESH_EXPIRATION = 1.hour + + def self.find(id) + redis_key = Rails.application.message_verifier('async_refreshes').verify(id) + new(redis_key) if redis.exists?(redis_key) + rescue ActiveSupport::MessageVerifier::InvalidSignature + nil + end + + def self.create(redis_key, count_results: false) + data = { 'status' => 'running' } + data['result_count'] = 0 if count_results + redis.hset(redis_key, data) + redis.expire(redis_key, NEW_REFRESH_EXPIRATION) + new(redis_key) + end + + attr_reader :status, :result_count + + def initialize(redis_key) + @redis_key = redis_key + fetch_data_from_redis + end + + def id + Rails.application.message_verifier('async_refreshes').generate(@redis_key) + end + + def running? + @status == 'running' + end + + def finished? + @status == 'finished' + end + + def finish! + redis.pipelined do |pipeline| + pipeline.hset(@redis_key, { 'status' => 'finished' }) + pipeline.expire(@redis_key, FINISHED_REFRESH_EXPIRATION) + end + @status = 'finished' + end + + def reload + fetch_data_from_redis + self + end + + def to_json(_options) + { + async_refresh: { + id:, + status:, + result_count:, + }, + }.to_json + end + + private + + def fetch_data_from_redis + @status, @result_count = redis.pipelined do |pipeline| + pipeline.hget(@redis_key, 'status') + pipeline.hget(@redis_key, 'result_count') + end + @result_count = @result_count.presence&.to_i + end +end diff --git a/app/models/home_feed.rb b/app/models/home_feed.rb index 81730ac98f..8962a99e32 100644 --- a/app/models/home_feed.rb +++ b/app/models/home_feed.rb @@ -6,15 +6,39 @@ class HomeFeed < Feed super(:home, account.id) end + def async_refresh + @async_refresh ||= AsyncRefresh.new(redis_regeneration_key) + end + def regenerating? - redis.exists?("account:#{@account.id}:regeneration") + async_refresh.running? + rescue Redis::CommandError + retry if upgrade_redis_key! end def regeneration_in_progress! - redis.set("account:#{@account.id}:regeneration", true, nx: true, ex: 1.day.seconds) + @async_refresh = AsyncRefresh.create(redis_regeneration_key) + rescue Redis::CommandError + upgrade_redis_key! end def regeneration_finished! - redis.del("account:#{@account.id}:regeneration") + async_refresh.finish! + rescue Redis::CommandError + retry if upgrade_redis_key! + end + + private + + def redis_regeneration_key + @redis_regeneration_key = "account:#{@account.id}:regeneration" + end + + def upgrade_redis_key! + if redis.type(redis_regeneration_key) == 'string' + redis.del(redis_regeneration_key) + regeneration_in_progress! + true + end end end diff --git a/config/routes/api.rb b/config/routes/api.rb index 9c94467f01..4040a4350f 100644 --- a/config/routes/api.rb +++ b/config/routes/api.rb @@ -4,6 +4,11 @@ namespace :api, format: false do # OEmbed get '/oembed', to: 'oembed#show', as: :oembed + # Experimental JSON / REST API + namespace :v1_alpha do + resources :async_refreshes, only: :show + end + # JSON / REST API namespace :v1 do resources :statuses, only: [:index, :create, :show, :update, :destroy] do diff --git a/spec/controllers/concerns/user_tracking_concern_spec.rb b/spec/controllers/concerns/user_tracking_concern_spec.rb index d67b0ef5e7..3d7d134cac 100644 --- a/spec/controllers/concerns/user_tracking_concern_spec.rb +++ b/spec/controllers/concerns/user_tracking_concern_spec.rb @@ -65,7 +65,7 @@ RSpec.describe UserTrackingConcern do get :show expect_updated_sign_in_at(user) - expect(redis.get("account:#{user.account_id}:regeneration")).to eq 'true' + expect(redis.exists?("account:#{user.account_id}:regeneration")).to be true expect(RegenerationWorker).to have_received(:perform_async) end @@ -80,7 +80,7 @@ RSpec.describe UserTrackingConcern do expect_updated_sign_in_at(user) expect(redis.zcard(FeedManager.instance.key(:home, user.account_id))).to eq 3 - expect(redis.get("account:#{user.account_id}:regeneration")).to be_nil + expect(redis.hget("account:#{user.account_id}:regeneration", 'status')).to eq 'finished' end end diff --git a/spec/models/async_refresh_spec.rb b/spec/models/async_refresh_spec.rb new file mode 100644 index 0000000000..3e055b5053 --- /dev/null +++ b/spec/models/async_refresh_spec.rb @@ -0,0 +1,174 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe AsyncRefresh do + subject { described_class.new(redis_key) } + + let(:redis_key) { 'testjob:key' } + let(:status) { 'running' } + let(:job_hash) { { 'status' => status, 'result_count' => 23 } } + + describe '::find' do + context 'when a matching job in redis exists' do + before do + redis.hset(redis_key, job_hash) + end + + it 'returns a new instance' do + id = Rails.application.message_verifier('async_refreshes').generate(redis_key) + async_refresh = described_class.find(id) + + expect(async_refresh).to be_a described_class + end + end + + context 'when no matching job in redis exists' do + it 'returns `nil`' do + id = Rails.application.message_verifier('async_refreshes').generate('non_existent') + expect(described_class.find(id)).to be_nil + end + end + end + + describe '::create' do + it 'inserts the given key into redis' do + described_class.create(redis_key) + + expect(redis.exists?(redis_key)).to be true + end + + it 'sets the status to `running`' do + async_refresh = described_class.create(redis_key) + + expect(async_refresh.status).to eq 'running' + end + + context 'with `count_results`' do + it 'set `result_count` to 0' do + async_refresh = described_class.create(redis_key, count_results: true) + + expect(async_refresh.result_count).to eq 0 + end + end + + context 'without `count_results`' do + it 'does not set `result_count`' do + async_refresh = described_class.create(redis_key) + + expect(async_refresh.result_count).to be_nil + end + end + end + + describe '#id' do + before do + redis.hset(redis_key, job_hash) + end + + it "returns a signed version of the job's redis key" do + id = subject.id + key_name = Base64.decode64(id.split('-').first) + + expect(key_name).to include redis_key + end + end + + describe '#status' do + before do + redis.hset(redis_key, job_hash) + end + + context 'when the job is running' do + it "returns 'running'" do + expect(subject.status).to eq 'running' + end + end + + context 'when the job is finished' do + let(:status) { 'finished' } + + it "returns 'finished'" do + expect(subject.status).to eq 'finished' + end + end + end + + describe '#running?' do + before do + redis.hset(redis_key, job_hash) + end + + context 'when the job is running' do + it 'returns `true`' do + expect(subject.running?).to be true + end + end + + context 'when the job is finished' do + let(:status) { 'finished' } + + it 'returns `false`' do + expect(subject.running?).to be false + end + end + end + + describe '#finished?' do + before do + redis.hset(redis_key, job_hash) + end + + context 'when the job is running' do + it 'returns `false`' do + expect(subject.finished?).to be false + end + end + + context 'when the job is finished' do + let(:status) { 'finished' } + + it 'returns `true`' do + expect(subject.finished?).to be true + end + end + end + + describe '#finish!' do + before do + redis.hset(redis_key, job_hash) + end + + it 'sets the status to `finished`' do + subject.finish! + + expect(subject).to be_finished + end + end + + describe '#result_count' do + before do + redis.hset(redis_key, job_hash) + end + + it 'returns the result count from redis' do + expect(subject.result_count).to eq 23 + end + end + + describe '#reload' do + before do + redis.hset(redis_key, job_hash) + end + + it 'reloads the current data from redis and returns itself' do + expect(subject).to be_running + redis.hset(redis_key, { 'status' => 'finished' }) + expect(subject).to be_running + + expect(subject.reload).to eq subject + + expect(subject).to be_finished + end + end +end diff --git a/spec/models/home_feed_spec.rb b/spec/models/home_feed_spec.rb index 7546fb7861..882f84c8a7 100644 --- a/spec/models/home_feed_spec.rb +++ b/spec/models/home_feed_spec.rb @@ -32,7 +32,7 @@ RSpec.describe HomeFeed do context 'when feed is being generated' do before do - redis.set("account:#{account.id}:regeneration", true) + redis.hset("account:#{account.id}:regeneration", { 'status' => 'running' }) end it 'returns nothing' do @@ -44,9 +44,19 @@ RSpec.describe HomeFeed do end describe '#regenerating?' do + context 'when an old-style string key is still in use' do + it 'upgrades the key to a hash' do + redis.set("account:#{account.id}:regeneration", true) + + expect(subject.regenerating?).to be true + + expect(redis.type("account:#{account.id}:regeneration")).to eq 'hash' + end + end + context 'when feed is being generated' do before do - redis.set("account:#{account.id}:regeneration", true) + redis.hset("account:#{account.id}:regeneration", { 'status' => 'running' }) end it 'returns `true`' do @@ -55,13 +65,35 @@ RSpec.describe HomeFeed do end context 'when feed is not being generated' do - it 'returns `false`' do - expect(subject.regenerating?).to be false + context 'when the job is marked as finished' do + before do + redis.hset("account:#{account.id}:regeneration", { 'status' => 'finished' }) + end + + it 'returns `false`' do + expect(subject.regenerating?).to be false + end + end + + context 'when the job key is missing' do + it 'returns `false`' do + expect(subject.regenerating?).to be false + end end end end describe '#regeneration_in_progress!' do + context 'when an old-style string key is still in use' do + it 'upgrades the key to a hash' do + redis.set("account:#{account.id}:regeneration", true) + + subject.regeneration_in_progress! + + expect(redis.type("account:#{account.id}:regeneration")).to eq 'hash' + end + end + it 'sets the corresponding key in redis' do expect(redis.exists?("account:#{account.id}:regeneration")).to be false @@ -72,12 +104,22 @@ RSpec.describe HomeFeed do end describe '#regeneration_finished!' do - it 'removes the corresponding key from redis' do - redis.set("account:#{account.id}:regeneration", true) + context 'when an old-style string key is still in use' do + it 'upgrades the key to a hash' do + redis.set("account:#{account.id}:regeneration", true) + + subject.regeneration_finished! + + expect(redis.type("account:#{account.id}:regeneration")).to eq 'hash' + end + end + + it "sets the corresponding key's status to 'finished'" do + redis.hset("account:#{account.id}:regeneration", { 'status' => 'running' }) subject.regeneration_finished! - expect(redis.exists?("account:#{account.id}:regeneration")).to be false + expect(redis.hget("account:#{account.id}:regeneration", 'status')).to eq 'finished' end end end diff --git a/spec/requests/api/v1/timelines/home_spec.rb b/spec/requests/api/v1/timelines/home_spec.rb index 38e18979d2..89b5f392c5 100644 --- a/spec/requests/api/v1/timelines/home_spec.rb +++ b/spec/requests/api/v1/timelines/home_spec.rb @@ -66,7 +66,8 @@ RSpec.describe 'Home', :inline_jobs do end context 'when the timeline is regenerating' do - let(:timeline) { instance_double(HomeFeed, regenerating?: true, get: []) } + let(:async_refresh) { AsyncRefresh.create("account:#{user.account_id}:regeneration") } + let(:timeline) { instance_double(HomeFeed, regenerating?: true, get: [], async_refresh:) } before do allow(HomeFeed).to receive(:new).and_return(timeline) @@ -76,6 +77,7 @@ RSpec.describe 'Home', :inline_jobs do subject expect(response).to have_http_status(206) + expect(response.headers['Mastodon-Async-Refresh']).to eq "id=\"#{async_refresh.id}\", retry=5" expect(response.content_type) .to start_with('application/json') end diff --git a/spec/requests/api/v1_alpha/async_refreshes_spec.rb b/spec/requests/api/v1_alpha/async_refreshes_spec.rb new file mode 100644 index 0000000000..0cd85cf99b --- /dev/null +++ b/spec/requests/api/v1_alpha/async_refreshes_spec.rb @@ -0,0 +1,70 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe 'AsyncRefreshes' do + let(:user) { Fabricate(:user) } + let(:token) { Fabricate(:accessible_access_token, resource_owner_id: user.id, scopes: scopes) } + let(:headers) { { 'Authorization' => "Bearer #{token.token}" } } + let(:job) { AsyncRefresh.new('test_job') } + + describe 'GET /api/v1_alpha/async_refreshes/:id' do + context 'when not authorized' do + it 'returns http unauthorized' do + get api_v1_alpha_async_refresh_path(job.id) + + expect(response) + .to have_http_status(401) + expect(response.content_type) + .to start_with('application/json') + end + end + + context 'with wrong scope' do + before do + get api_v1_alpha_async_refresh_path(job.id), headers: headers + end + + it_behaves_like 'forbidden for wrong scope', 'write write:accounts' + end + + context 'with correct scope' do + let(:scopes) { 'read' } + + context 'when job exists' do + before do + redis.hset('test_job', { 'status' => 'running', 'result_count' => 10 }) + end + + after do + redis.del('test_job') + end + + it 'returns http success' do + get api_v1_alpha_async_refresh_path(job.id), headers: headers + + expect(response) + .to have_http_status(200) + + expect(response.content_type) + .to start_with('application/json') + + parsed_response = response.parsed_body + expect(parsed_response) + .to be_present + expect(parsed_response['async_refresh']) + .to include('status' => 'running', 'result_count' => 10) + end + end + + context 'when job does not exist' do + it 'returns not found' do + get api_v1_alpha_async_refresh_path(job.id), headers: headers + + expect(response) + .to have_http_status(404) + end + end + end + end +end