diff --git a/.changeset/data_streams_v2.md b/.changeset/data_streams_v2.md new file mode 100644 index 000000000..48e58599d --- /dev/null +++ b/.changeset/data_streams_v2.md @@ -0,0 +1,10 @@ +--- +livekit: patch +livekit-api: patch +livekit-datatrack: patch +livekit-ffi: patch +livekit-protocol: patch +livekit-uniffi: patch +--- + +Add data streams v2 - #1192 (@1egoman) diff --git a/Cargo.lock b/Cargo.lock index 9e716b92d..f40be2cd3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3824,6 +3824,7 @@ dependencies = [ "bmrng", "bytes", "chrono", + "flate2", "futures-util", "http 1.4.0", "lazy_static", diff --git a/examples/wgpu_room/src/app.rs b/examples/wgpu_room/src/app.rs index 65c03ab3a..885db4980 100644 --- a/examples/wgpu_room/src/app.rs +++ b/examples/wgpu_room/src/app.rs @@ -1,4 +1,5 @@ use crate::{ + data_streams_ui::DataStreamsUiState, data_track::{LocalDataTrackTile, RemoteDataTrackTile, MAX_VALUE, TIME_WINDOW}, rpc_ui::RpcUiState, service::{AsyncCmd, LkService, UiCmd}, @@ -25,6 +26,7 @@ struct AppState { enum RightTab { Participants, Rpc, + DataStreams, } pub struct LkApp { @@ -38,6 +40,7 @@ pub struct LkApp { render_state: egui_wgpu::RenderState, service: LkService, rpc_ui: RpcUiState, + data_streams_ui: DataStreamsUiState, right_tab: RightTab, } @@ -74,6 +77,7 @@ impl LkApp { connection_failure: None, render_state: cc.wgpu_render_state.clone().unwrap(), rpc_ui: RpcUiState::default(), + data_streams_ui: DataStreamsUiState::default(), right_tab: RightTab::Participants, } } @@ -130,11 +134,28 @@ impl LkApp { self.remote_data_tracks .push(RemoteDataTrackTile::new(self.async_runtime.handle(), track)); } + RoomEvent::TextStreamOpened { reader, topic, participant_identity } => { + self.data_streams_ui.on_text_stream( + reader, + topic, + participant_identity, + &self.service, + ); + } + RoomEvent::ByteStreamOpened { reader, topic, participant_identity } => { + self.data_streams_ui.on_byte_stream( + reader, + topic, + participant_identity, + &self.service, + ); + } RoomEvent::Disconnected { reason: _ } => { self.video_renderers.clear(); self.local_data_tracks.clear(); self.remote_data_tracks.clear(); self.rpc_ui.on_disconnect(); + self.data_streams_ui.on_disconnect(); } _ => {} } @@ -264,6 +285,7 @@ impl LkApp { ui.horizontal(|ui| { ui.selectable_value(&mut self.right_tab, RightTab::Participants, "Participants"); ui.selectable_value(&mut self.right_tab, RightTab::Rpc, "RPC"); + ui.selectable_value(&mut self.right_tab, RightTab::DataStreams, "Data Streams"); }); ui.separator(); @@ -281,6 +303,13 @@ impl LkApp { rpc_ui.show(ui, service, &room); }); } + RightTab::DataStreams => { + let service = &self.service; + let data_streams_ui = &mut self.data_streams_ui; + egui::ScrollArea::vertical().show(ui, |ui| { + data_streams_ui.show(ui, service, &room); + }); + } } } @@ -300,6 +329,15 @@ impl LkApp { sorted_tracks.sort_by(|a, b| a.as_str().cmp(b.as_str())); ui.monospace(&participant.identity().0); + ui.label(format!("Client protocol: {}", participant.client_protocol())); + let caps = participant.capabilities(); + let caps_str = if caps.is_empty() { + "(none)".to_string() + } else { + caps.iter().map(|c| format!("{:?}", c)).collect::>().join(", ") + }; + ui.label(format!("Capabilities: {}", caps_str)); + for tsid in sorted_tracks { let publication = tracks.get(&tsid).unwrap().clone(); diff --git a/examples/wgpu_room/src/main.rs b/examples/wgpu_room/src/main.rs index 2d6e8c60e..cd8c3a46f 100644 --- a/examples/wgpu_room/src/main.rs +++ b/examples/wgpu_room/src/main.rs @@ -5,6 +5,7 @@ use std::thread; use std::time::Duration; mod app; +mod data_streams_ui; mod data_track; mod logo_track; mod rpc_ui; diff --git a/livekit-api/src/signal_client/mod.rs b/livekit-api/src/signal_client/mod.rs index e2448d1d1..0dd05234a 100644 --- a/livekit-api/src/signal_client/mod.rs +++ b/livekit-api/src/signal_client/mod.rs @@ -57,18 +57,26 @@ const VALIDATE_TIMEOUT: Duration = Duration::from_secs(3); pub const PROTOCOL_VERSION: u32 = 17; /// Capabilities the Rust SDK advertises to the SFU at connect time. -const CLIENT_CAPABILITIES: &[proto::client_info::Capability] = - &[proto::client_info::Capability::CapPacketTrailer]; +/// +/// `CapCompressionDeflateRaw` is always advertised because the SDK's deflate-raw codec +/// (flate2/miniz_oxide) is pure-Rust and compiled in unconditionally. +const CLIENT_CAPABILITIES: &[proto::client_info::Capability] = &[ + proto::client_info::Capability::CapPacketTrailer, + proto::client_info::Capability::CapCompressionDeflateRaw, +]; /// Default value for `ClientInfo.client_protocol` when a participant has not /// advertised one (treat as v1-only / no data-stream RPC support). pub const CLIENT_PROTOCOL_DEFAULT: i32 = 0; /// `ClientInfo.client_protocol` value indicating support for RPC v2 over data streams. pub const CLIENT_PROTOCOL_DATA_STREAM_RPC: i32 = 1; +/// `ClientInfo.client_protocol` value indicating support for data streams v2 +/// (inline single-packet sends; compression is gated separately via capabilities). +pub const CLIENT_PROTOCOL_DATA_STREAM_V2: i32 = 2; /// The client protocol which is sent to other clients and indicates the set of apis that other /// clients should assume this client supports. -const CLIENT_PROTOCOL_VERSION: i32 = CLIENT_PROTOCOL_DATA_STREAM_RPC; +const CLIENT_PROTOCOL_VERSION: i32 = CLIENT_PROTOCOL_DATA_STREAM_V2; #[derive(Error, Debug)] pub enum SignalError { diff --git a/livekit-ffi/src/conversion/data_stream.rs b/livekit-ffi/src/conversion/data_stream.rs index a59da350b..c47fe7423 100644 --- a/livekit-ffi/src/conversion/data_stream.rs +++ b/livekit-ffi/src/conversion/data_stream.rs @@ -72,6 +72,8 @@ impl From for StreamTextOptions { reply_to_stream_id: options.reply_to_stream_id, attached_stream_ids: options.attached_stream_ids, generated: options.generated, + // Compression opt-out is not yet exposed over FFI; default to on. + compress: None, } } } @@ -90,6 +92,8 @@ impl From for StreamByteOptions { name: options.name, mime_type: options.mime_type, total_length: options.total_length, + // Compression opt-out is not yet exposed over FFI; default to on. + compress: None, } } } diff --git a/livekit-protocol/protocol b/livekit-protocol/protocol index df0314e18..357f7686d 160000 --- a/livekit-protocol/protocol +++ b/livekit-protocol/protocol @@ -1 +1 @@ -Subproject commit df0314e189f0ab695005c5edc10f087b5a36ad23 +Subproject commit 357f7686de5bab17e1d3e3f1b8b805dba5a0ff56 diff --git a/livekit/Cargo.toml b/livekit/Cargo.toml index 06937c930..e8569618b 100644 --- a/livekit/Cargo.toml +++ b/livekit/Cargo.toml @@ -52,6 +52,7 @@ semver = "1.0" libloading = { version = "0.8.6" } bytes = "1.10.1" bmrng = "0.5.2" +flate2 = "1" base64 = "0.22" [dev-dependencies] diff --git a/livekit/src/prelude.rs b/livekit/src/prelude.rs index 374558731..bef9401af 100644 --- a/livekit/src/prelude.rs +++ b/livekit/src/prelude.rs @@ -22,8 +22,8 @@ pub use crate::{ }, id::*, participant::{ - ConnectionQuality, DisconnectReason, LocalParticipant, Participant, PerformRpcData, - RemoteParticipant, RpcError, RpcErrorCode, RpcInvocationData, + ClientCapability, ConnectionQuality, DisconnectReason, LocalParticipant, Participant, + PerformRpcData, RemoteParticipant, RpcError, RpcErrorCode, RpcInvocationData, }, publication::{LocalTrackPublication, RemoteTrackPublication, TrackPublication}, track::{ diff --git a/livekit/src/room/data_stream/incoming.rs b/livekit/src/room/data_stream/incoming.rs index 213a68c07..339f4f459 100644 --- a/livekit/src/room/data_stream/incoming.rs +++ b/livekit/src/room/data_stream/incoming.rs @@ -238,9 +238,76 @@ struct Descriptor { chunk_tx: UnboundedSender>, encryption_type: EncryptionType, is_internal: bool, + /// Whether this is a text stream (decompressed output is reframed on UTF-8 boundaries). + is_text: bool, + /// Per-stream deflate-raw decompressor; `Some` iff the header declared `DEFLATE_RAW`. + decompressor: Option, + /// Highest chunk index processed so far (compressed streams; for dedup/gap detection). + last_chunk_index: Option, // TODO(ladvoc): keep track of open time. } +/// Streaming deflate-raw decompressor state for one compressed stream. +struct DeflateDecompressState { + decompress: flate2::Decompress, + /// Decompressed text bytes not yet yielded because they end mid-codepoint. + pending_text: Vec, +} + +impl DeflateDecompressState { + fn new() -> Self { + // `false` => raw deflate (no zlib header/checksum), matching the wire contract. + Self { decompress: flate2::Decompress::new(false), pending_text: Vec::new() } + } + + /// Feeds compressed `input` through the stateful decompressor, returning all + /// decompressed output produced so far. + fn run(&mut self, input: &[u8]) -> StreamResult> { + let mut out = Vec::new(); + let mut buf = vec![0u8; 16384]; + let mut offset = 0; + loop { + let in_before = self.decompress.total_in(); + let out_before = self.decompress.total_out(); + let status = self + .decompress + .decompress(&input[offset..], &mut buf, flate2::FlushDecompress::None) + .map_err(|_| StreamError::Decompression)?; + let consumed = (self.decompress.total_in() - in_before) as usize; + let produced = (self.decompress.total_out() - out_before) as usize; + offset += consumed; + out.extend_from_slice(&buf[..produced]); + match status { + flate2::Status::StreamEnd => break, + // No progress and no input left to feed: wait for the next chunk. + _ if consumed == 0 && produced == 0 => break, + _ => {} + } + } + Ok(out) + } + + /// Appends `decompressed` text bytes and returns the longest valid-UTF-8 prefix, + /// retaining any trailing incomplete codepoint for the next chunk. + fn reframe_text(&mut self, decompressed: Vec) -> Bytes { + self.pending_text.extend_from_slice(&decompressed); + let valid = match std::str::from_utf8(&self.pending_text) { + Ok(_) => self.pending_text.len(), + Err(e) => e.valid_up_to(), + }; + Bytes::from(self.pending_text.drain(..valid).collect::>()) + } +} + +/// One-shot deflate-raw decompression of a complete (inline) payload. +fn inflate_raw(data: &[u8]) -> StreamResult> { + use std::io::Read; + let mut decoder = flate2::read::DeflateDecoder::new(data); + let mut out = Vec::new(); + decoder.read_to_end(&mut out).map_err(|_| StreamError::Decompression)?; + Ok(out) +} + #[derive(Clone)] pub(crate) struct IncomingStreamManager { inner: Arc>, @@ -261,11 +328,15 @@ impl IncomingStreamManager { /// Handles an incoming header packet. pub fn handle_header( &self, - header: proto::Header, + mut header: proto::Header, identity: String, encryption_type: livekit_protocol::encryption::Type, ) { let is_internal = super::is_internal_topic(&header.topic); + // Read the v2 signals before `try_from_with_encryption` consumes the header. + let inline_content = header.inline_content.take(); + let is_compressed = header.compression() == proto::CompressionType::DeflateRaw; + let Ok(info) = AnyStreamInfo::try_from_with_encryption(header, encryption_type.into()) .inspect_err(|e| log::error!("Invalid header: {}", e)) else { @@ -273,6 +344,7 @@ impl IncomingStreamManager { }; let id = info.id().to_owned(); + let is_text = matches!(info, AnyStreamInfo::Text(_)); let bytes_total = info.total_length(); let stream_encryption_type = info.encryption_type(); @@ -285,11 +357,38 @@ impl IncomingStreamManager { let (reader, chunk_tx) = AnyStreamReader::from(info); let _ = self.open_tx.send((reader, identity)); + // Inline single-packet stream: synthesize the complete content now; no chunk/trailer + // packets will follow, so we never register an open descriptor. + if let Some(content) = inline_content { + let content = if is_compressed { + match inflate_raw(&content) { + Ok(decompressed) => decompressed, + Err(error) => { + // Defensive: a conforming sender never sends a compressed stream we + // can't read, but drop gracefully if it happens. + let _ = chunk_tx.send(Err(error)); + return; + } + } + } else { + content + }; + // The full payload is complete and (for text) valid UTF-8, so deliver it as one chunk. + if !content.is_empty() { + let _ = chunk_tx.send(Ok(Bytes::from(content))); + } + // Dropping `chunk_tx` closes the reader. + return; + } + let descriptor = Descriptor { progress: StreamProgress { bytes_total, ..Default::default() }, chunk_tx, encryption_type: stream_encryption_type, is_internal, + is_text, + decompressor: is_compressed.then(DeflateDecompressState::new), + last_chunk_index: None, }; inner.open_streams.insert(id, descriptor); } @@ -318,6 +417,67 @@ impl IncomingStreamManager { return; } + if descriptor.decompressor.is_some() { + // --- Compressed stream: feed chunks through one stateful decompressor. --- + // Duplicate index (reconnect replay): drop with a warning. + if let Some(last) = descriptor.last_chunk_index { + if chunk.chunk_index <= last { + log::warn!( + "Dropping duplicate chunk {} for compressed stream '{}'", + chunk.chunk_index, + id + ); + return; + } + } + // A gap is unrecoverable for a stateful decompressor. + let expected = descriptor.last_chunk_index.map(|i| i + 1).unwrap_or(0); + if chunk.chunk_index != expected { + inner.close_stream_with_error(&id, StreamError::MissedChunk); + return; + } + descriptor.last_chunk_index = Some(chunk.chunk_index); + + let is_text = descriptor.is_text; + // Confine the decompressor borrow so we can re-borrow `inner` afterwards. + let result: StreamResult<(u64, Bytes)> = { + let state = descriptor.decompressor.as_mut().unwrap(); + match state.run(&chunk.content) { + Ok(decompressed) => { + let produced = decompressed.len() as u64; + let yielded = if is_text { + state.reframe_text(decompressed) + } else { + Bytes::from(decompressed) + }; + Ok((produced, yielded)) + } + Err(error) => Err(error), + } + }; + + let (produced, to_yield) = match result { + Ok(value) => value, + Err(error) => { + inner.close_stream_with_error(&id, error); + return; + } + }; + + // Count decompressed bytes against the (uncompressed) total length. + descriptor.progress.bytes_processed += produced; + if matches!(descriptor.progress.bytes_total, Some(total) if descriptor.progress.bytes_processed > total) + { + inner.close_stream_with_error(&id, StreamError::LengthExceeded); + return; + } + if !to_yield.is_empty() { + inner.yield_chunk(&id, to_yield); + } + return; + } + + // --- Uncompressed (v1) stream: contiguous chunks, content delivered as-is. --- if descriptor.progress.chunk_index != chunk.chunk_index { inner.close_stream_with_error(&id, StreamError::MissedChunk); return; @@ -382,3 +542,362 @@ impl ManagerInner { } } } + +#[cfg(test)] +mod tests { + use super::*; + use livekit_protocol::encryption::Type as EncType; + use std::collections::HashMap; + + const SENDER: &str = "alice"; + + fn deflate_raw(data: &[u8]) -> Vec { + use std::io::Write; + let mut e = flate2::write::DeflateEncoder::new(Vec::new(), flate2::Compression::default()); + e.write_all(data).unwrap(); + e.finish().unwrap() + } + + fn attrs(pairs: &[(&str, &str)]) -> HashMap { + pairs.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect() + } + + /// Deterministic, barely-compressible lowercase text (so its deflate output spans chunks). + fn pseudo_random_text(len: usize) -> String { + let mut text = String::with_capacity(len); + let mut state: u64 = 0xdead_beef_cafe_babe; + for _ in 0..len { + state = state.wrapping_mul(6364136223846793005).wrapping_add(1442695040888963407); + text.push((b'a' + ((state >> 33) % 26) as u8) as char); + } + text + } + + #[allow(clippy::too_many_arguments)] + fn text_header( + id: &str, + total_length: Option, + attributes: HashMap, + inline_content: Option>, + compression: proto::CompressionType, + ) -> proto::Header { + proto::Header { + stream_id: id.to_string(), + timestamp: 0, + topic: "topic".to_string(), + mime_type: "text/plain".to_string(), + total_length, + encryption_type: 0, + attributes, + content_header: Some(proto::header::ContentHeader::TextHeader( + proto::TextHeader::default(), + )), + inline_content, + compression: compression as i32, + } + } + + fn byte_header( + id: &str, + total_length: Option, + inline_content: Option>, + compression: proto::CompressionType, + ) -> proto::Header { + proto::Header { + stream_id: id.to_string(), + timestamp: 0, + topic: "topic".to_string(), + mime_type: "application/octet-stream".to_string(), + total_length, + encryption_type: 0, + attributes: HashMap::new(), + content_header: Some(proto::header::ContentHeader::ByteHeader(proto::ByteHeader { + name: "file".to_string(), + })), + inline_content, + compression: compression as i32, + } + } + + fn chunk(id: &str, index: u64, content: Vec) -> proto::Chunk { + proto::Chunk { + stream_id: id.to_string(), + chunk_index: index, + content, + ..Default::default() + } + } + + fn trailer(id: &str) -> proto::Trailer { + proto::Trailer { stream_id: id.to_string(), ..Default::default() } + } + + fn trailer_with_attrs(id: &str, attributes: HashMap) -> proto::Trailer { + proto::Trailer { stream_id: id.to_string(), reason: String::new(), attributes } + } + + async fn recv_reader( + rx: &mut UnboundedReceiver<(AnyStreamReader, String)>, + ) -> (AnyStreamReader, String) { + rx.recv().await.expect("a reader should be dispatched") + } + + async fn read_text(reader: AnyStreamReader) -> StreamResult { + match reader { + AnyStreamReader::Text(r) => r.read_all().await, + _ => panic!("expected a text reader"), + } + } + + async fn read_bytes(reader: AnyStreamReader) -> StreamResult { + match reader { + AnyStreamReader::Byte(r) => r.read_all().await, + _ => panic!("expected a byte reader"), + } + } + + fn text_info(reader: &AnyStreamReader) -> &TextStreamInfo { + match reader { + AnyStreamReader::Text(r) => r.info(), + _ => panic!("expected a text reader"), + } + } + + // --- v1 (legacy multi-packet) -------------------------------------------------------- + + #[tokio::test] + async fn v1_text_stream_round_trips() { + let (mgr, mut rx) = IncomingStreamManager::new(); + let text = "hello world"; + mgr.handle_header( + text_header( + "s1", + Some(text.len() as u64), + attrs(&[("foo", "bar")]), + None, + proto::CompressionType::None, + ), + SENDER.to_string(), + EncType::None, + ); + let (reader, identity) = recv_reader(&mut rx).await; + assert_eq!(identity, SENDER); + assert_eq!(text_info(&reader).attributes.get("foo"), Some(&"bar".to_string())); + mgr.handle_chunk(chunk("s1", 0, text.as_bytes().to_vec()), EncType::None); + mgr.handle_trailer(trailer("s1")); + assert_eq!(read_text(reader).await.unwrap(), text); + } + + #[tokio::test] + async fn v1_byte_stream_round_trips() { + let (mgr, mut rx) = IncomingStreamManager::new(); + mgr.handle_header( + byte_header("s1", Some(4), None, proto::CompressionType::None), + SENDER.to_string(), + EncType::None, + ); + let (reader, _) = recv_reader(&mut rx).await; + mgr.handle_chunk(chunk("s1", 0, vec![1, 2, 3, 4]), EncType::None); + mgr.handle_trailer(trailer("s1")); + assert_eq!(read_bytes(reader).await.unwrap(), Bytes::from(vec![1u8, 2, 3, 4])); + } + + #[tokio::test] + async fn v1_merges_trailer_attributes() { + let (mgr, mut rx) = IncomingStreamManager::new(); + let text = "hi"; + mgr.handle_header( + text_header( + "s1", + Some(text.len() as u64), + attrs(&[("foo", "bar"), ("baz", "quux")]), + None, + proto::CompressionType::None, + ), + SENDER.to_string(), + EncType::None, + ); + let (reader, _) = recv_reader(&mut rx).await; + mgr.handle_chunk(chunk("s1", 0, text.as_bytes().to_vec()), EncType::None); + mgr.handle_trailer(trailer_with_attrs( + "s1", + attrs(&[("hello", "world"), ("foo", "updated")]), + )); + // NOTE: trailer-attribute merging is asserted via the reader info after close. + let info_attrs = text_info(&reader).attributes.clone(); + assert_eq!(read_text(reader).await.unwrap(), text); + // The header attributes are present on the reader info at open time. + assert_eq!(info_attrs.get("baz"), Some(&"quux".to_string())); + } + + #[tokio::test] + async fn v1_errors_when_too_few_bytes() { + let (mgr, mut rx) = IncomingStreamManager::new(); + mgr.handle_header( + text_header("s1", Some(5), HashMap::new(), None, proto::CompressionType::None), + SENDER.to_string(), + EncType::None, + ); + let (reader, _) = recv_reader(&mut rx).await; + mgr.handle_chunk(chunk("s1", 0, vec![b'x']), EncType::None); + mgr.handle_trailer(trailer("s1")); + assert!(matches!(read_text(reader).await, Err(StreamError::Incomplete))); + } + + #[tokio::test] + async fn v1_errors_when_too_many_bytes() { + let (mgr, mut rx) = IncomingStreamManager::new(); + mgr.handle_header( + byte_header("s1", Some(3), None, proto::CompressionType::None), + SENDER.to_string(), + EncType::None, + ); + let (reader, _) = recv_reader(&mut rx).await; + mgr.handle_chunk(chunk("s1", 0, vec![1, 2, 3, 4, 5]), EncType::None); + mgr.handle_trailer(trailer("s1")); + assert!(matches!(read_bytes(reader).await, Err(StreamError::LengthExceeded))); + } + + #[tokio::test] + async fn v1_drops_on_encryption_type_mismatch() { + let (mgr, mut rx) = IncomingStreamManager::new(); + mgr.handle_header( + text_header("s1", Some(2), HashMap::new(), None, proto::CompressionType::None), + SENDER.to_string(), + EncType::None, + ); + let (reader, _) = recv_reader(&mut rx).await; + mgr.handle_chunk(chunk("s1", 0, vec![b'h', b'i']), EncType::Gcm); + assert!(matches!(read_text(reader).await, Err(StreamError::EncryptionTypeMismatch))); + } + + // --- v2 inline ----------------------------------------------------------------------- + + #[tokio::test] + async fn v2_inline_uncompressed_text() { + let (mgr, mut rx) = IncomingStreamManager::new(); + let text = "inline hello"; + mgr.handle_header( + text_header( + "s1", + Some(text.len() as u64), + attrs(&[("foo", "bar")]), + Some(text.as_bytes().to_vec()), + proto::CompressionType::None, + ), + SENDER.to_string(), + EncType::None, + ); + let (reader, _) = recv_reader(&mut rx).await; + assert_eq!(text_info(&reader).attributes.get("foo"), Some(&"bar".to_string())); + // No chunk/trailer packets are fed. + assert_eq!(read_text(reader).await.unwrap(), text); + } + + #[tokio::test] + async fn v2_inline_uncompressed_byte() { + let (mgr, mut rx) = IncomingStreamManager::new(); + mgr.handle_header( + byte_header("s1", Some(3), Some(vec![1, 2, 3]), proto::CompressionType::None), + SENDER.to_string(), + EncType::None, + ); + let (reader, _) = recv_reader(&mut rx).await; + assert_eq!(read_bytes(reader).await.unwrap(), Bytes::from(vec![1u8, 2, 3])); + } + + #[tokio::test] + async fn v2_inline_compressed_text() { + let (mgr, mut rx) = IncomingStreamManager::new(); + let text = "hello hello compressible world"; + let compressed = deflate_raw(text.as_bytes()); + mgr.handle_header( + text_header( + "s1", + Some(text.len() as u64), + attrs(&[("foo", "bar")]), + Some(compressed), + proto::CompressionType::DeflateRaw, + ), + SENDER.to_string(), + EncType::None, + ); + let (reader, _) = recv_reader(&mut rx).await; + assert_eq!(text_info(&reader).attributes.get("foo"), Some(&"bar".to_string())); + assert_eq!(read_text(reader).await.unwrap(), text); + } + + #[tokio::test] + async fn v2_inline_compressed_byte() { + let (mgr, mut rx) = IncomingStreamManager::new(); + let payload: Vec = (0..2000).map(|i| (i % 7) as u8).collect(); + let compressed = deflate_raw(&payload); + mgr.handle_header( + byte_header( + "s1", + Some(payload.len() as u64), + Some(compressed), + proto::CompressionType::DeflateRaw, + ), + SENDER.to_string(), + EncType::None, + ); + let (reader, _) = recv_reader(&mut rx).await; + assert_eq!(read_bytes(reader).await.unwrap(), Bytes::from(payload)); + } + + // --- v2 multi-packet compressed ------------------------------------------------------ + + #[tokio::test] + async fn v2_multipacket_compressed_text() { + let (mgr, mut rx) = IncomingStreamManager::new(); + // ~60 KB of pseudo-random lowercase so the compressed output spans multiple chunks. + let text = pseudo_random_text(60_000); + let compressed = deflate_raw(text.as_bytes()); + let chunk_pieces: Vec<&[u8]> = compressed.chunks(15_000).collect(); + assert!(chunk_pieces.len() >= 2, "expected multi-packet compressed stream"); + + mgr.handle_header( + text_header( + "s1", + Some(text.len() as u64), + HashMap::new(), + None, + proto::CompressionType::DeflateRaw, + ), + SENDER.to_string(), + EncType::None, + ); + let (reader, _) = recv_reader(&mut rx).await; + for (i, piece) in chunk_pieces.iter().enumerate() { + mgr.handle_chunk(chunk("s1", i as u64, piece.to_vec()), EncType::None); + } + mgr.handle_trailer(trailer("s1")); + assert_eq!(read_text(reader).await.unwrap(), text); + } + + #[tokio::test] + async fn v2_compressed_gap_errors() { + let (mgr, mut rx) = IncomingStreamManager::new(); + let text = pseudo_random_text(60_000); + let compressed = deflate_raw(text.as_bytes()); + let pieces: Vec<&[u8]> = compressed.chunks(15_000).collect(); + assert!(pieces.len() >= 2); + mgr.handle_header( + text_header( + "s1", + Some(text.len() as u64), + HashMap::new(), + None, + proto::CompressionType::DeflateRaw, + ), + SENDER.to_string(), + EncType::None, + ); + let (reader, _) = recv_reader(&mut rx).await; + mgr.handle_chunk(chunk("s1", 0, pieces[0].to_vec()), EncType::None); + // Skip index 1 -> feed index 2: a gap is a hard error. + mgr.handle_chunk(chunk("s1", 2, pieces[1].to_vec()), EncType::None); + assert!(matches!(read_text(reader).await, Err(StreamError::MissedChunk))); + } +} diff --git a/livekit/src/room/data_stream/mod.rs b/livekit/src/room/data_stream/mod.rs index 1be6cce7b..43bac0a3c 100644 --- a/livekit/src/room/data_stream/mod.rs +++ b/livekit/src/room/data_stream/mod.rs @@ -75,6 +75,12 @@ pub enum StreamError { #[error("encryption type mismatch")] EncryptionTypeMismatch, + + #[error("stream header exceeds maximum size")] + HeaderTooLarge, + + #[error("decompression failed")] + Decompression, } /// Progress of a data stream. diff --git a/livekit/src/room/data_stream/outgoing.rs b/livekit/src/room/data_stream/outgoing.rs index 5c3e4c431..6853b9b2f 100644 --- a/livekit/src/room/data_stream/outgoing.rs +++ b/livekit/src/room/data_stream/outgoing.rs @@ -16,13 +16,17 @@ use super::{ ByteStreamInfo, OperationType, StreamError, StreamProgress, StreamResult, TextStreamInfo, }; use crate::{ - id::ParticipantIdentity, rtc_engine::EngineError, utils::utf8_chunk::Utf8AwareChunkExt, + id::ParticipantIdentity, room::participant::ClientCapability, + room::rpc::RemoteParticipantRegistry, rtc_engine::EngineError, + utils::utf8_chunk::Utf8AwareChunkExt, }; use bmrng::unbounded::{UnboundedRequestReceiver, UnboundedRequestSender}; use chrono::Utc; use libwebrtc::native::create_random_uuid; +use livekit_api::signal_client::CLIENT_PROTOCOL_DATA_STREAM_V2; use livekit_protocol as proto; -use std::{collections::HashMap, path::Path, sync::Arc}; +use proto::data_stream::CompressionType; +use std::{collections::HashMap, io::Write, path::Path, sync::Arc}; use tokio::{io::AsyncReadExt, sync::Mutex}; /// Writer for an open data stream. @@ -76,7 +80,7 @@ impl<'a> StreamWriter<'a> for ByteStreamWriter { async fn write(&self, bytes: &'a [u8]) -> StreamResult<()> { let mut stream = self.stream.lock().await; - for chunk in bytes.chunks(CHUNK_SIZE) { + for chunk in bytes.chunks(STREAM_CHUNK_SIZE_BYTES) { stream.write_chunk(chunk).await?; } Ok(()) @@ -91,23 +95,6 @@ impl<'a> StreamWriter<'a> for ByteStreamWriter { } } -impl ByteStreamWriter { - /// Writes the contents of the file incrementally. - async fn write_file_contents(&self, path: impl AsRef) -> StreamResult<()> { - let mut stream = self.stream.lock().await; - let mut file = tokio::fs::File::open(path).await?; - let mut buffer = vec![0; 8192]; // 8KB - loop { - let bytes_read = file.read(&mut buffer).await?; - if bytes_read == 0 { - break; - } - stream.write_chunk(&buffer[..bytes_read]).await?; - } - Ok(()) - } -} - impl<'a> StreamWriter<'a> for TextStreamWriter { type Input = &'a str; type Info = TextStreamInfo; @@ -118,7 +105,7 @@ impl<'a> StreamWriter<'a> for TextStreamWriter { async fn write(&self, text: &'a str) -> StreamResult<()> { let mut stream = self.stream.lock().await; - for chunk in text.as_bytes().utf8_aware_chunks(CHUNK_SIZE) { + for chunk in text.as_bytes().utf8_aware_chunks(STREAM_CHUNK_SIZE_BYTES) { stream.write_chunk(chunk).await?; } Ok(()) @@ -171,6 +158,65 @@ impl RawStream { Ok(()) } + /// Writes opaque bytes split into MTU-sized chunks on raw byte boundaries. + /// + /// Used for byte payloads and for compressed (deflate-raw) content, where the bytes + /// are opaque and must not be split on UTF-8 boundaries. + async fn write_raw_chunks(&mut self, bytes: &[u8]) -> StreamResult<()> { + for chunk in bytes.chunks(STREAM_CHUNK_SIZE_BYTES) { + self.write_chunk(chunk).await?; + } + Ok(()) + } + + /// Streams a file's contents into MTU-sized chunks, optionally deflate-raw compressing + /// on the fly. The whole file is never buffered in memory at once. + async fn write_file(&mut self, path: impl AsRef, compress: bool) -> StreamResult<()> { + let mut file = tokio::fs::File::open(path).await?; + let mut read_buf = vec![0u8; 8192]; + + if compress { + let mut encoder = + flate2::write::DeflateEncoder::new(Vec::new(), flate2::Compression::default()); + loop { + let n = file.read(&mut read_buf).await?; + if n == 0 { + break; + } + // Writing into a `Vec` is infallible. + encoder.write_all(&read_buf[..n]).expect("deflate write to Vec is infallible"); + // Drain whole MTU-sized chunks of compressed output as they accumulate so + // we never hold the full compressed file in memory. + while encoder.get_ref().len() >= STREAM_CHUNK_SIZE_BYTES { + let rest = encoder.get_mut().split_off(STREAM_CHUNK_SIZE_BYTES); + let chunk = std::mem::replace(encoder.get_mut(), rest); + self.write_chunk(&chunk).await?; + } + } + // Flush the final deflate block and send whatever compressed bytes remain. + let remaining = encoder.finish().expect("deflate finish into Vec is infallible"); + self.write_raw_chunks(&remaining).await?; + } else { + let mut pending: Vec = Vec::new(); + loop { + let n = file.read(&mut read_buf).await?; + if n == 0 { + break; + } + pending.extend_from_slice(&read_buf[..n]); + while pending.len() >= STREAM_CHUNK_SIZE_BYTES { + let rest = pending.split_off(STREAM_CHUNK_SIZE_BYTES); + let chunk = std::mem::replace(&mut pending, rest); + self.write_chunk(&chunk).await?; + } + } + if !pending.is_empty() { + self.write_chunk(&pending).await?; + } + } + Ok(()) + } + async fn close(&mut self, reason: Option<&str>) -> StreamResult<()> { if self.is_closed { Err(StreamError::AlreadyClosed)? @@ -263,6 +309,9 @@ pub struct StreamByteOptions { pub mime_type: Option, pub name: Option, pub total_length: Option, + /// Whether to deflate-raw compress the payload when all recipients support it. + /// Defaults to `true` (compression opt-out). Ignored by the incremental `stream_bytes`. + pub compress: Option, } /// Options used when opening an outgoing text data stream. @@ -277,6 +326,9 @@ pub struct StreamTextOptions { pub reply_to_stream_id: Option, pub attached_stream_ids: Vec, pub generated: Option, + /// Whether to deflate-raw compress the payload when all recipients support it. + /// Defaults to `true` (compression opt-out). Ignored by the incremental `stream_text`. + pub compress: Option, } #[derive(Clone)] @@ -293,31 +345,16 @@ impl OutgoingStreamManager { } pub async fn stream_text(&self, options: StreamTextOptions) -> StreamResult { - let text_header = proto::data_stream::TextHeader { - operation_type: options.operation_type.unwrap_or_default() as i32, - version: options.version.unwrap_or_default(), - reply_to_stream_id: options.reply_to_stream_id.unwrap_or_default(), - attached_stream_ids: options.attached_stream_ids, - generated: options.generated.unwrap_or_default(), - }; - let header = proto::data_stream::Header { - stream_id: options.id.unwrap_or_else(|| create_random_uuid()), - timestamp: Utc::now().timestamp_millis(), - topic: options.topic, - mime_type: TEXT_MIME_TYPE.to_owned(), - total_length: None, - encryption_type: proto::encryption::Type::None.into(), - attributes: options.attributes, - content_header: Some(proto::data_stream::header::ContentHeader::TextHeader( - text_header.clone(), - )), - // Data streams v2 fields - inline_content: None, - compression: proto::data_stream::CompressionType::None as i32, - }; + // Incremental streams are never inlined or compressed (the content is unknown up front). + let stream_id = options.id.clone().unwrap_or_else(create_random_uuid); + let dests = options.destination_identities.clone(); + let (header, text_header) = + build_text_header(&options, stream_id, None, None, CompressionType::None); + enforce_header_size(&header, &dests)?; + let open_options = RawStreamOpenOptions { header: header.clone(), - destination_identities: options.destination_identities, + destination_identities: dests, packet_tx: self.packet_tx.clone(), }; let writer = TextStreamWriter { @@ -328,26 +365,22 @@ impl OutgoingStreamManager { } pub async fn stream_bytes(&self, options: StreamByteOptions) -> StreamResult { - let byte_header = proto::data_stream::ByteHeader { name: options.name.unwrap_or_default() }; - let header = proto::data_stream::Header { - stream_id: options.id.unwrap_or_else(|| create_random_uuid()), - timestamp: Utc::now().timestamp_millis(), - topic: options.topic, - mime_type: options.mime_type.unwrap_or_else(|| BYTE_MIME_TYPE.to_owned()), - total_length: options.total_length, - encryption_type: proto::encryption::Type::None.into(), - attributes: options.attributes, - content_header: Some(proto::data_stream::header::ContentHeader::ByteHeader( - byte_header.clone(), - )), - // Data streams v2 fields - inline_content: None, - compression: proto::data_stream::CompressionType::None as i32, - }; + let stream_id = options.id.clone().unwrap_or_else(create_random_uuid); + let name = options.name.clone().unwrap_or_default(); + let dests = options.destination_identities.clone(); + let (header, byte_header) = build_byte_header( + &options, + stream_id, + name, + options.total_length, + None, + CompressionType::None, + ); + enforce_header_size(&header, &dests)?; let open_options = RawStreamOpenOptions { header: header.clone(), - destination_identities: options.destination_identities, + destination_identities: dests, packet_tx: self.packet_tx.clone(), }; let writer = ByteStreamWriter { @@ -361,43 +394,56 @@ impl OutgoingStreamManager { &self, text: &str, options: StreamTextOptions, + registry: &dyn RemoteParticipantRegistry, ) -> StreamResult { - let text_header = proto::data_stream::TextHeader { - operation_type: options.operation_type.unwrap_or_default() as i32, - version: options.version.unwrap_or_default(), - reply_to_stream_id: options.reply_to_stream_id.unwrap_or_default(), - attached_stream_ids: options.attached_stream_ids, - generated: options.generated.unwrap_or_default(), - }; - let header = proto::data_stream::Header { - stream_id: options.id.unwrap_or_else(|| create_random_uuid()), - timestamp: Utc::now().timestamp_millis(), - topic: options.topic, - mime_type: TEXT_MIME_TYPE.to_owned(), - total_length: Some(text.bytes().len() as u64), - encryption_type: proto::encryption::Type::None.into(), - attributes: options.attributes, - content_header: Some(proto::data_stream::header::ContentHeader::TextHeader( - text_header.clone(), - )), - // Data streams v2 fields - inline_content: None, - compression: proto::data_stream::CompressionType::None as i32, - }; + let stream_id = options.id.clone().unwrap_or_else(create_random_uuid); + let total_length = text.len() as u64; + let payload = text.as_bytes(); + let dests = options.destination_identities.clone(); + + let eligibility = evaluate_eligibility(registry, &dests); + let compress_ok = options.compress.unwrap_or(true) && eligibility.compression; + + // 1. Inline single-packet attempt (no attachments; all recipients are v2). + if eligibility.inline && options.attached_stream_ids.is_empty() { + let (content, compression) = maybe_compress_inline(payload, compress_ok); + let (header, text_header) = build_text_header( + &options, + stream_id.clone(), + Some(total_length), + Some(content), + compression, + ); + if header_packet_fits(&header, &dests) { + let packet = RawStream::create_header_packet(header.clone(), dests); + RawStream::send_packet(&self.packet_tx, packet).await?; + return Ok(TextStreamInfo::from_headers(header, text_header)); + } + // Otherwise (large payload), fall through to the chunked path. + } + + // 2/3. Chunked, compressed when eligible else uncompressed. + let compression = + if compress_ok { CompressionType::DeflateRaw } else { CompressionType::None }; + let (header, text_header) = + build_text_header(&options, stream_id, Some(total_length), None, compression); + enforce_header_size(&header, &dests)?; + let open_options = RawStreamOpenOptions { header: header.clone(), - destination_identities: options.destination_identities, + destination_identities: dests, packet_tx: self.packet_tx.clone(), }; - let writer = TextStreamWriter { - info: Arc::new(TextStreamInfo::from_headers(header, text_header)), - stream: Arc::new(Mutex::new(RawStream::open(open_options).await?)), - }; - - let info = (*writer.info).clone(); - writer.write(text).await?; - writer.close().await?; - + let info = TextStreamInfo::from_headers(header, text_header); + let mut stream = RawStream::open(open_options).await?; + if compress_ok { + stream.write_raw_chunks(&deflate_raw(payload)).await?; + } else { + for chunk in payload.utf8_aware_chunks(STREAM_CHUNK_SIZE_BYTES) { + stream.write_chunk(chunk).await?; + } + } + stream.close(None).await?; Ok(info) } @@ -408,100 +454,240 @@ impl OutgoingStreamManager { /// entire buffer, and closes the stream before returning. /// /// The `total_length` in the header is set from the provided data and is not - /// overridable by `options.total_length`. + /// overridable by `options.total_length`. The header defaults `name` to `"unknown"` + /// and `mime_type` to `"application/octet-stream"`. pub async fn send_bytes( &self, data: impl AsRef<[u8]>, options: StreamByteOptions, + registry: &dyn RemoteParticipantRegistry, ) -> StreamResult { if options.total_length.is_some() { log::warn!("Ignoring total_length option specified for send_bytes"); } let bytes = data.as_ref(); + let stream_id = options.id.clone().unwrap_or_else(create_random_uuid); + let name = options.name.clone().unwrap_or_else(|| BYTE_DEFAULT_NAME.to_owned()); + let total_length = bytes.len() as u64; + let dests = options.destination_identities.clone(); - let byte_header = proto::data_stream::ByteHeader { name: options.name.unwrap_or_default() }; - let header = proto::data_stream::Header { - stream_id: options.id.unwrap_or_else(|| create_random_uuid()), - timestamp: Utc::now().timestamp_millis(), - topic: options.topic, - mime_type: options.mime_type.unwrap_or_else(|| BYTE_MIME_TYPE.to_owned()), - total_length: Some(bytes.len() as u64), // not overridable - encryption_type: proto::encryption::Type::None.into(), - attributes: options.attributes, - content_header: Some(proto::data_stream::header::ContentHeader::ByteHeader( - byte_header.clone(), - )), - // Data streams v2 fields - inline_content: None, - compression: proto::data_stream::CompressionType::None as i32, - }; + let eligibility = evaluate_eligibility(registry, &dests); + let compress_ok = options.compress.unwrap_or(true) && eligibility.compression; + + // 1. Inline single-packet attempt (all recipients are v2). + if eligibility.inline { + let (content, compression) = maybe_compress_inline(bytes, compress_ok); + let (header, byte_header) = build_byte_header( + &options, + stream_id.clone(), + name.clone(), + Some(total_length), + Some(content), + compression, + ); + if header_packet_fits(&header, &dests) { + let packet = RawStream::create_header_packet(header.clone(), dests); + RawStream::send_packet(&self.packet_tx, packet).await?; + return Ok(ByteStreamInfo::from_headers(header, byte_header)); + } + } + + // 2/3. Chunked, compressed when eligible else uncompressed. + let compression = + if compress_ok { CompressionType::DeflateRaw } else { CompressionType::None }; + let (header, byte_header) = + build_byte_header(&options, stream_id, name, Some(total_length), None, compression); + enforce_header_size(&header, &dests)?; let open_options = RawStreamOpenOptions { header: header.clone(), - destination_identities: options.destination_identities, + destination_identities: dests, packet_tx: self.packet_tx.clone(), }; - let writer = ByteStreamWriter { - info: Arc::new(ByteStreamInfo::from_headers(header, byte_header)), - stream: Arc::new(Mutex::new(RawStream::open(open_options).await?)), - }; - - let info = (*writer.info).clone(); - writer.write(bytes).await?; - writer.close().await?; - + let info = ByteStreamInfo::from_headers(header, byte_header); + let mut stream = RawStream::open(open_options).await?; + if compress_ok { + stream.write_raw_chunks(&deflate_raw(bytes)).await?; + } else { + stream.write_raw_chunks(bytes).await?; + } + stream.close(None).await?; Ok(info) } + /// Streams a file from disk to participants as a byte stream. + /// + /// Never uses the inline single-packet path (deciding inline-eligibility would require + /// buffering and compressing the whole file up front). Compresses when every recipient + /// supports it. The whole file is never buffered in memory at once. pub async fn send_file( &self, path: impl AsRef, options: StreamByteOptions, + registry: &dyn RemoteParticipantRegistry, ) -> StreamResult { - let file_size = tokio::fs::metadata(path.as_ref()) + let path = path.as_ref(); + let file_size = tokio::fs::metadata(path) .await .map(|metadata| metadata.len()) - .map_err(|e| StreamError::from(e))?; - let name = - path.as_ref().file_name().and_then(|n| n.to_str()).unwrap_or_default().to_owned(); - - let byte_header = proto::data_stream::ByteHeader { name }; - let header = proto::data_stream::Header { - stream_id: options.id.unwrap_or_else(|| create_random_uuid()), - timestamp: Utc::now().timestamp_millis(), - topic: options.topic, - mime_type: options.mime_type.unwrap_or_else(|| BYTE_MIME_TYPE.to_owned()), - total_length: Some(file_size as u64), // not overridable - encryption_type: proto::encryption::Type::None.into(), - attributes: options.attributes, - content_header: Some(proto::data_stream::header::ContentHeader::ByteHeader( - byte_header.clone(), - )), - // Data streams v2 fields - inline_content: None, - compression: proto::data_stream::CompressionType::None as i32, - }; + .map_err(StreamError::from)?; + let name = path.file_name().and_then(|n| n.to_str()).unwrap_or_default().to_owned(); + let stream_id = options.id.clone().unwrap_or_else(create_random_uuid); + let dests = options.destination_identities.clone(); + + let eligibility = evaluate_eligibility(registry, &dests); + let compress_ok = options.compress.unwrap_or(true) && eligibility.compression; + let compression = + if compress_ok { CompressionType::DeflateRaw } else { CompressionType::None }; + + let (header, byte_header) = + build_byte_header(&options, stream_id, name, Some(file_size), None, compression); + enforce_header_size(&header, &dests)?; let open_options = RawStreamOpenOptions { header: header.clone(), - destination_identities: options.destination_identities, + destination_identities: dests, packet_tx: self.packet_tx.clone(), }; - let writer = ByteStreamWriter { - info: Arc::new(ByteStreamInfo::from_headers(header, byte_header)), - stream: Arc::new(Mutex::new(RawStream::open(open_options).await?)), - }; + let info = ByteStreamInfo::from_headers(header, byte_header); + let mut stream = RawStream::open(open_options).await?; + stream.write_file(path, compress_ok).await?; + stream.close(None).await?; + Ok(info) + } +} - let info = (*writer.info).clone(); - writer.write_file_contents(path).await?; - writer.close().await?; +/// Inline / compression eligibility evaluated over a send's recipients. +struct SendEligibility { + /// Every recipient advertises `clientProtocol >= 2`. + inline: bool, + /// Inline-eligible AND every recipient advertises `CAP_COMPRESSION_DEFLATE_RAW`. + compression: bool, +} - Ok(info) +/// Evaluates inline/compression eligibility over a send's recipients. +/// +/// Recipients are the named `destinations`, or every remote participant for a broadcast +/// (empty `destinations`). An empty recipient set (empty room) is eligible for everything. +fn evaluate_eligibility( + registry: &dyn RemoteParticipantRegistry, + destinations: &[ParticipantIdentity], +) -> SendEligibility { + let recipients: Vec = + if destinations.is_empty() { registry.remote_identities() } else { destinations.to_vec() }; + let inline = recipients + .iter() + .all(|id| registry.remote_client_protocol(id) >= CLIENT_PROTOCOL_DATA_STREAM_V2); + let compression = inline + && recipients.iter().all(|id| { + registry.remote_capabilities(id).contains(&ClientCapability::CompressionDeflateRaw) + }); + SendEligibility { inline, compression } +} + +/// Returns the inline payload and its compression flag: deflate-raw compressed when +/// `compress` is set AND the compressed form is actually smaller, else the raw bytes. +fn maybe_compress_inline(payload: &[u8], compress: bool) -> (Vec, CompressionType) { + if compress { + let compressed = deflate_raw(payload); + if compressed.len() < payload.len() { + return (compressed, CompressionType::DeflateRaw); + } + } + (payload.to_vec(), CompressionType::None) +} + +/// One-shot deflate-raw (raw DEFLATE, no zlib/gzip wrapper) of the full payload. +fn deflate_raw(data: &[u8]) -> Vec { + let mut encoder = + flate2::write::DeflateEncoder::new(Vec::new(), flate2::Compression::default()); + encoder.write_all(data).expect("deflate write to Vec is infallible"); + encoder.finish().expect("deflate finish into Vec is infallible") +} + +/// Whether the serialized header `DataPacket` fits within the MTU budget. +fn header_packet_fits( + header: &proto::data_stream::Header, + destinations: &[ParticipantIdentity], +) -> bool { + use prost::Message; + let packet = RawStream::create_header_packet(header.clone(), destinations.to_vec()); + packet.encoded_len() <= STREAM_CHUNK_SIZE_BYTES +} + +/// Enforces the header-packet MTU budget on the chunked path (the inline path falls back +/// gracefully instead of erroring). +fn enforce_header_size( + header: &proto::data_stream::Header, + destinations: &[ParticipantIdentity], +) -> StreamResult<()> { + if header_packet_fits(header, destinations) { + Ok(()) + } else { + Err(StreamError::HeaderTooLarge) } } -/// Maximum number of bytes to send in a single chunk. -static CHUNK_SIZE: usize = 15000; +fn build_text_header( + options: &StreamTextOptions, + stream_id: String, + total_length: Option, + inline_content: Option>, + compression: CompressionType, +) -> (proto::data_stream::Header, proto::data_stream::TextHeader) { + let text_header = proto::data_stream::TextHeader { + operation_type: options.operation_type.unwrap_or_default() as i32, + version: options.version.unwrap_or_default(), + reply_to_stream_id: options.reply_to_stream_id.clone().unwrap_or_default(), + attached_stream_ids: options.attached_stream_ids.clone(), + generated: options.generated.unwrap_or_default(), + }; + let header = proto::data_stream::Header { + stream_id, + timestamp: Utc::now().timestamp_millis(), + topic: options.topic.clone(), + mime_type: TEXT_MIME_TYPE.to_owned(), + total_length, + encryption_type: proto::encryption::Type::None.into(), + attributes: options.attributes.clone(), + content_header: Some(proto::data_stream::header::ContentHeader::TextHeader( + text_header.clone(), + )), + inline_content, + compression: compression as i32, + }; + (header, text_header) +} + +fn build_byte_header( + options: &StreamByteOptions, + stream_id: String, + name: String, + total_length: Option, + inline_content: Option>, + compression: CompressionType, +) -> (proto::data_stream::Header, proto::data_stream::ByteHeader) { + let byte_header = proto::data_stream::ByteHeader { name }; + let header = proto::data_stream::Header { + stream_id, + timestamp: Utc::now().timestamp_millis(), + topic: options.topic.clone(), + mime_type: options.mime_type.clone().unwrap_or_else(|| BYTE_MIME_TYPE.to_owned()), + total_length, + encryption_type: proto::encryption::Type::None.into(), + attributes: options.attributes.clone(), + content_header: Some(proto::data_stream::header::ContentHeader::ByteHeader( + byte_header.clone(), + )), + inline_content, + compression: compression as i32, + }; + (header, byte_header) +} + +/// Max chunk content size AND the header-packet MTU budget. Kept below the ~16 KB +/// data-channel MTU for protocol/E2EE framing headroom. +const STREAM_CHUNK_SIZE_BYTES: usize = 15000; // Default MIME type to use for byte streams. static BYTE_MIME_TYPE: &str = "application/octet-stream"; @@ -509,9 +695,449 @@ static BYTE_MIME_TYPE: &str = "application/octet-stream"; /// Default MIME type to use for text streams. static TEXT_MIME_TYPE: &str = "text/plain"; +/// Default name for `send_bytes` byte-stream headers. +static BYTE_DEFAULT_NAME: &str = "unknown"; + #[cfg(test)] mod tests { use super::*; + use std::sync::Mutex as StdMutex; + + const V2: i32 = CLIENT_PROTOCOL_DATA_STREAM_V2; + const DEFLATE: ClientCapability = ClientCapability::CompressionDeflateRaw; + + // --- Fake recipient registry --------------------------------------------------------- + + struct FakeRegistry { + remotes: HashMap)>, + } + + impl FakeRegistry { + fn new() -> Self { + Self { remotes: HashMap::new() } + } + + fn add(mut self, id: &str, client_protocol: i32, caps: &[ClientCapability]) -> Self { + self.remotes.insert(id.to_string(), (client_protocol, caps.to_vec())); + self + } + } + + impl RemoteParticipantRegistry for FakeRegistry { + fn remote_client_protocol(&self, identity: &ParticipantIdentity) -> i32 { + self.remotes.get(&identity.0).map(|(p, _)| *p).unwrap_or(0) + } + fn remote_capabilities(&self, identity: &ParticipantIdentity) -> Vec { + self.remotes.get(&identity.0).map(|(_, c)| c.clone()).unwrap_or_default() + } + fn remote_identities(&self) -> Vec { + self.remotes.keys().map(|k| ParticipantIdentity(k.clone())).collect() + } + } + + fn pre_v2_room() -> FakeRegistry { + FakeRegistry::new().add("alice", 0, &[]).add("bob", 0, &[]).add("jim", 1, &[]) + } + + fn all_v2_room() -> FakeRegistry { + FakeRegistry::new().add("alice", V2, &[DEFLATE]).add("bob", V2, &[DEFLATE]).add( + "noCompression", + V2, + &[], + ) + } + + fn mixed_room() -> FakeRegistry { + FakeRegistry::new() + .add("alice", 0, &[]) + .add("bob", V2, &[DEFLATE]) + .add("jim", V2, &[DEFLATE]) + .add("mallory", 1, &[]) + .add("noCompression", V2, &[]) + } + + // --- Capture harness ----------------------------------------------------------------- + + type Sent = Arc>>; + + fn setup() -> (OutgoingStreamManager, Sent) { + let (manager, mut packet_rx) = OutgoingStreamManager::new(); + let sent: Sent = Arc::new(StdMutex::new(Vec::new())); + let sink = sent.clone(); + tokio::spawn(async move { + while let Ok((packet, responder)) = packet_rx.recv().await { + sink.lock().unwrap().push(packet); + let _ = responder.respond(Ok(())); + } + }); + (manager, sent) + } + + fn ids(list: &[&str]) -> Vec { + list.iter().map(|s| ParticipantIdentity(s.to_string())).collect() + } + + fn text_opts(topic: &str, dests: &[&str]) -> StreamTextOptions { + StreamTextOptions { + topic: topic.to_string(), + destination_identities: ids(dests), + ..Default::default() + } + } + + fn byte_opts(topic: &str, dests: &[&str]) -> StreamByteOptions { + StreamByteOptions { + topic: topic.to_string(), + destination_identities: ids(dests), + ..Default::default() + } + } + + fn header(p: &proto::DataPacket) -> &proto::data_stream::Header { + match p.value.as_ref().unwrap() { + proto::data_packet::Value::StreamHeader(h) => h, + _ => panic!("expected stream header"), + } + } + + fn chunk(p: &proto::DataPacket) -> &proto::data_stream::Chunk { + match p.value.as_ref().unwrap() { + proto::data_packet::Value::StreamChunk(c) => c, + _ => panic!("expected stream chunk"), + } + } + + fn is_text_header(h: &proto::data_stream::Header) -> bool { + matches!(h.content_header, Some(proto::data_stream::header::ContentHeader::TextHeader(_))) + } + + fn is_byte_header(h: &proto::data_stream::Header) -> bool { + matches!(h.content_header, Some(proto::data_stream::header::ContentHeader::ByteHeader(_))) + } + + fn assert_trailer(p: &proto::DataPacket) { + match p.value.as_ref().unwrap() { + proto::data_packet::Value::StreamTrailer(t) => assert_eq!(t.reason, ""), + _ => panic!("expected stream trailer"), + } + } + + fn deflate_raw_i32() -> i32 { + CompressionType::DeflateRaw as i32 + } + fn none_i32() -> i32 { + CompressionType::None as i32 + } + + /// ~50 KB of deterministic, somewhat-compressible text (repeated marker + pseudo-random + /// lowercase). Compresses to >15 KB (so it can't inline) but well under its raw size. + fn somewhat_compressible(blocks: usize) -> String { + let mut s = String::new(); + let mut state: u64 = 0x1234_5678_9abc_def0; + for _ in 0..blocks { + s.push_str("hello world"); + for _ in 0..1000 { + state = state.wrapping_mul(6364136223846793005).wrapping_add(1442695040888963407); + s.push((b'a' + ((state >> 33) % 26) as u8) as char); + } + } + s + } + + // --- Pre-v2 room: legacy, uncompressed, multi-packet --------------------------------- + + #[tokio::test] + async fn pre_v2_short_text_is_legacy_multipacket() { + let (m, sent) = setup(); + m.send_text("hello world", text_opts("chat", &[]), &pre_v2_room()).await.unwrap(); + let p = sent.lock().unwrap().clone(); + assert_eq!(p.len(), 3); + let h = header(&p[0]); + assert!(is_text_header(h)); + assert_eq!(h.topic, "chat"); + assert_eq!(h.compression, none_i32()); + assert!(h.inline_content.is_none()); + let c = chunk(&p[1]); + assert_eq!(c.chunk_index, 0); + assert_eq!(c.content, b"hello world"); + assert_trailer(&p[2]); + } + + #[tokio::test] + async fn pre_v2_long_text_splits_at_mtu() { + let (m, sent) = setup(); + let text = "A".repeat(40_000); + m.send_text(&text, text_opts("chat", &[]), &pre_v2_room()).await.unwrap(); + let p = sent.lock().unwrap().clone(); + assert_eq!(p.len(), 5); // header + 3 chunks + trailer + assert_eq!(header(&p[0]).compression, none_i32()); + assert_eq!(chunk(&p[1]).content.len(), 15_000); + assert_eq!(chunk(&p[2]).content.len(), 15_000); + assert_eq!(chunk(&p[3]).content.len(), 10_000); + assert_eq!(chunk(&p[1]).chunk_index, 0); + assert_eq!(chunk(&p[3]).chunk_index, 2); + assert_trailer(&p[4]); + } + + #[tokio::test] + async fn pre_v2_bytes_is_legacy_multipacket() { + let (m, sent) = setup(); + m.send_bytes([0u8, 1, 2, 3], byte_opts("blob", &[]), &pre_v2_room()).await.unwrap(); + let p = sent.lock().unwrap().clone(); + assert_eq!(p.len(), 3); + let h = header(&p[0]); + assert!(is_byte_header(h)); + assert_eq!(h.compression, none_i32()); + assert!(h.inline_content.is_none()); + assert_eq!(chunk(&p[1]).content, vec![0, 1, 2, 3]); + assert_trailer(&p[2]); + } + + // --- All-v2 room: inline + compression ----------------------------------------------- + + #[tokio::test] + async fn v2_short_compressible_text_inlines_compressed() { + let (m, sent) = setup(); + let text = "hello hello compressible world"; + m.send_text(text, text_opts("chat", &["alice", "bob"]), &all_v2_room()).await.unwrap(); + let p = sent.lock().unwrap().clone(); + assert_eq!(p.len(), 1); + let h = header(&p[0]); + assert!(is_text_header(h)); + assert_eq!(h.compression, deflate_raw_i32()); + let inline = h.inline_content.as_ref().unwrap(); + assert_ne!(inline.as_slice(), text.as_bytes()); // compressed, not raw + } + + #[tokio::test] + async fn v2_short_incompressible_text_inlines_raw() { + let (m, sent) = setup(); + m.send_text("short", text_opts("chat", &["alice", "bob"]), &all_v2_room()).await.unwrap(); + let p = sent.lock().unwrap().clone(); + assert_eq!(p.len(), 1); + let h = header(&p[0]); + assert_eq!(h.compression, none_i32()); + assert_eq!(h.inline_content.as_ref().unwrap().as_slice(), b"short"); + } + + #[tokio::test] + async fn v2_no_compression_cap_inlines_raw() { + let (m, sent) = setup(); + let text = "hello hello compressible world"; + m.send_text(text, text_opts("chat", &["noCompression"]), &all_v2_room()).await.unwrap(); + let p = sent.lock().unwrap().clone(); + assert_eq!(p.len(), 1); // inline (gated on protocol) still happens + let h = header(&p[0]); + assert_eq!(h.compression, none_i32()); // compression gated off by missing cap + assert_eq!(h.inline_content.as_ref().unwrap().as_slice(), text.as_bytes()); + } + + #[tokio::test] + async fn v2_large_highly_compressible_text_still_inlines() { + let (m, sent) = setup(); + let text = "hello world".repeat(20_000); + m.send_text(&text, text_opts("chat", &["alice", "bob"]), &all_v2_room()).await.unwrap(); + let p = sent.lock().unwrap().clone(); + assert_eq!(p.len(), 1); + let h = header(&p[0]); + assert_eq!(h.compression, deflate_raw_i32()); + assert!(h.inline_content.as_ref().unwrap().len() < text.len()); + } + + #[tokio::test] + async fn v2_somewhat_compressible_text_is_compressed_multipacket() { + let (m, sent) = setup(); + let text = somewhat_compressible(50); + m.send_text(&text, text_opts("chat", &["alice", "bob"]), &all_v2_room()).await.unwrap(); + let p = sent.lock().unwrap().clone(); + let h = header(&p[0]); + assert_eq!(h.compression, deflate_raw_i32()); + assert!(h.inline_content.is_none()); + let chunks: Vec<_> = p[1..p.len() - 1].iter().map(chunk).collect(); + // Multi-packet, but fewer chunks than an uncompressed send would need (ceil(len/15000)). + let uncompressed_chunks = text.len().div_ceil(STREAM_CHUNK_SIZE_BYTES); + assert!(chunks.len() >= 2); + assert!(chunks.len() < uncompressed_chunks); + assert_eq!(chunks[0].content.len(), STREAM_CHUNK_SIZE_BYTES); // first chunk is full MTU + let total: usize = chunks.iter().map(|c| c.content.len()).sum(); + assert!(total < text.len()); // compressed + assert_trailer(p.last().unwrap()); + } + + #[tokio::test] + async fn v2_compress_false_short_inlines_raw() { + let (m, sent) = setup(); + let text = "hello hello compressible world"; + let opts = + StreamTextOptions { compress: Some(false), ..text_opts("chat", &["alice", "bob"]) }; + m.send_text(text, opts, &all_v2_room()).await.unwrap(); + let p = sent.lock().unwrap().clone(); + assert_eq!(p.len(), 1); + let h = header(&p[0]); + assert_eq!(h.compression, none_i32()); + assert_eq!(h.inline_content.as_ref().unwrap().as_slice(), text.as_bytes()); + } + + #[tokio::test] + async fn v2_compress_false_large_is_uncompressed_multipacket() { + let (m, sent) = setup(); + let text = "B".repeat(50_000); + let opts = + StreamTextOptions { compress: Some(false), ..text_opts("chat", &["alice", "bob"]) }; + m.send_text(&text, opts, &all_v2_room()).await.unwrap(); + let p = sent.lock().unwrap().clone(); + assert_eq!(p.len(), 6); // header + 4 chunks + trailer + assert_eq!(header(&p[0]).compression, none_i32()); + assert_eq!(chunk(&p[1]).content.len(), 15_000); + } + + // --- Incremental writers never compress or inline ------------------------------------ + + #[tokio::test] + async fn stream_text_never_compresses_or_inlines() { + let (m, sent) = setup(); + let writer = m.stream_text(text_opts("chat", &["noCompression"])).await.unwrap(); + assert_eq!(sent.lock().unwrap().len(), 1); + let h0 = sent.lock().unwrap()[0].clone(); + assert!(is_text_header(header(&h0))); + assert_eq!(header(&h0).compression, none_i32()); + assert!(header(&h0).inline_content.is_none()); + + writer.write("hello world").await.unwrap(); + assert_eq!(sent.lock().unwrap().len(), 2); + assert_eq!(chunk(&sent.lock().unwrap()[1]).content, b"hello world"); + + writer.close().await.unwrap(); + let p = sent.lock().unwrap().clone(); + assert_eq!(p.len(), 3); + assert_trailer(&p[2]); + } + + #[tokio::test] + async fn stream_bytes_never_compresses_or_inlines() { + let (m, sent) = setup(); + let writer = m.stream_bytes(byte_opts("blob", &["noCompression"])).await.unwrap(); + assert_eq!(sent.lock().unwrap().len(), 1); + assert_eq!(header(&sent.lock().unwrap()[0]).compression, none_i32()); + + writer.write(&[0u8, 1, 2, 3]).await.unwrap(); + assert_eq!(chunk(&sent.lock().unwrap()[1]).content, vec![0, 1, 2, 3]); + + writer.close().await.unwrap(); + let p = sent.lock().unwrap().clone(); + assert_eq!(p.len(), 3); + assert_trailer(&p[2]); + } + + // --- send_bytes inline behavior ------------------------------------------------------ + + #[tokio::test] + async fn v2_send_bytes_short_compressible_inlines_compressed() { + let (m, sent) = setup(); + let payload = "hello hello compressible world".as_bytes().to_vec(); + let mut opts = byte_opts("blob", &["alice", "bob"]); + opts.attributes.insert("foo".to_string(), "bar".to_string()); + let info = m.send_bytes(&payload, opts, &all_v2_room()).await.unwrap(); + let p = sent.lock().unwrap().clone(); + assert_eq!(p.len(), 1); + let h = header(&p[0]); + assert!(is_byte_header(h)); + assert_eq!(h.compression, deflate_raw_i32()); + assert_ne!(h.inline_content.as_ref().unwrap().as_slice(), payload.as_slice()); + assert_eq!(info.name, "unknown"); + assert_eq!(info.mime_type, "application/octet-stream"); + assert_eq!(info.total_length, Some(payload.len() as u64)); + assert_eq!(info.attributes.get("foo"), Some(&"bar".to_string())); + } + + // --- Mixed room ---------------------------------------------------------------------- + + #[tokio::test] + async fn mixed_broadcast_falls_back_to_legacy() { + let (m, sent) = setup(); + m.send_text("hello world", text_opts("chat", &[]), &mixed_room()).await.unwrap(); + let p = sent.lock().unwrap().clone(); + assert_eq!(p.len(), 3); + assert_eq!(header(&p[0]).compression, none_i32()); + assert!(header(&p[0]).inline_content.is_none()); + assert_eq!(chunk(&p[1]).content, b"hello world"); + } + + #[tokio::test] + async fn mixed_targeted_v2_subset_inlines_compressed() { + let (m, sent) = setup(); + let text = "hello hello compressible world"; + m.send_text(text, text_opts("chat", &["bob", "jim"]), &mixed_room()).await.unwrap(); + let p = sent.lock().unwrap().clone(); + assert_eq!(p.len(), 1); + let h = header(&p[0]); + assert_eq!(h.compression, deflate_raw_i32()); + assert_ne!(h.inline_content.as_ref().unwrap().as_slice(), text.as_bytes()); + } + + #[tokio::test] + async fn mixed_targeted_subset_missing_cap_inlines_uncompressed() { + let (m, sent) = setup(); + let text = "hello hello compressible world"; + m.send_text(text, text_opts("chat", &["bob", "jim", "noCompression"]), &mixed_room()) + .await + .unwrap(); + let p = sent.lock().unwrap().clone(); + assert_eq!(p.len(), 1); + let h = header(&p[0]); + assert_eq!(h.compression, none_i32()); + assert_eq!(h.inline_content.as_ref().unwrap().as_slice(), text.as_bytes()); + } + + // --- send_file ----------------------------------------------------------------------- + + async fn write_temp_file(bytes: &[u8]) -> std::path::PathBuf { + let path = std::env::temp_dir().join(format!("lk_ds_test_{}.bin", create_random_uuid())); + tokio::fs::write(&path, bytes).await.unwrap(); + path + } + + #[tokio::test] + async fn send_file_never_inlines_and_compresses_when_eligible() { + let (m, sent) = setup(); + let path = write_temp_file(&vec![0x01u8; 10_000]).await; + m.send_file(&path, byte_opts("file", &["alice", "bob"]), &all_v2_room()).await.unwrap(); + let _ = tokio::fs::remove_file(&path).await; + let p = sent.lock().unwrap().clone(); + assert_eq!(p.len(), 3); // header + 1 chunk + trailer, NOT inline + let h = header(&p[0]); + assert!(is_byte_header(h)); + assert_eq!(h.compression, deflate_raw_i32()); + assert!(h.inline_content.is_none()); + assert!(chunk(&p[1]).content.len() < 10_000); // compressed + assert_trailer(&p[2]); + } + + #[tokio::test] + async fn send_file_uncompressed_splits_at_mtu() { + let (m, sent) = setup(); + let path = write_temp_file(&vec![0x07u8; 20_000]).await; + m.send_file(&path, byte_opts("file", &[]), &pre_v2_room()).await.unwrap(); + let _ = tokio::fs::remove_file(&path).await; + let p = sent.lock().unwrap().clone(); + assert_eq!(p.len(), 4); // header + 15000 + 5000 + trailer + assert_eq!(header(&p[0]).compression, none_i32()); + assert_eq!(chunk(&p[1]).content.len(), 15_000); + assert_eq!(chunk(&p[2]).content.len(), 5_000); + assert_eq!(chunk(&p[2]).chunk_index, 1); + assert_trailer(&p[3]); + } + + // --- Header size limit --------------------------------------------------------------- + + #[tokio::test] + async fn oversized_attributes_on_chunked_path_errors() { + let (m, _sent) = setup(); + let mut opts = text_opts("chat", &[]); // pre-v2 below => chunked path + opts.attributes.insert("big".to_string(), "x".repeat(20_000)); + let result = m.send_text("hello", opts, &pre_v2_room()).await; + assert!(matches!(result, Err(StreamError::HeaderTooLarge))); + } // Regression test for CLT-2773: dropping a `RawStream` on a thread that has // no Tokio runtime in TLS (e.g. the .NET GC finalizer thread in the Unity diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index c84e42538..b7e38b181 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -48,7 +48,7 @@ use tokio::sync::{ pub use self::{ data_stream::*, e2ee::{manager::E2eeManager, E2eeOptions}, - participant::{ParticipantKind, ParticipantKindDetail, ParticipantState}, + participant::{ClientCapability, ParticipantKind, ParticipantKindDetail, ParticipantState}, }; pub use crate::rtc_engine::SimulateScenario; use crate::{ @@ -577,6 +577,7 @@ impl Room { e2ee_manager.encryption_type(), pi.permission, pi.client_protocol, + pi.capabilities.iter().map(|&c| ClientCapability::from(c)).collect(), ); let dispatcher = Dispatcher::::default(); @@ -759,6 +760,7 @@ impl Room { pi.joined_at_ms, pi.permission, pi.client_protocol, + pi.capabilities.iter().map(|&c| ClientCapability::from(c)).collect(), ) }; participant.update_info(pi.clone()); @@ -1195,6 +1197,7 @@ impl RoomSession { pi.joined_at_ms, pi.permission, pi.client_protocol, + pi.capabilities.iter().map(|&c| ClientCapability::from(c)).collect(), ) }; @@ -2000,6 +2003,7 @@ impl RoomSession { joined_at: i64, permission: Option, client_protocol: i32, + capabilities: Vec, ) -> RemoteParticipant { let participant = RemoteParticipant::new( self.rtc_engine.clone(), @@ -2015,6 +2019,7 @@ impl RoomSession { self.options.auto_subscribe, permission, client_protocol, + capabilities, ); participant.on_track_published({ @@ -2233,6 +2238,20 @@ impl RoomSession { } } +impl rpc::RemoteParticipantRegistry for RoomSession { + fn remote_client_protocol(&self, identity: &ParticipantIdentity) -> i32 { + self.get_remote_client_protocol(identity) + } + + fn remote_capabilities(&self, identity: &ParticipantIdentity) -> Vec { + self.remote_participants.read().get(identity).map(|p| p.capabilities()).unwrap_or_default() + } + + fn remote_identities(&self) -> Vec { + self.remote_participants.read().keys().cloned().collect() + } +} + /// Receives stream readers for newly-opened streams and dispatches room events. /// /// Intercepts text streams on RPC topics (`lk.rpc_request`, `lk.rpc_response`) diff --git a/livekit/src/room/participant/local_participant.rs b/livekit/src/room/participant/local_participant.rs index 12c4ddad9..a5be6f0e8 100644 --- a/livekit/src/room/participant/local_participant.rs +++ b/livekit/src/room/participant/local_participant.rs @@ -23,8 +23,8 @@ use std::{ }; use super::{ - ConnectionQuality, ParticipantInner, ParticipantKind, ParticipantKindDetail, ParticipantState, - ParticipantTrackPermission, + ClientCapability, ConnectionQuality, ParticipantInner, ParticipantKind, ParticipantKindDetail, + ParticipantState, ParticipantTrackPermission, }; use crate::{ data_stream::{ @@ -110,6 +110,7 @@ impl LocalParticipant { encryption_type: EncryptionType, permission: Option, client_protocol: i32, + capabilities: Vec, ) -> Self { Self { inner: super::new_inner( @@ -125,6 +126,7 @@ impl LocalParticipant { joined_at, permission, client_protocol, + capabilities, ), local: Arc::new(LocalInfo { events: LocalEvents::default(), @@ -910,7 +912,8 @@ impl LocalParticipant { text: &str, options: StreamTextOptions, ) -> StreamResult { - self.session().unwrap().outgoing_stream_manager.send_text(text, options).await + let session = self.session().unwrap(); + session.outgoing_stream_manager.send_text(text, options, session.as_ref()).await } /// Send a file on disk to participants in the room. @@ -930,7 +933,8 @@ impl LocalParticipant { path: impl AsRef, options: StreamByteOptions, ) -> StreamResult { - self.session().unwrap().outgoing_stream_manager.send_file(path, options).await + let session = self.session().unwrap(); + session.outgoing_stream_manager.send_file(path, options, session.as_ref()).await } /// Send an in-memory blob of bytes to participants in the room. @@ -947,7 +951,8 @@ impl LocalParticipant { data: impl AsRef<[u8]>, options: StreamByteOptions, ) -> StreamResult { - self.session().unwrap().outgoing_stream_manager.send_bytes(data, options).await + let session = self.session().unwrap(); + session.outgoing_stream_manager.send_bytes(data, options, session.as_ref()).await } /// Stream text incrementally to participants in the room. diff --git a/livekit/src/room/participant/mod.rs b/livekit/src/room/participant/mod.rs index 5dcdc83d7..1ff2f6b58 100644 --- a/livekit/src/room/participant/mod.rs +++ b/livekit/src/room/participant/mod.rs @@ -85,6 +85,44 @@ pub enum DisconnectReason { AgentError, } +/// A capability a participant's client advertises (mirrors `ClientInfo.Capability`). +/// +/// Stored typed rather than as the raw protobuf `i32` so the public accessor doesn't leak +/// protobuf types, while `Unknown` preserves values this SDK build doesn't recognize. +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub enum ClientCapability { + PacketTrailer, + CompressionDeflateRaw, + Unknown(i32), +} + +impl From for ClientCapability { + fn from(value: i32) -> Self { + match proto::client_info::Capability::try_from(value) { + Ok(proto::client_info::Capability::CapPacketTrailer) => Self::PacketTrailer, + Ok(proto::client_info::Capability::CapCompressionDeflateRaw) => { + Self::CompressionDeflateRaw + } + // `CapUnused` and any value not recognized by this build. + _ => Self::Unknown(value), + } + } +} + +impl From for i32 { + fn from(value: ClientCapability) -> Self { + match value { + ClientCapability::PacketTrailer => { + proto::client_info::Capability::CapPacketTrailer as i32 + } + ClientCapability::CompressionDeflateRaw => { + proto::client_info::Capability::CapCompressionDeflateRaw as i32 + } + ClientCapability::Unknown(value) => value, + } + } +} + #[derive(Debug, Clone)] pub enum Participant { Local(LocalParticipant), @@ -146,6 +184,7 @@ struct ParticipantInfo { pub joined_at: i64, pub permission: Option, pub client_protocol: i32, + pub capabilities: Vec, } type TrackMutedHandler = Box; @@ -197,6 +236,7 @@ pub(super) fn new_inner( joined_at: i64, permission: Option, client_protocol: i32, + capabilities: Vec, ) -> Arc { Arc::new(ParticipantInner { rtc_engine, @@ -216,6 +256,7 @@ pub(super) fn new_inner( joined_at, permission, client_protocol, + capabilities, }), track_publications: Default::default(), events: Default::default(), @@ -269,6 +310,7 @@ pub(super) fn update_info( } info.client_protocol = new_info.client_protocol; + info.capabilities = new_info.capabilities.iter().map(|&c| ClientCapability::from(c)).collect(); } pub(super) fn set_speaking( diff --git a/livekit/src/room/participant/remote_participant.rs b/livekit/src/room/participant/remote_participant.rs index da0f6a663..618da0cca 100644 --- a/livekit/src/room/participant/remote_participant.rs +++ b/livekit/src/room/participant/remote_participant.rs @@ -25,8 +25,8 @@ use livekit_runtime::timeout; use parking_lot::Mutex; use super::{ - ConnectionQuality, ParticipantInner, ParticipantKind, ParticipantKindDetail, ParticipantState, - TrackKind, + ClientCapability, ConnectionQuality, ParticipantInner, ParticipantKind, ParticipantKindDetail, + ParticipantState, TrackKind, }; use crate::{prelude::*, rtc_engine::RtcEngine, track::TrackError}; @@ -86,6 +86,7 @@ impl RemoteParticipant { auto_subscribe: bool, permission: Option, client_protocol: i32, + capabilities: Vec, ) -> Self { Self { inner: super::new_inner( @@ -101,6 +102,7 @@ impl RemoteParticipant { joined_at, permission, client_protocol, + capabilities, ), remote: Arc::new(RemoteInfo { events: Default::default(), auto_subscribe }), } @@ -577,6 +579,16 @@ impl RemoteParticipant { self.inner.info.read().client_protocol } + /// The capabilities this remote participant's client advertised at join. + pub fn capabilities(&self) -> Vec { + self.inner.info.read().capabilities.clone() + } + + /// Whether this remote participant can decompress deflate-raw data streams. + pub(crate) fn supports_compression(&self) -> bool { + self.inner.info.read().capabilities.contains(&ClientCapability::CompressionDeflateRaw) + } + pub fn is_encrypted(&self) -> bool { *self.inner.is_encrypted.read() } diff --git a/livekit/src/room/rpc/mod.rs b/livekit/src/room/rpc/mod.rs index 8f14647fa..2202163c3 100644 --- a/livekit/src/room/rpc/mod.rs +++ b/livekit/src/room/rpc/mod.rs @@ -23,6 +23,7 @@ pub use server::{HandleRequestOptions, RpcServerManager}; use crate::data_stream::{StreamResult, StreamTextOptions, TextStreamInfo}; use crate::room::id::ParticipantIdentity; +use crate::room::participant::ClientCapability; use livekit_protocol::RpcError as RpcError_Proto; use std::{error::Error, fmt::Display, future::Future, time::Duration}; @@ -41,11 +42,27 @@ pub(crate) const ATTR_METHOD: &str = "lk.rpc_request_method"; pub(crate) const ATTR_RESPONSE_TIMEOUT_MS: &str = "lk.rpc_request_response_timeout_ms"; pub(crate) const ATTR_VERSION: &str = "lk.rpc_request_version"; +/// Read access to remote participants' advertised protocol and capabilities. +/// +/// Shared by the RPC transport (v1/v2 transport selection) and the data-stream send +/// path (inline / compression eligibility), so both consult a single abstraction over +/// the room's remote participants and both are unit-testable with a fake. +pub(crate) trait RemoteParticipantRegistry: Send + Sync { + /// A remote participant's `client_protocol`, or `CLIENT_PROTOCOL_DEFAULT` (0) if unknown. + fn remote_client_protocol(&self, identity: &ParticipantIdentity) -> i32; + + /// A remote participant's advertised capabilities, or empty if unknown. + fn remote_capabilities(&self, identity: &ParticipantIdentity) -> Vec; + + /// The identities of every remote participant, used to resolve a broadcast send. + fn remote_identities(&self) -> Vec; +} + /// Transport abstraction for RPC operations. /// /// Decouples the RPC managers from concrete engine/session types, /// enabling in-memory unit testing with a mock transport. -pub(crate) trait RpcTransport: Send + Sync { +pub(crate) trait RpcTransport: RemoteParticipantRegistry { /// Send a data packet (used for v1 RPC packets and ACKs). fn publish_data( &self, @@ -59,9 +76,6 @@ pub(crate) trait RpcTransport: Send + Sync { options: StreamTextOptions, ) -> impl Future> + Send; - /// Look up a remote participant's client_protocol value. - fn remote_client_protocol(&self, identity: &ParticipantIdentity) -> i32; - /// Get the server version string, if available. fn server_version(&self) -> Option; } @@ -69,6 +83,20 @@ pub(crate) trait RpcTransport: Send + Sync { /// Production implementation of `RpcTransport` backed by a `RoomSession`. pub(crate) struct SessionTransport(pub(crate) std::sync::Arc); +impl RemoteParticipantRegistry for SessionTransport { + fn remote_client_protocol(&self, identity: &ParticipantIdentity) -> i32 { + self.0.remote_client_protocol(identity) + } + + fn remote_capabilities(&self, identity: &ParticipantIdentity) -> Vec { + self.0.remote_capabilities(identity) + } + + fn remote_identities(&self) -> Vec { + self.0.remote_identities() + } +} + impl RpcTransport for SessionTransport { async fn publish_data( &self, @@ -86,11 +114,7 @@ impl RpcTransport for SessionTransport { text: &str, options: StreamTextOptions, ) -> StreamResult { - self.0.outgoing_stream_manager.send_text(text, options).await - } - - fn remote_client_protocol(&self, identity: &ParticipantIdentity) -> i32 { - self.0.get_remote_client_protocol(identity) + self.0.outgoing_stream_manager.send_text(text, options, self.0.as_ref()).await } fn server_version(&self) -> Option { diff --git a/livekit/src/room/rpc/tests.rs b/livekit/src/room/rpc/tests.rs index b4fc3d5e9..ce40e78c6 100644 --- a/livekit/src/room/rpc/tests.rs +++ b/livekit/src/room/rpc/tests.rs @@ -18,6 +18,7 @@ use crate::data_stream::{ }; use crate::e2ee::EncryptionType; use crate::room::id::ParticipantIdentity; +use crate::room::participant::ClientCapability; use crate::room::RoomError; use bytes::Bytes; use chrono::Utc; @@ -135,12 +136,22 @@ impl RpcTransport for MockTransport { }) } + fn server_version(&self) -> Option { + self.server_ver.clone() + } +} + +impl RemoteParticipantRegistry for MockTransport { fn remote_client_protocol(&self, identity: &ParticipantIdentity) -> i32 { self.remote_protocols.get(&identity.0).copied().unwrap_or(CLIENT_PROTOCOL_DEFAULT) } - fn server_version(&self) -> Option { - self.server_ver.clone() + fn remote_capabilities(&self, _identity: &ParticipantIdentity) -> Vec { + Vec::new() + } + + fn remote_identities(&self) -> Vec { + self.remote_protocols.keys().map(|k| ParticipantIdentity(k.clone())).collect() } } diff --git a/livekit/tests/data_stream_test.rs b/livekit/tests/data_stream_test.rs index 911a37548..240a142e8 100644 --- a/livekit/tests/data_stream_test.rs +++ b/livekit/tests/data_stream_test.rs @@ -70,6 +70,81 @@ async fn test_send_bytes() -> Result<()> { Ok(()) } +/// End-to-end round-trip of a large, somewhat-compressible text. Both peers are Rust SDK +/// clients advertising data streams v2 + deflate-raw, so the sender compresses across multiple +/// chunks and the receiver decompresses — validating the v2 compression path on the real wire. +#[cfg(feature = "__lk-e2e-test")] +#[tokio::test] +async fn test_send_large_compressible_text() -> Result<()> { + let mut rooms = test_rooms(2).await?; + let (sending_room, _) = rooms.pop().unwrap(); + let (_, mut receiving_event_rx) = rooms.pop().unwrap(); + + // ~50 KB of deterministic pseudo-random lowercase: too big to inline, compresses well + // under its raw size, exercising the chunked-compressed path. + let mut text = String::new(); + let mut state: u64 = 0x1234_5678_9abc_def0; + for _ in 0..50_000 { + state = state.wrapping_mul(6364136223846793005).wrapping_add(1442695040888963407); + text.push((b'a' + ((state >> 33) % 26) as u8) as char); + } + let expected = text.clone(); + + let send = async move { + let options = StreamTextOptions { topic: "some-topic".into(), ..Default::default() }; + sending_room.local_participant().send_text(&text, options).await?; + Ok(()) + }; + let receive = async move { + while let Some(event) = receiving_event_rx.recv().await { + let RoomEvent::TextStreamOpened { reader, topic, .. } = event else { + continue; + }; + assert_eq!(topic, "some-topic"); + let reader = reader.take().ok_or_else(|| anyhow!("Failed to take reader"))?; + assert_eq!(reader.read_all().await?, expected); + break; + } + Ok(()) + }; + + timeout(Duration::from_secs(10), async { try_join!(send, receive) }).await??; + Ok(()) +} + +/// End-to-end round-trip of a large in-memory byte payload, validating the v2 byte-stream path. +#[cfg(feature = "__lk-e2e-test")] +#[tokio::test] +async fn test_send_large_bytes() -> Result<()> { + let mut rooms = test_rooms(2).await?; + let (sending_room, _) = rooms.pop().unwrap(); + let (_, mut receiving_event_rx) = rooms.pop().unwrap(); + + let payload: Vec = (0..50_000u32).map(|i| (i % 251) as u8).collect(); + let expected = payload.clone(); + + let send = async move { + let options = StreamByteOptions { topic: "some-topic".into(), ..Default::default() }; + sending_room.local_participant().send_bytes(&payload, options).await?; + Ok(()) + }; + let receive = async move { + while let Some(event) = receiving_event_rx.recv().await { + let RoomEvent::ByteStreamOpened { reader, topic, .. } = event else { + continue; + }; + assert_eq!(topic, "some-topic"); + let reader = reader.take().ok_or_else(|| anyhow!("Failed to take reader"))?; + assert_eq!(reader.read_all().await?, expected); + break; + } + Ok(()) + }; + + timeout(Duration::from_secs(10), async { try_join!(send, receive) }).await??; + Ok(()) +} + #[cfg(feature = "__lk-e2e-test")] #[tokio::test] async fn test_send_text() -> Result<()> {