Skip to content
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b861454
feat: commit initial data streams v2 implementation
1egoman Jun 26, 2026
87b96c7
feat: add ability to render data streams in wgpu_room example app
1egoman Jun 26, 2026
ca896ff
feat: render participant capabilities and client protocol versions
1egoman Jun 26, 2026
833b6ee
fix: Create data_streams_v2.md
1egoman Jun 26, 2026
46a0ac0
fix: remove ClientCapability from prelude
1egoman Jul 1, 2026
c92a56f
feat: allow data stream compress option to be toggled via livekit-ffi
1egoman Jul 1, 2026
7dcdde1
fix: run cargo fmt
1egoman Jul 1, 2026
5952b79
generated protobuf
github-actions[bot] Jul 1, 2026
3ec690a
fix: address toc-tou issue with decompressor state
1egoman Jul 1, 2026
0d93e09
fix: cleanup data stream v2 test fixture initialization
1egoman Jul 1, 2026
7162dbc
feat: make client capabilities proto deserialization use TryFrom
1egoman Jul 1, 2026
10c34c7
fix: run cargo fmt
1egoman Jul 1, 2026
ae7c6d1
fix: remove dead code
1egoman Jul 1, 2026
d537a7f
fix: add missing data_streams_ui.rs file
1egoman Jul 1, 2026
4c5138f
fix: bump protocol version to try to fix ci build error
1egoman Jul 1, 2026
8e7509f
generated protobuf
github-actions[bot] Jul 1, 2026
f9449ca
fix: downgrade protocol to before data track schema metadata
1egoman Jul 1, 2026
3768723
refactor: convert data stream open tuple to struct and add is_compres…
1egoman Jul 1, 2026
db5c6aa
feat: add is_compressed / is_inline to ByteStreamInfo / TextStreamInfo
1egoman Jul 1, 2026
e8d64d2
Revert "refactor: convert data stream open tuple to struct and add is…
1egoman Jul 1, 2026
c70a15e
feat: explicitly check is_compressed / is_inline in e2e tests
1egoman Jul 1, 2026
c8d4a44
fix: add e2e test that tests compress=false path
1egoman Jul 1, 2026
03c81b3
feat: add e2e feature on compressed / inline fields
1egoman Jul 1, 2026
ea5514f
feat: get random bytes e2e test to pass by using much higher quality …
1egoman Jul 1, 2026
7495a24
fix: comment out part of old example code
1egoman Jul 1, 2026
8b7ec4d
refactor: move RemoteParticipantRegistry to participant/registry.rs
1egoman Jul 2, 2026
54b160c
refactor: restructure send_text / send_bytes
1egoman Jul 2, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .changeset/data_streams_v2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
livekit: patch
livekit-api: patch
livekit-datatrack: patch
livekit-ffi: patch
livekit-protocol: patch
livekit-uniffi: patch
---

Add data streams v2 - #1192 (@1egoman)
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 38 additions & 0 deletions examples/wgpu_room/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{
data_streams_ui::DataStreamsUiState,
data_track::{LocalDataTrackTile, RemoteDataTrackTile, MAX_VALUE, TIME_WINDOW},
rpc_ui::RpcUiState,
service::{AsyncCmd, LkService, UiCmd},
Comment thread
1egoman marked this conversation as resolved.
Expand All @@ -25,6 +26,7 @@ struct AppState {
enum RightTab {
Participants,
Rpc,
DataStreams,
}

pub struct LkApp {
Expand All @@ -38,6 +40,7 @@ pub struct LkApp {
render_state: egui_wgpu::RenderState,
service: LkService,
rpc_ui: RpcUiState,
data_streams_ui: DataStreamsUiState,
right_tab: RightTab,
}

Expand Down Expand Up @@ -74,6 +77,7 @@ impl LkApp {
connection_failure: None,
render_state: cc.wgpu_render_state.clone().unwrap(),
rpc_ui: RpcUiState::default(),
data_streams_ui: DataStreamsUiState::default(),
right_tab: RightTab::Participants,
}
}
Expand Down Expand Up @@ -130,11 +134,28 @@ impl LkApp {
self.remote_data_tracks
.push(RemoteDataTrackTile::new(self.async_runtime.handle(), track));
}
RoomEvent::TextStreamOpened { reader, topic, participant_identity } => {
self.data_streams_ui.on_text_stream(
reader,
topic,
participant_identity,
&self.service,
);
}
RoomEvent::ByteStreamOpened { reader, topic, participant_identity } => {
self.data_streams_ui.on_byte_stream(
reader,
topic,
participant_identity,
&self.service,
);
}
RoomEvent::Disconnected { reason: _ } => {
self.video_renderers.clear();
self.local_data_tracks.clear();
self.remote_data_tracks.clear();
self.rpc_ui.on_disconnect();
self.data_streams_ui.on_disconnect();
}
_ => {}
}
Expand Down Expand Up @@ -264,6 +285,7 @@ impl LkApp {
ui.horizontal(|ui| {
ui.selectable_value(&mut self.right_tab, RightTab::Participants, "Participants");
ui.selectable_value(&mut self.right_tab, RightTab::Rpc, "RPC");
ui.selectable_value(&mut self.right_tab, RightTab::DataStreams, "Data Streams");
});
ui.separator();

Expand All @@ -281,6 +303,13 @@ impl LkApp {
rpc_ui.show(ui, service, &room);
});
}
RightTab::DataStreams => {
let service = &self.service;
let data_streams_ui = &mut self.data_streams_ui;
egui::ScrollArea::vertical().show(ui, |ui| {
data_streams_ui.show(ui, service, &room);
});
}
}
}

Expand All @@ -300,6 +329,15 @@ impl LkApp {
sorted_tracks.sort_by(|a, b| a.as_str().cmp(b.as_str()));

ui.monospace(&participant.identity().0);
ui.label(format!("Client protocol: {}", participant.client_protocol()));
let caps = participant.capabilities();
let caps_str = if caps.is_empty() {
"(none)".to_string()
} else {
caps.iter().map(|c| format!("{:?}", c)).collect::<Vec<_>>().join(", ")
};
ui.label(format!("Capabilities: {}", caps_str));

for tsid in sorted_tracks {
let publication = tracks.get(&tsid).unwrap().clone();

Expand Down
1 change: 1 addition & 0 deletions examples/wgpu_room/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use std::time::Duration;

mod app;
mod data_streams_ui;

Check failure on line 8 in examples/wgpu_room/src/main.rs

View workflow job for this annotation

GitHub Actions / Build (aarch64-unknown-linux-gnu)

file not found for module `data_streams_ui`

Check failure on line 8 in examples/wgpu_room/src/main.rs

View workflow job for this annotation

GitHub Actions / Test (x86_64-unknown-linux-gnu)

file not found for module `data_streams_ui`

Check failure on line 8 in examples/wgpu_room/src/main.rs

View workflow job for this annotation

GitHub Actions / Build (aarch64-apple-ios)

file not found for module `data_streams_ui`

Check failure on line 8 in examples/wgpu_room/src/main.rs

View workflow job for this annotation

GitHub Actions / Test (aarch64-apple-darwin)

file not found for module `data_streams_ui`

Check failure on line 8 in examples/wgpu_room/src/main.rs

View workflow job for this annotation

GitHub Actions / Build (aarch64-linux-android)

file not found for module `data_streams_ui`

Check failure on line 8 in examples/wgpu_room/src/main.rs

View workflow job for this annotation

GitHub Actions / Build (x86_64-unknown-linux-gnu)

file not found for module `data_streams_ui`

Check failure on line 8 in examples/wgpu_room/src/main.rs

View workflow job for this annotation

GitHub Actions / Build (armv7-linux-androideabi)

file not found for module `data_streams_ui`

Check failure on line 8 in examples/wgpu_room/src/main.rs

View workflow job for this annotation

GitHub Actions / Build (x86_64-linux-android)

file not found for module `data_streams_ui`

Check failure on line 8 in examples/wgpu_room/src/main.rs

View workflow job for this annotation

GitHub Actions / Build (aarch64-apple-ios-sim)

file not found for module `data_streams_ui`

Check failure on line 8 in examples/wgpu_room/src/main.rs

View workflow job for this annotation

GitHub Actions / Build (x86_64-apple-darwin)

file not found for module `data_streams_ui`

Check failure on line 8 in examples/wgpu_room/src/main.rs

View workflow job for this annotation

GitHub Actions / Test (x86_64-pc-windows-msvc)

file not found for module `data_streams_ui`

Check failure on line 8 in examples/wgpu_room/src/main.rs

View workflow job for this annotation

GitHub Actions / Build (aarch64-apple-darwin)

file not found for module `data_streams_ui`

Check failure on line 8 in examples/wgpu_room/src/main.rs

View workflow job for this annotation

GitHub Actions / Build (aarch64-pc-windows-msvc)

file not found for module `data_streams_ui`

Check failure on line 8 in examples/wgpu_room/src/main.rs

View workflow job for this annotation

GitHub Actions / Build (x86_64-pc-windows-msvc)

file not found for module `data_streams_ui`
mod data_track;
mod logo_track;
mod rpc_ui;
Expand Down
14 changes: 11 additions & 3 deletions livekit-api/src/signal_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,26 @@ const VALIDATE_TIMEOUT: Duration = Duration::from_secs(3);
pub const PROTOCOL_VERSION: u32 = 17;

/// Capabilities the Rust SDK advertises to the SFU at connect time.
const CLIENT_CAPABILITIES: &[proto::client_info::Capability] =
&[proto::client_info::Capability::CapPacketTrailer];
///
/// `CapCompressionDeflateRaw` is always advertised because the SDK's deflate-raw codec
/// (flate2/miniz_oxide) is pure-Rust and compiled in unconditionally.
const CLIENT_CAPABILITIES: &[proto::client_info::Capability] = &[
proto::client_info::Capability::CapPacketTrailer,
proto::client_info::Capability::CapCompressionDeflateRaw,
];

/// Default value for `ClientInfo.client_protocol` when a participant has not
/// advertised one (treat as v1-only / no data-stream RPC support).
pub const CLIENT_PROTOCOL_DEFAULT: i32 = 0;
/// `ClientInfo.client_protocol` value indicating support for RPC v2 over data streams.
pub const CLIENT_PROTOCOL_DATA_STREAM_RPC: i32 = 1;
/// `ClientInfo.client_protocol` value indicating support for data streams v2
/// (inline single-packet sends; compression is gated separately via capabilities).
pub const CLIENT_PROTOCOL_DATA_STREAM_V2: i32 = 2;

/// The client protocol which is sent to other clients and indicates the set of apis that other
/// clients should assume this client supports.
const CLIENT_PROTOCOL_VERSION: i32 = CLIENT_PROTOCOL_DATA_STREAM_RPC;
const CLIENT_PROTOCOL_VERSION: i32 = CLIENT_PROTOCOL_DATA_STREAM_V2;

#[derive(Error, Debug)]
pub enum SignalError {
Expand Down
4 changes: 4 additions & 0 deletions livekit-ffi/src/conversion/data_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ impl From<proto::StreamTextOptions> for StreamTextOptions {
reply_to_stream_id: options.reply_to_stream_id,
attached_stream_ids: options.attached_stream_ids,
generated: options.generated,
// Compression opt-out is not yet exposed over FFI; default to on.
compress: None,
Comment thread
1egoman marked this conversation as resolved.
Outdated
}
}
}
Expand All @@ -90,6 +92,8 @@ impl From<proto::StreamByteOptions> for StreamByteOptions {
name: options.name,
mime_type: options.mime_type,
total_length: options.total_length,
// Compression opt-out is not yet exposed over FFI; default to on.
compress: None,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion livekit-protocol/protocol
Submodule protocol updated 251 files
1 change: 1 addition & 0 deletions livekit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ semver = "1.0"
libloading = { version = "0.8.6" }
bytes = "1.10.1"
bmrng = "0.5.2"
flate2 = "1"
base64 = "0.22"
Comment on lines 53 to 56

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that flate2 is now a required dependency of livekit. Previously it was an optional dependency of livekit-api but that's it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is is worth doing yet, but it might be worth adding a "compression" feature to this crate to conditionally include this dep. It would be enabled by default, but would provide users who do not want compression to disable it at the feature level.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I lean towards not doing it and adding it later if people wanted to opt out. But, I could be convinced otherwise if there's a really good use case. Also, in case it's relevant, flate2 is ~80kb.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern would be more how it affects compile time and binary size; probably insignificant but it might be worth checking with cargo build --timings and cargo bloat --release --crates (crate) respectively.


[dev-dependencies]
Expand Down
4 changes: 2 additions & 2 deletions livekit/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ pub use crate::{
},
id::*,
participant::{
ConnectionQuality, DisconnectReason, LocalParticipant, Participant, PerformRpcData,
RemoteParticipant, RpcError, RpcErrorCode, RpcInvocationData,
ClientCapability, ConnectionQuality, DisconnectReason, LocalParticipant, Participant,
Comment thread
1egoman marked this conversation as resolved.
Outdated
PerformRpcData, RemoteParticipant, RpcError, RpcErrorCode, RpcInvocationData,
},
publication::{LocalTrackPublication, RemoteTrackPublication, TrackPublication},
track::{
Expand Down
Loading
Loading