From be5a1b1bb9ef2b303c4656798d1cb9feb8ef3343 Mon Sep 17 00:00:00 2001 From: Emelia Smith Date: Thu, 4 Sep 2025 00:31:13 +0200 Subject: [PATCH 1/4] Add integration tests for mastodon-streaming --- Gemfile | 3 + Gemfile.lock | 1 + spec/rails_helper.rb | 3 +- spec/streaming/streaming_spec.rb | 82 +++++++++ spec/support/streaming_client.rb | 215 +++++++++++++++++++++++ spec/support/streaming_server_manager.rb | 14 +- 6 files changed, 313 insertions(+), 5 deletions(-) create mode 100644 spec/streaming/streaming_spec.rb create mode 100644 spec/support/streaming_client.rb diff --git a/Gemfile b/Gemfile index b80621bf9de..d5c8fecf3d5 100644 --- a/Gemfile +++ b/Gemfile @@ -159,6 +159,9 @@ group :test do # Stub web requests for specs gem 'webmock', '~> 3.18' + + # Websocket driver for testing integration between rails/sidekiq and streaming + gem 'websocket-driver', '~> 0.8', require: false end group :development do diff --git a/Gemfile.lock b/Gemfile.lock index 0f6f85585c8..80cad721630 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1103,6 +1103,7 @@ DEPENDENCIES webauthn (~> 3.0) webmock (~> 3.18) webpush! + websocket-driver (~> 0.8) xorcist (~> 1.1) RUBY VERSION diff --git a/spec/rails_helper.rb b/spec/rails_helper.rb index 3d3e556f353..6be93ecb70e 100644 --- a/spec/rails_helper.rb +++ b/spec/rails_helper.rb @@ -30,7 +30,8 @@ end # This needs to be defined before Rails is initialized STREAMING_PORT = ENV.fetch('TEST_STREAMING_PORT', '4020') -ENV['STREAMING_API_BASE_URL'] = "http://localhost:#{STREAMING_PORT}" +STREAMING_HOST = ENV.fetch('TEST_STREAMING_HOST', 'localhost') +ENV['STREAMING_API_BASE_URL'] = "http://#{STREAMING_HOST}:#{STREAMING_PORT}" require_relative '../config/environment' diff --git a/spec/streaming/streaming_spec.rb b/spec/streaming/streaming_spec.rb new file mode 100644 index 00000000000..93cdebfcfa8 --- /dev/null +++ b/spec/streaming/streaming_spec.rb @@ -0,0 +1,82 @@ +# frozen_string_literal: true + +require 'rails_helper' +RSpec.describe 'Streaming', :inline_jobs do + let(:authentication_method) { StreamingClient::AUTHENTICATION::SUBPROTOCOL } + let(:user) { Fabricate(:user) } + let(:scopes) { '' } + let(:application) { Fabricate(:application, confidential: false) } + let(:token) { Fabricate(:accessible_access_token, resource_owner_id: user.id, application: application, scopes: scopes) } + let(:access_token) { token.token } + + before do + streaming_client.authenticate(access_token, authentication_method) + end + + after do + streaming_client.close + end + + it 'receives an 101 upgrade to websocket' do + streaming_client.connect + expect(streaming_client.status_code).to eq(101) + end + + context 'when authenticating via subprotocol' do + it 'is able to connect' do + streaming_client.connect + + expect(streaming_client.status_code).to eq(101) + expect(streaming_client.open?).to be(true) + end + end + + context 'when authenticating via authorization header' do + let(:authentication_method) { StreamingClient::AUTHENTICATION::AUTHORIZATION_HEADER } + + it 'is able to connect successfully' do + streaming_client.connect + + expect(streaming_client.status_code).to eq(101) + expect(streaming_client.open?).to be(true) + end + end + + context 'when authenticating via query parameter' do + let(:authentication_method) { StreamingClient::AUTHENTICATION::QUERY_PARAMETER } + + it 'is able to connect successfully' do + streaming_client.connect + + expect(streaming_client.status_code).to eq(101) + expect(streaming_client.open?).to be(true) + end + end + + context 'with a revoked access token' do + before do + token.revoke + end + + it 'receives an 401 unauthorized error' do + streaming_client.connect + + expect(streaming_client.status_code).to eq(401) + expect(streaming_client.open?).to be(false) + end + end + + context 'when revoking an access token after connection' do + it 'disconnects the client' do + streaming_client.connect + + expect(streaming_client.status_code).to eq(101) + expect(streaming_client.open?).to be(true) + + token.revoke + + expect(streaming_client.wait_for(:closed).code).to be(1000) + expect(streaming_client.open?).to be(false) + end + end +end diff --git a/spec/support/streaming_client.rb b/spec/support/streaming_client.rb new file mode 100644 index 00000000000..b32acce44c1 --- /dev/null +++ b/spec/support/streaming_client.rb @@ -0,0 +1,215 @@ +# frozen_string_literal: true + +require 'websocket/driver' + +class StreamingClient + module AUTHENTICATION + SUBPROTOCOL = 1 + AUTHORIZATION_HEADER = 2 + QUERY_PARAMETER = 3 + + def self.supported?(method) + [ + AUTHENTICATION::SUBPROTOCOL, + AUTHENTICATION::QUERY_PARAMETER, + AUTHENTICATION::AUTHORIZATION_HEADER, + ].include?(method) + end + end + + class Connection + attr_reader :url, :messages, :last_error + attr_accessor :logger, :protocols + + @logger = nil + + def initialize(url) + @uri = URI.parse(url) + @query_params = @uri.query.present? ? URI.decode_www_form(@uri.query).to_h : {} + @protocols = nil + @headers = {} + + @dead = false + + @events_queue = Thread::Queue.new + @messages = Thread::Queue.new + @last_error = nil + end + + def set_header(key, value) + @headers[key] = value + end + + def set_query_param(key, value) + @query_params[key] = value + end + + def driver + return @driver if defined?(@driver) + + @uri.query = URI.encode_www_form(@query_params) + @url = @uri.to_s + @tcp = TCPSocket.new(@uri.host, @uri.port) + + @driver = WebSocket::Driver.client(self, { + protocols: @protocols, + }) + + @headers.each_pair do |key, value| + @driver.set_header(key, value) + end + + at_exit do + @driver.close + end + + @driver.on(:open) do + @events_queue.enq({ event: :opened }) + end + + @driver.on(:message) do |event| + @events_queue.enq({ event: :message, payload: event }) + @messages << event + end + + @driver.on(:error) do |event| + logger&.debug(event.message) + @events_queue.enq({ event: :error, payload: event }) + @last_error = event + end + + @driver.on(:close) do |event| + @events_queue.enq({ event: :closing, payload: event }) + finalize(event) + end + + @thread = Thread.new do + @driver.parse(@tcp.read(1)) until @dead || @tcp.closed? + end + + @driver + end + + def wait_for_event(expected_event = nil, timeout: 10) + Timeout.timeout(timeout) do + loop do + if expected_event.nil? + unless (event = dequeue_event(timeout)).nil? + return event[:payload] + end + else + event = dequeue_event(timeout) + + return nil if event.nil? && @events_queue.closed? + return event[:payload] unless event.nil? || event[:event] != expected_event + end + end + end + end + + def write(data) + @tcp.write(data) + end + + def finalize(event) + @dead = true + @events_queue.enq({ event: :closed, payload: event }) + @events_queue.close + @thread.kill + end + + def dequeue_event(timeout) + event = @events_queue.pop(timeout:) + logger&.debug(event) unless event.nil? + event + end + end + + def initialize + @logger = Logger.new($stdout) + @logger.level = 'info' + + @connection = Connection.new("ws://#{STREAMING_HOST}:#{STREAMING_PORT}/api/v1/streaming") + @connection.logger = @logger + end + + def debug! + @logger.debug! + end + + def authenticate(access_token, authentication_method) + raise 'access_token passed to StreamingClient was not a string' unless access_token.is_a?(String) + raise 'invalid authentication method' unless AUTHENTICATION.supported?(authentication_method) + + case authentication_method + when AUTHENTICATION::QUERY_PARAMETER + @connection.set_query_param('access_token', access_token) + when AUTHENTICATION::SUBPROTOCOL + @connection.protocols = access_token + when AUTHENTICATION::AUTHORIZATION_HEADER + @connection.set_header('Authorization', "Bearer #{access_token}") + end + end + + def connect + @connection.driver.start + @connection.wait_for_event(:opened) + end + + def wait_for(event = nil) + @connection.wait_for_event(event) + end + + def status_code + @connection.driver.status + end + + def state + @connection.driver.state + end + + def open? + state == :open + end + + def closing? + state == :closing + end + + def closed? + state == :closed + end + + def send(message) + @connection.driver.text(message) if open? + end + + def close + return if closed? + + @connection.driver.close unless closing? + @connection.wait_for_event(:closed) + end +end + +RSpec.configure do |config| + config.around :each, type: :streaming do |example| + # Streaming server needs DB access but `use_transactional_tests` rolls back + # every transaction. Disable this feature for streaming tests, and use + # DatabaseCleaner to clean the database tables between each test. + self.use_transactional_tests = false + + def streaming_client + @streaming_client ||= StreamingClient.new + end + + DatabaseCleaner.cleaning do + # Load seeds so we have the default roles otherwise cleared by `DatabaseCleaner` + Rails.application.load_seed + + example.run + end + + self.use_transactional_tests = true + end +end diff --git a/spec/support/streaming_server_manager.rb b/spec/support/streaming_server_manager.rb index d98f7dd9607..ca8f8d59c93 100644 --- a/spec/support/streaming_server_manager.rb +++ b/spec/support/streaming_server_manager.rb @@ -79,14 +79,14 @@ end RSpec.configure do |config| config.before :suite do - if streaming_examples_present? + if streaming_examples_present? || streaming_spec? # Start the node streaming server streaming_server_manager.start(port: STREAMING_PORT) end end config.after :suite do - if streaming_examples_present? + if streaming_examples_present? || streaming_spec? # Stop the node streaming server streaming_server_manager.stop end @@ -115,13 +115,19 @@ RSpec.configure do |config| self.use_transactional_tests = true end - private - def streaming_server_manager @streaming_server_manager ||= StreamingServerManager.new end + private + def streaming_examples_present? RSpec.world.filtered_examples.values.flatten.any? { |example| example.metadata[:streaming] == true } end + + def streaming_spec? + RSpec.world.filtered_examples.values.flatten.any? do |example| + example.metadata[:type] == :streaming + end + end end From 2010e1cde26c368e4a76e046a7249928a262d4e0 Mon Sep 17 00:00:00 2001 From: Emelia Smith Date: Thu, 4 Sep 2025 22:59:44 +0200 Subject: [PATCH 2/4] Move streaming specs to test-e2e job --- .github/workflows/test-ruby.yml | 2 +- spec/rails_helper.rb | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test-ruby.yml b/.github/workflows/test-ruby.yml index 63d31725043..3cbbf9da2db 100644 --- a/.github/workflows/test-ruby.yml +++ b/.github/workflows/test-ruby.yml @@ -347,7 +347,7 @@ jobs: if: steps.playwright-cache.outputs.cache-hit == 'true' run: yarn run playwright install-deps chromium - - run: bin/rspec spec/system --tag streaming --tag js + - run: bin/rspec spec/system --tag streaming --tag js --tag streaming_client - name: Archive logs uses: actions/upload-artifact@v4 diff --git a/spec/rails_helper.rb b/spec/rails_helper.rb index 6be93ecb70e..178b78c60c3 100644 --- a/spec/rails_helper.rb +++ b/spec/rails_helper.rb @@ -85,6 +85,7 @@ RSpec.configure do |config| # By default, skip specs that need the streaming server config.filter_run_excluding :streaming + config.filter_run_excluding :streaming_client config.fixture_paths = [ Rails.root.join('spec', 'fixtures'), @@ -104,6 +105,11 @@ RSpec.configure do |config| metadata[:search] = true end + # Set `streaming_client` metadata true for all specs in spec/streaming/ + config.define_derived_metadata(file_path: Regexp.new('spec/streaming/*')) do |metadata| + metadata[:streaming_client] = true + end + config.include Devise::Test::ControllerHelpers, type: :controller config.include Devise::Test::ControllerHelpers, type: :helper config.include Devise::Test::ControllerHelpers, type: :view From 2aefbe02a1bc9bd424835ea88bfb213db96618fd Mon Sep 17 00:00:00 2001 From: Emelia Smith Date: Thu, 4 Sep 2025 23:07:16 +0200 Subject: [PATCH 3/4] Move streaming specs to test-streaming-integration job --- .github/workflows/test-ruby.yml | 82 ++++++++++++++++++++++++++++++++- 1 file changed, 81 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test-ruby.yml b/.github/workflows/test-ruby.yml index 3cbbf9da2db..79e84ee268d 100644 --- a/.github/workflows/test-ruby.yml +++ b/.github/workflows/test-ruby.yml @@ -347,7 +347,7 @@ jobs: if: steps.playwright-cache.outputs.cache-hit == 'true' run: yarn run playwright install-deps chromium - - run: bin/rspec spec/system --tag streaming --tag js --tag streaming_client + - run: bin/rspec spec/system --tag streaming --tag js - name: Archive logs uses: actions/upload-artifact@v4 @@ -363,6 +363,86 @@ jobs: name: e2e-screenshots-${{ matrix.ruby-version }} path: tmp/capybara/ + test-streaming-integration: + name: Streaming Server integration testing + runs-on: ubuntu-latest + + needs: + - build + + services: + postgres: + image: postgres:14-alpine + env: + POSTGRES_PASSWORD: postgres + POSTGRES_USER: postgres + options: >- + --health-cmd pg_isready + --health-interval 10ms + --health-timeout 3s + --health-retries 50 + ports: + - 5432:5432 + + redis: + image: redis:7-alpine + options: >- + --health-cmd "redis-cli ping" + --health-interval 10ms + --health-timeout 3s + --health-retries 50 + ports: + - 6379:6379 + + env: + DB_HOST: localhost + DB_USER: postgres + DB_PASS: postgres + RAILS_ENV: test + BUNDLE_WITH: test + LOCAL_DOMAIN: localhost:3000 + LOCAL_HTTPS: false + + strategy: + fail-fast: false + matrix: + ruby-version: + - '3.2' + - '3.3' + - '.ruby-version' + + steps: + - uses: actions/checkout@v4 + + - uses: actions/download-artifact@v4 + with: + path: './' + name: ${{ github.sha }} + + - name: Expand archived asset artifacts + run: | + tar xvzf artifacts.tar.gz + + - name: Set up Ruby environment + uses: ./.github/actions/setup-ruby + with: + ruby-version: ${{ matrix.ruby-version}} + additional-system-dependencies: ffmpeg + + - name: Set up Javascript environment + uses: ./.github/actions/setup-javascript + + - name: Load database schema + run: './bin/rails db:create db:schema:load db:seed' + - run: bin/rspec spec/streaming --tag streaming_client + + - name: Archive logs + uses: actions/upload-artifact@v4 + if: failure() + with: + name: e2e-logs-${{ matrix.ruby-version }} + path: log/ + test-search: name: Elastic Search integration testing runs-on: ubuntu-latest From 7839dfd43871abc4c6a6e6ad7af9a54f83399b55 Mon Sep 17 00:00:00 2001 From: Emelia Smith Date: Fri, 5 Sep 2025 01:34:45 +0200 Subject: [PATCH 4/4] Add tests for subscribing to some channels --- spec/streaming/channel_subscriptions_spec.rb | 62 ++++++++++++++++++++ spec/support/streaming_client.rb | 34 +++++++++-- spec/support/streaming_server_manager.rb | 7 ++- 3 files changed, 98 insertions(+), 5 deletions(-) create mode 100644 spec/streaming/channel_subscriptions_spec.rb diff --git a/spec/streaming/channel_subscriptions_spec.rb b/spec/streaming/channel_subscriptions_spec.rb new file mode 100644 index 00000000000..39a85592909 --- /dev/null +++ b/spec/streaming/channel_subscriptions_spec.rb @@ -0,0 +1,62 @@ +# frozen_string_literal: true + +require 'rails_helper' +require 'debug' + +RSpec.describe 'Channel Subscriptions', :inline_jobs do # rubocop:disable RSpec/DescribeClass + let(:application) { Fabricate(:application, confidential: false) } + let(:scopes) { nil } + let(:access_token) { Fabricate(:accessible_access_token, resource_owner_id: user_account.user.id, application: application, scopes: scopes) } + + let(:user_account) { Fabricate(:account, username: 'alice', domain: nil) } + let(:bob_account) { Fabricate(:account, username: 'bob') } + + after do + streaming_client.close + end + + context 'when the access token has read scope' do + let(:scopes) { 'read' } + + it 'can subscribing to the public:local channel' do + streaming_client.authenticate(access_token.token) + + streaming_client.connect + streaming_client.subscribe('public:local') + + # We need to publish a status as there is no positive acknowledgement of + # subscriptions: + status = PostStatusService.new.call(bob_account, text: 'Hello @alice') + + # And then we want to receive that status: + message = streaming_client.wait_for_message + + expect(message).to include( + stream: be_an(Array).and(contain_exactly('public:local')), + event: 'update', + payload: include( + id: status.id.to_s + ) + ) + end + end + + context 'when the access token cannot read notifications' do + let(:scopes) { 'read:statuses' } + + it 'cannot subscribing to the user:notifications channel' do + streaming_client.authenticate(access_token.token) + + streaming_client.connect + streaming_client.subscribe('user:notification') + + # We should receive an error back immediately: + message = streaming_client.wait_for_message + + expect(message).to include( + error: 'Access token does not have the required scopes', + status: 401 + ) + end + end +end diff --git a/spec/support/streaming_client.rb b/spec/support/streaming_client.rb index b32acce44c1..cc0951b8042 100644 --- a/spec/support/streaming_client.rb +++ b/spec/support/streaming_client.rb @@ -32,7 +32,7 @@ class StreamingClient @dead = false @events_queue = Thread::Queue.new - @messages = Thread::Queue.new + @messages = [] @last_error = nil end @@ -68,8 +68,8 @@ class StreamingClient end @driver.on(:message) do |event| - @events_queue.enq({ event: :message, payload: event }) - @messages << event + @events_queue.enq({ event: :message, payload: event.data }) + @messages << event.data end @driver.on(:error) do |event| @@ -85,6 +85,14 @@ class StreamingClient @thread = Thread.new do @driver.parse(@tcp.read(1)) until @dead || @tcp.closed? + rescue Errno::ECONNRESET + # Create a synthetic close event: + close_event = WebSocket::Driver::CloseEvent.new( + WebSocket::Driver::Hybi::ERRORS[:unexpected_condition], + 'Connection reset' + ) + + finalize(close_event) end @driver @@ -109,6 +117,8 @@ class StreamingClient def write(data) @tcp.write(data) + rescue Errno::EPIPE => e + logger&.debug("EPIPE: #{e}") end def finalize(event) @@ -137,7 +147,7 @@ class StreamingClient @logger.debug! end - def authenticate(access_token, authentication_method) + def authenticate(access_token, authentication_method = StreamingClient::AUTHENTICATION::SUBPROTOCOL) raise 'access_token passed to StreamingClient was not a string' unless access_token.is_a?(String) raise 'invalid authentication method' unless AUTHENTICATION.supported?(authentication_method) @@ -156,10 +166,22 @@ class StreamingClient @connection.wait_for_event(:opened) end + def subscribe(channel, **params) + send(Oj.dump({ type: 'subscribe', stream: channel }.merge(params))) + end + def wait_for(event = nil) @connection.wait_for_event(event) end + def wait_for_message + message = @connection.wait_for_event(:message) + event = Oj.load(message) + event['payload'] = Oj.load(event['payload']) if event['payload'] + + event.deep_symbolize_keys + end + def status_code @connection.driver.status end @@ -168,6 +190,10 @@ class StreamingClient @connection.driver.state end + def messages + @connection.messages + end + def open? state == :open end diff --git a/spec/support/streaming_server_manager.rb b/spec/support/streaming_server_manager.rb index ca8f8d59c93..52e86a4286c 100644 --- a/spec/support/streaming_server_manager.rb +++ b/spec/support/streaming_server_manager.rb @@ -12,6 +12,11 @@ class StreamingServerManager queue = Queue.new + if ENV['DEBUG_STREAMING_SERVER'].present? + logger = Logger.new($stdout) + logger.level = 'debug' + end + @queue = queue @running_thread = Thread.new do @@ -31,7 +36,7 @@ class StreamingServerManager # Spawn a thread to listen on streaming server output output_thread = Thread.new do stdout_err.each_line do |line| - Rails.logger.info "Streaming server: #{line}" + logger&.info "Streaming server: #{line}" if status == :starting && line.match('Streaming API now listening on') status = :started