Add integration tests for mastodon-streaming (#36025)

Co-authored-by: Claire <claire.github-309c@sitedethib.com>
Co-authored-by: David Roetzel <david@roetzel.de>
This commit is contained in:
Emelia Smith 2025-09-30 09:27:09 +02:00 committed by GitHub
parent 150f0fcba5
commit 5b97f25a15
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 358 additions and 4 deletions

View File

@ -160,6 +160,9 @@ group :test do
# Stub web requests for specs
gem 'webmock', '~> 3.18'
# Websocket driver for testing integration between rails/sidekiq and streaming
gem 'websocket-driver', '~> 0.8', require: false
end
group :development do

View File

@ -1102,6 +1102,7 @@ DEPENDENCIES
webauthn (~> 3.0)
webmock (~> 3.18)
webpush!
websocket-driver (~> 0.8)
xorcist (~> 1.1)
RUBY VERSION

View File

@ -30,7 +30,8 @@ end
# This needs to be defined before Rails is initialized
STREAMING_PORT = ENV.fetch('TEST_STREAMING_PORT', '4020')
ENV['STREAMING_API_BASE_URL'] = "http://localhost:#{STREAMING_PORT}"
STREAMING_HOST = ENV.fetch('TEST_STREAMING_HOST', 'localhost')
ENV['STREAMING_API_BASE_URL'] = "http://#{STREAMING_HOST}:#{STREAMING_PORT}"
require_relative '../config/environment'

View File

@ -0,0 +1,205 @@
# frozen_string_literal: true
require 'websocket/driver'
class StreamingClient
module AUTHENTICATION
SUBPROTOCOL = 1
AUTHORIZATION_HEADER = 2
QUERY_PARAMETER = 3
end
class Connection
attr_reader :url, :messages, :last_error
attr_accessor :logger, :protocols
def initialize(url)
@uri = URI.parse(url)
@query_params = @uri.query.present? ? URI.decode_www_form(@uri.query).to_h : {}
@protocols = nil
@headers = {}
@dead = false
@events_queue = Thread::Queue.new
@messages = []
@last_error = nil
end
def set_header(key, value)
@headers[key] = value
end
def set_query_param(key, value)
@query_params[key] = value
end
def driver
return @driver if defined?(@driver)
@uri.query = URI.encode_www_form(@query_params)
@url = @uri.to_s
@tcp = TCPSocket.new(@uri.host, @uri.port)
@driver = WebSocket::Driver.client(self, {
protocols: @protocols,
})
@headers.each_pair do |key, value|
@driver.set_header(key, value)
end
at_exit do
@driver.close
end
@driver.on(:open) do
@events_queue.enq({ event: :opened })
end
@driver.on(:message) do |event|
@events_queue.enq({ event: :message, payload: event.data })
@messages << event.data
end
@driver.on(:error) do |event|
logger&.debug(event.message)
@events_queue.enq({ event: :error, payload: event })
@last_error = event
end
@driver.on(:close) do |event|
@events_queue.enq({ event: :closing, payload: event })
finalize(event)
end
@thread = Thread.new do
@driver.parse(@tcp.read(1)) until @dead || @tcp.closed?
rescue Errno::ECONNRESET
# Create a synthetic close event:
close_event = WebSocket::Driver::CloseEvent.new(
WebSocket::Driver::Hybi::ERRORS[:unexpected_condition],
'Connection reset'
)
finalize(close_event)
end
@driver
end
def wait_for_event(expected_event, timeout: 10)
Timeout.timeout(timeout) do
loop do
event = dequeue_event
return nil if event.nil? && @events_queue.closed?
return event[:payload] unless event.nil? || event[:event] != expected_event
end
end
end
def write(data)
@tcp.write(data)
rescue Errno::EPIPE => e
logger&.debug("EPIPE: #{e}")
end
def finalize(event)
@dead = true
@events_queue.enq({ event: :closed, payload: event })
@events_queue.close
@thread.kill
end
def dequeue_event
event = @events_queue.pop
logger&.debug(event) unless event.nil?
event
end
end
def initialize
@logger = Logger.new($stdout)
@logger.level = 'info'
@connection = Connection.new("ws://#{STREAMING_HOST}:#{STREAMING_PORT}/api/v1/streaming")
@connection.logger = @logger
end
def debug!
@logger.debug!
end
def authenticate(access_token, authentication_method = StreamingClient::AUTHENTICATION::SUBPROTOCOL)
raise 'Invalid access_token passed to StreamingClient, expected a string' unless access_token.is_a?(String)
case authentication_method
when AUTHENTICATION::QUERY_PARAMETER
@connection.set_query_param('access_token', access_token)
when AUTHENTICATION::SUBPROTOCOL
@connection.protocols = access_token
when AUTHENTICATION::AUTHORIZATION_HEADER
@connection.set_header('Authorization', "Bearer #{access_token}")
else
raise 'Invalid authentication method'
end
end
def connect
@connection.driver.start
@connection.wait_for_event(:opened)
end
def subscribe(channel, **params)
send(Oj.dump({ type: 'subscribe', stream: channel }.merge(params)))
end
def wait_for(event = nil)
@connection.wait_for_event(event)
end
def wait_for_message
message = @connection.wait_for_event(:message)
event = Oj.load(message)
event['payload'] = Oj.load(event['payload']) if event['payload']
event.deep_symbolize_keys
end
delegate :status, :state, to: :'@connection.driver'
delegate :messages, to: :@connection
def open?
state == :open
end
def closing?
state == :closing
end
def closed?
state == :closed
end
def send(message)
@connection.driver.text(message) if open?
end
def close
return if closed?
@connection.driver.close unless closing?
@connection.wait_for_event(:closed)
end
end
module StreamingClientHelper
def streaming_client
@streaming_client ||= StreamingClient.new
end
end
RSpec.configure do |config|
config.include StreamingClientHelper, :streaming
end

View File

@ -12,6 +12,11 @@ class StreamingServerManager
queue = Queue.new
if ENV['DEBUG_STREAMING_SERVER'].present?
logger = Logger.new($stdout)
logger.level = 'debug'
end
@queue = queue
@running_thread = Thread.new do
@ -31,7 +36,7 @@ class StreamingServerManager
# Spawn a thread to listen on streaming server output
output_thread = Thread.new do
stdout_err.each_line do |line|
Rails.logger.info "Streaming server: #{line}"
logger&.info "Streaming server: #{line}"
if status == :starting && line.match('Streaming API now listening on')
status = :started
@ -115,12 +120,12 @@ RSpec.configure do |config|
self.use_transactional_tests = true
end
private
def streaming_server_manager
@streaming_server_manager ||= StreamingServerManager.new
end
private
def streaming_examples_present?
RSpec.world.filtered_examples.values.flatten.any? { |example| example.metadata[:streaming] == true }
end

View File

@ -0,0 +1,62 @@
# frozen_string_literal: true
require 'rails_helper'
require 'debug'
RSpec.describe 'Channel Subscriptions', :inline_jobs, :streaming do
let(:application) { Fabricate(:application, confidential: false) }
let(:scopes) { nil }
let(:access_token) { Fabricate(:accessible_access_token, resource_owner_id: user_account.user.id, application: application, scopes: scopes) }
let(:user_account) { Fabricate(:account, username: 'alice', domain: nil) }
let(:bob_account) { Fabricate(:account, username: 'bob') }
after do
streaming_client.close
end
context 'when the access token has read scope' do
let(:scopes) { 'read' }
it 'can subscribing to the public:local channel' do
streaming_client.authenticate(access_token.token)
streaming_client.connect
streaming_client.subscribe('public:local')
# We need to publish a status as there is no positive acknowledgement of
# subscriptions:
status = PostStatusService.new.call(bob_account, text: 'Hello @alice')
# And then we want to receive that status:
message = streaming_client.wait_for_message
expect(message).to include(
stream: be_an(Array).and(contain_exactly('public:local')),
event: 'update',
payload: include(
id: status.id.to_s
)
)
end
end
context 'when the access token cannot read notifications' do
let(:scopes) { 'read:statuses' }
it 'cannot subscribing to the user:notifications channel' do
streaming_client.authenticate(access_token.token)
streaming_client.connect
streaming_client.subscribe('user:notification')
# We should receive an error back immediately:
message = streaming_client.wait_for_message
expect(message).to include(
error: 'Access token does not have the required scopes',
status: 401
)
end
end
end

View File

@ -0,0 +1,77 @@
# frozen_string_literal: true
require 'rails_helper'
RSpec.describe 'Streaming', :inline_jobs, :streaming do
let(:authentication_method) { StreamingClient::AUTHENTICATION::SUBPROTOCOL }
let(:user) { Fabricate(:user) }
let(:scopes) { '' }
let(:application) { Fabricate(:application, confidential: false) }
let(:token) { Fabricate(:accessible_access_token, resource_owner_id: user.id, application: application, scopes: scopes) }
let(:access_token) { token.token }
before do
streaming_client.authenticate(access_token, authentication_method)
end
after do
streaming_client.close
end
context 'when authenticating via subprotocol' do
it 'is able to connect' do
streaming_client.connect
expect(streaming_client.status).to eq(101)
expect(streaming_client.open?).to be(true)
end
end
context 'when authenticating via authorization header' do
let(:authentication_method) { StreamingClient::AUTHENTICATION::AUTHORIZATION_HEADER }
it 'is able to connect successfully' do
streaming_client.connect
expect(streaming_client.status).to eq(101)
expect(streaming_client.open?).to be(true)
end
end
context 'when authenticating via query parameter' do
let(:authentication_method) { StreamingClient::AUTHENTICATION::QUERY_PARAMETER }
it 'is able to connect successfully' do
streaming_client.connect
expect(streaming_client.status).to eq(101)
expect(streaming_client.open?).to be(true)
end
end
context 'with a revoked access token' do
before do
token.revoke
end
it 'receives an 401 unauthorized error' do
streaming_client.connect
expect(streaming_client.status).to eq(401)
expect(streaming_client.open?).to be(false)
end
end
context 'when revoking an access token after connection' do
it 'disconnects the client' do
streaming_client.connect
expect(streaming_client.status).to eq(101)
expect(streaming_client.open?).to be(true)
token.revoke
expect(streaming_client.wait_for(:closed).code).to be(1000)
expect(streaming_client.open?).to be(false)
end
end
end