From 5b97f25a1577c48f2f3b3b20f371f2488a2d1c91 Mon Sep 17 00:00:00 2001 From: Emelia Smith Date: Tue, 30 Sep 2025 09:27:09 +0200 Subject: [PATCH] Add integration tests for mastodon-streaming (#36025) Co-authored-by: Claire Co-authored-by: David Roetzel --- Gemfile | 3 + Gemfile.lock | 1 + spec/rails_helper.rb | 3 +- spec/support/streaming_client.rb | 205 ++++++++++++++++++ spec/support/streaming_server_manager.rb | 11 +- .../streaming/channel_subscriptions_spec.rb | 62 ++++++ spec/system/streaming/streaming_spec.rb | 77 +++++++ 7 files changed, 358 insertions(+), 4 deletions(-) create mode 100644 spec/support/streaming_client.rb create mode 100644 spec/system/streaming/channel_subscriptions_spec.rb create mode 100644 spec/system/streaming/streaming_spec.rb diff --git a/Gemfile b/Gemfile index 126d73f9cab..16be707bfbf 100644 --- a/Gemfile +++ b/Gemfile @@ -160,6 +160,9 @@ group :test do # Stub web requests for specs gem 'webmock', '~> 3.18' + + # Websocket driver for testing integration between rails/sidekiq and streaming + gem 'websocket-driver', '~> 0.8', require: false end group :development do diff --git a/Gemfile.lock b/Gemfile.lock index fa785d6876f..5886fd6085b 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1102,6 +1102,7 @@ DEPENDENCIES webauthn (~> 3.0) webmock (~> 3.18) webpush! + websocket-driver (~> 0.8) xorcist (~> 1.1) RUBY VERSION diff --git a/spec/rails_helper.rb b/spec/rails_helper.rb index 3d3e556f353..6be93ecb70e 100644 --- a/spec/rails_helper.rb +++ b/spec/rails_helper.rb @@ -30,7 +30,8 @@ end # This needs to be defined before Rails is initialized STREAMING_PORT = ENV.fetch('TEST_STREAMING_PORT', '4020') -ENV['STREAMING_API_BASE_URL'] = "http://localhost:#{STREAMING_PORT}" +STREAMING_HOST = ENV.fetch('TEST_STREAMING_HOST', 'localhost') +ENV['STREAMING_API_BASE_URL'] = "http://#{STREAMING_HOST}:#{STREAMING_PORT}" require_relative '../config/environment' diff --git a/spec/support/streaming_client.rb b/spec/support/streaming_client.rb new file mode 100644 index 00000000000..02186e781c7 --- /dev/null +++ b/spec/support/streaming_client.rb @@ -0,0 +1,205 @@ +# frozen_string_literal: true + +require 'websocket/driver' + +class StreamingClient + module AUTHENTICATION + SUBPROTOCOL = 1 + AUTHORIZATION_HEADER = 2 + QUERY_PARAMETER = 3 + end + + class Connection + attr_reader :url, :messages, :last_error + attr_accessor :logger, :protocols + + def initialize(url) + @uri = URI.parse(url) + @query_params = @uri.query.present? ? URI.decode_www_form(@uri.query).to_h : {} + @protocols = nil + @headers = {} + + @dead = false + + @events_queue = Thread::Queue.new + @messages = [] + @last_error = nil + end + + def set_header(key, value) + @headers[key] = value + end + + def set_query_param(key, value) + @query_params[key] = value + end + + def driver + return @driver if defined?(@driver) + + @uri.query = URI.encode_www_form(@query_params) + @url = @uri.to_s + @tcp = TCPSocket.new(@uri.host, @uri.port) + + @driver = WebSocket::Driver.client(self, { + protocols: @protocols, + }) + + @headers.each_pair do |key, value| + @driver.set_header(key, value) + end + + at_exit do + @driver.close + end + + @driver.on(:open) do + @events_queue.enq({ event: :opened }) + end + + @driver.on(:message) do |event| + @events_queue.enq({ event: :message, payload: event.data }) + @messages << event.data + end + + @driver.on(:error) do |event| + logger&.debug(event.message) + @events_queue.enq({ event: :error, payload: event }) + @last_error = event + end + + @driver.on(:close) do |event| + @events_queue.enq({ event: :closing, payload: event }) + finalize(event) + end + + @thread = Thread.new do + @driver.parse(@tcp.read(1)) until @dead || @tcp.closed? + rescue Errno::ECONNRESET + # Create a synthetic close event: + close_event = WebSocket::Driver::CloseEvent.new( + WebSocket::Driver::Hybi::ERRORS[:unexpected_condition], + 'Connection reset' + ) + + finalize(close_event) + end + + @driver + end + + def wait_for_event(expected_event, timeout: 10) + Timeout.timeout(timeout) do + loop do + event = dequeue_event + + return nil if event.nil? && @events_queue.closed? + return event[:payload] unless event.nil? || event[:event] != expected_event + end + end + end + + def write(data) + @tcp.write(data) + rescue Errno::EPIPE => e + logger&.debug("EPIPE: #{e}") + end + + def finalize(event) + @dead = true + @events_queue.enq({ event: :closed, payload: event }) + @events_queue.close + @thread.kill + end + + def dequeue_event + event = @events_queue.pop + logger&.debug(event) unless event.nil? + event + end + end + + def initialize + @logger = Logger.new($stdout) + @logger.level = 'info' + + @connection = Connection.new("ws://#{STREAMING_HOST}:#{STREAMING_PORT}/api/v1/streaming") + @connection.logger = @logger + end + + def debug! + @logger.debug! + end + + def authenticate(access_token, authentication_method = StreamingClient::AUTHENTICATION::SUBPROTOCOL) + raise 'Invalid access_token passed to StreamingClient, expected a string' unless access_token.is_a?(String) + + case authentication_method + when AUTHENTICATION::QUERY_PARAMETER + @connection.set_query_param('access_token', access_token) + when AUTHENTICATION::SUBPROTOCOL + @connection.protocols = access_token + when AUTHENTICATION::AUTHORIZATION_HEADER + @connection.set_header('Authorization', "Bearer #{access_token}") + else + raise 'Invalid authentication method' + end + end + + def connect + @connection.driver.start + @connection.wait_for_event(:opened) + end + + def subscribe(channel, **params) + send(Oj.dump({ type: 'subscribe', stream: channel }.merge(params))) + end + + def wait_for(event = nil) + @connection.wait_for_event(event) + end + + def wait_for_message + message = @connection.wait_for_event(:message) + event = Oj.load(message) + event['payload'] = Oj.load(event['payload']) if event['payload'] + + event.deep_symbolize_keys + end + + delegate :status, :state, to: :'@connection.driver' + delegate :messages, to: :@connection + + def open? + state == :open + end + + def closing? + state == :closing + end + + def closed? + state == :closed + end + + def send(message) + @connection.driver.text(message) if open? + end + + def close + return if closed? + + @connection.driver.close unless closing? + @connection.wait_for_event(:closed) + end +end + +module StreamingClientHelper + def streaming_client + @streaming_client ||= StreamingClient.new + end +end + +RSpec.configure do |config| + config.include StreamingClientHelper, :streaming +end diff --git a/spec/support/streaming_server_manager.rb b/spec/support/streaming_server_manager.rb index d98f7dd9607..b565ed79a88 100644 --- a/spec/support/streaming_server_manager.rb +++ b/spec/support/streaming_server_manager.rb @@ -12,6 +12,11 @@ class StreamingServerManager queue = Queue.new + if ENV['DEBUG_STREAMING_SERVER'].present? + logger = Logger.new($stdout) + logger.level = 'debug' + end + @queue = queue @running_thread = Thread.new do @@ -31,7 +36,7 @@ class StreamingServerManager # Spawn a thread to listen on streaming server output output_thread = Thread.new do stdout_err.each_line do |line| - Rails.logger.info "Streaming server: #{line}" + logger&.info "Streaming server: #{line}" if status == :starting && line.match('Streaming API now listening on') status = :started @@ -115,12 +120,12 @@ RSpec.configure do |config| self.use_transactional_tests = true end - private - def streaming_server_manager @streaming_server_manager ||= StreamingServerManager.new end + private + def streaming_examples_present? RSpec.world.filtered_examples.values.flatten.any? { |example| example.metadata[:streaming] == true } end diff --git a/spec/system/streaming/channel_subscriptions_spec.rb b/spec/system/streaming/channel_subscriptions_spec.rb new file mode 100644 index 00000000000..54e125c293d --- /dev/null +++ b/spec/system/streaming/channel_subscriptions_spec.rb @@ -0,0 +1,62 @@ +# frozen_string_literal: true + +require 'rails_helper' +require 'debug' + +RSpec.describe 'Channel Subscriptions', :inline_jobs, :streaming do + let(:application) { Fabricate(:application, confidential: false) } + let(:scopes) { nil } + let(:access_token) { Fabricate(:accessible_access_token, resource_owner_id: user_account.user.id, application: application, scopes: scopes) } + + let(:user_account) { Fabricate(:account, username: 'alice', domain: nil) } + let(:bob_account) { Fabricate(:account, username: 'bob') } + + after do + streaming_client.close + end + + context 'when the access token has read scope' do + let(:scopes) { 'read' } + + it 'can subscribing to the public:local channel' do + streaming_client.authenticate(access_token.token) + + streaming_client.connect + streaming_client.subscribe('public:local') + + # We need to publish a status as there is no positive acknowledgement of + # subscriptions: + status = PostStatusService.new.call(bob_account, text: 'Hello @alice') + + # And then we want to receive that status: + message = streaming_client.wait_for_message + + expect(message).to include( + stream: be_an(Array).and(contain_exactly('public:local')), + event: 'update', + payload: include( + id: status.id.to_s + ) + ) + end + end + + context 'when the access token cannot read notifications' do + let(:scopes) { 'read:statuses' } + + it 'cannot subscribing to the user:notifications channel' do + streaming_client.authenticate(access_token.token) + + streaming_client.connect + streaming_client.subscribe('user:notification') + + # We should receive an error back immediately: + message = streaming_client.wait_for_message + + expect(message).to include( + error: 'Access token does not have the required scopes', + status: 401 + ) + end + end +end diff --git a/spec/system/streaming/streaming_spec.rb b/spec/system/streaming/streaming_spec.rb new file mode 100644 index 00000000000..c12bd1b18fe --- /dev/null +++ b/spec/system/streaming/streaming_spec.rb @@ -0,0 +1,77 @@ +# frozen_string_literal: true + +require 'rails_helper' +RSpec.describe 'Streaming', :inline_jobs, :streaming do + let(:authentication_method) { StreamingClient::AUTHENTICATION::SUBPROTOCOL } + let(:user) { Fabricate(:user) } + let(:scopes) { '' } + let(:application) { Fabricate(:application, confidential: false) } + let(:token) { Fabricate(:accessible_access_token, resource_owner_id: user.id, application: application, scopes: scopes) } + let(:access_token) { token.token } + + before do + streaming_client.authenticate(access_token, authentication_method) + end + + after do + streaming_client.close + end + + context 'when authenticating via subprotocol' do + it 'is able to connect' do + streaming_client.connect + + expect(streaming_client.status).to eq(101) + expect(streaming_client.open?).to be(true) + end + end + + context 'when authenticating via authorization header' do + let(:authentication_method) { StreamingClient::AUTHENTICATION::AUTHORIZATION_HEADER } + + it 'is able to connect successfully' do + streaming_client.connect + + expect(streaming_client.status).to eq(101) + expect(streaming_client.open?).to be(true) + end + end + + context 'when authenticating via query parameter' do + let(:authentication_method) { StreamingClient::AUTHENTICATION::QUERY_PARAMETER } + + it 'is able to connect successfully' do + streaming_client.connect + + expect(streaming_client.status).to eq(101) + expect(streaming_client.open?).to be(true) + end + end + + context 'with a revoked access token' do + before do + token.revoke + end + + it 'receives an 401 unauthorized error' do + streaming_client.connect + + expect(streaming_client.status).to eq(401) + expect(streaming_client.open?).to be(false) + end + end + + context 'when revoking an access token after connection' do + it 'disconnects the client' do + streaming_client.connect + + expect(streaming_client.status).to eq(101) + expect(streaming_client.open?).to be(true) + + token.revoke + + expect(streaming_client.wait_for(:closed).code).to be(1000) + expect(streaming_client.open?).to be(false) + end + end +end