Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ serde_json = "1.0"
sha2 = "0.10"
shlex = "1.3"
thiserror = "2.0"
relaycast = "=1.0.0"
relaycast = { git = "https://github.com/AgentWorkforce/relaycast.git", rev = "0cb8ff8d2b96ba347ef0826d9422764d2dff0c71" }
tokio = { version = "1.44", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
Expand Down
69 changes: 31 additions & 38 deletions src/relaycast_ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use relaycast::{
agent::DmOptions, format_registration_error,
retry_agent_registration as sdk_retry_agent_registration, AgentClient, AgentRegistrationClient,
AgentRegistrationError, AgentRegistrationRetryOutcome, MessageListQuery, RelayCast,
RelayCastOptions, RelayError, ReleaseAgentRequest, WsClient, WsClientOptions, WsLifecycleEvent,
RelayCastOptions, ReleaseAgentRequest, WsClient, WsClientOptions, WsLifecycleEvent,
};
use serde_json::{json, Value};
use tokio::sync::mpsc;
Expand Down Expand Up @@ -502,9 +502,11 @@ impl RelaycastHttpClient {
};

let config = relay
.workspace_stream_set(true)
.ensure_workspace_stream_enabled()
.await
.map_err(|error| anyhow::anyhow!("relaycast workspace_stream_set failed: {error}"))?;
.map_err(|error| {
anyhow::anyhow!("relaycast ensure_workspace_stream_enabled failed: {error}")
})?;
tracing::debug!(
enabled = config.enabled,
default_enabled = config.default_enabled,
Expand Down Expand Up @@ -548,30 +550,24 @@ impl RelaycastHttpClient {
return Ok(());
}
};
match agent_client.create_channel(request).await {
Ok(_) => {
tracing::info!(channel = %name, "created default channel");
}
Err(RelayError::Api { status: 409, .. }) => {
tracing::debug!(channel = %name, "default channel already exists");
match agent_client.ensure_channel_joined(request).await {
Ok(outcome) => {
if outcome.created {
tracing::info!(channel = %name, "created default channel");
} else {
tracing::debug!(channel = %name, "default channel already exists");
}
if outcome.joined {
tracing::info!(channel = %name, "broker joined default channel");
} else {
tracing::debug!(channel = %name, "broker already joined default channel");
}
}
Err(error) => {
tracing::warn!(channel = %name, error = %error, "failed to create default channel");
tracing::warn!(channel = %name, error = %error, "failed to ensure broker joined default channel");
continue;
}
}
// Join so the broker receives message.created WS events for this channel.
match agent_client.join_channel(name).await {
Ok(_) => {
tracing::info!(channel = %name, "broker joined default channel");
}
Err(RelayError::Api { status: 409, .. }) => {
tracing::debug!(channel = %name, "broker already joined default channel");
}
Err(error) => {
tracing::warn!(channel = %name, error = %error, "failed to join default channel");
}
}
}
Ok(())
}
Expand Down Expand Up @@ -611,24 +607,21 @@ impl RelaycastHttpClient {
topic: None,
metadata: None,
};
match agent_client.create_channel(request).await {
Ok(_) => tracing::info!(channel = %name, "created extra channel"),
Err(RelayError::Api { status: 409, .. }) => {
tracing::debug!(channel = %name, "extra channel already exists");
}
Err(error) => {
tracing::warn!(channel = %name, error = %error, "failed to create extra channel");
continue;
}
}
// Join the channel so the broker receives message.created WS events.
match agent_client.join_channel(name).await {
Ok(_) => tracing::info!(channel = %name, "broker joined extra channel"),
Err(RelayError::Api { status: 409, .. }) => {
tracing::debug!(channel = %name, "broker already joined extra channel");
match agent_client.ensure_channel_joined(request).await {
Ok(outcome) => {
if outcome.created {
tracing::info!(channel = %name, "created extra channel");
} else {
tracing::debug!(channel = %name, "extra channel already exists");
}
if outcome.joined {
tracing::info!(channel = %name, "broker joined extra channel");
} else {
tracing::debug!(channel = %name, "broker already joined extra channel");
}
}
Err(error) => {
tracing::warn!(channel = %name, error = %error, "failed to join extra channel");
tracing::warn!(channel = %name, error = %error, "failed to ensure broker joined extra channel");
}
}
}
Expand Down