Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
45c284d
feat: add Workflows support
connyay Jan 28, 2026
93f3c5a
clippy?
connyay Jan 28, 2026
d19624f
backout toolchain bump
connyay Jan 29, 2026
4536c0c
fix: wrap workflow JsFuture calls with SendFuture for axum compatibility
connyay Jan 29, 2026
b270455
Merge branch 'main' into cjh-workflow
guybedford Feb 21, 2026
95c8f7e
fix: wrap workflow callback with AssertUnwindSafe for panic-unwind co…
guybedford Feb 21, 2026
23c317e
fix: use JS NonRetryableError, async bindings, and retryable callbacks
connyay Feb 26, 2026
8c2e668
Merge branch 'main' into cjh-workflow
connyay Feb 26, 2026
446dc86
test NonRetryableError
connyay Feb 26, 2026
55ab563
Simplify workflow code: fix __wf_ handler leak, deduplicate test help…
connyay Mar 12, 2026
501c3c8
test: add wait_for_event/send_event workflow integration test
connyay Mar 12, 2026
e6a264b
Merge branch 'main' into cjh-workflow
connyay Mar 31, 2026
99b52f9
add test coverage for pause(), resume(), restart(), and terminate()
connyay Apr 1, 2026
92d6778
fix: resolve CI failures for clippy and panic-unwind
guybedford Apr 1, 2026
66d6fb8
Merge branch 'main' into cjh-workflow
guybedford Apr 1, 2026
84b0120
Merge branch 'main' into cjh-workflow
guybedford Apr 1, 2026
44b2bfd
Merge branch 'main' into cjh-workflow
guybedford Apr 1, 2026
7eaee7a
fix: wrap handler! async blocks in SendFuture to satisfy axum Send bound
guybedford Apr 1, 2026
187dad2
Merge branch 'main' into cjh-workflow
guybedford Apr 1, 2026
c89bee9
pr comments
connyay Apr 2, 2026
51c2b1d
use WorkflowSleepDuration everywhere
connyay Apr 2, 2026
834db2c
thread through WorkflowStepContext
connyay Apr 2, 2026
f2f4926
feat(workflow): support step context and ReadableStream returns
connyay Apr 21, 2026
44360ea
Merge branch 'main' into cjh-workflow
connyay Apr 27, 2026
b154754
tiny comment cleanup
connyay Apr 27, 2026
30f2d4e
refactor(workflow): typed Input/Output, fix serializer round-trips
connyay Apr 27, 2026
c68b874
cleanup workflow macro rustdoc
connyay Apr 27, 2026
16dba8e
Merge branch 'main' into cjh-workflow
connyay May 31, 2026
7a88382
fmt
connyay May 31, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 10 additions & 22 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ web-sys = { version = "0.3.90", features = [
"WritableStream",
"WritableStreamDefaultWriter",
] }
worker = { version = "0.7.5", path = "worker", features = ["queue", "d1", "axum", "timezone"] }
worker = { version = "0.7.5", path = "worker", features = ["queue", "d1", "axum", "timezone", "workflow"] }
worker-codegen = { path = "worker-codegen", version = "0.2.0" }
worker-macros = { version = "0.7.5", path = "worker-macros", features = ["queue"] }
worker-sys = { version = "0.7.5", path = "worker-sys", features = ["d1", "queue"] }
Expand Down
15 changes: 15 additions & 0 deletions examples/workflow/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "workflow-example"
version = "0.1.0"
edition = "2021"

[package.metadata.release]
release = false

[lib]
crate-type = ["cdylib"]

[dependencies]
serde = { version = "1", features = ["derive"] }
serde_json = "1"
worker = { path = "../../worker", features = ["workflow"] }
182 changes: 182 additions & 0 deletions examples/workflow/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
use serde::{Deserialize, Serialize};
use worker::*;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MyParams {
pub email: String,
pub name: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MyOutput {
pub message: String,
pub steps_completed: u32,
}

#[workflow]
pub struct MyWorkflow {
#[allow(dead_code)]
env: Env,
}

impl WorkflowEntrypoint for MyWorkflow {
fn new(_ctx: Context, env: Env) -> Self {
Self { env }
}

async fn run(
&self,
event: WorkflowEvent<serde_json::Value>,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we come up with a better typing solution here? It is quite restrictive to rely so heavily on serde. Can we instead make this generic with the bound being something something as simple as IntoWasmAbi?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does JsValue work here?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean specifically is making this a generic argument T so that run<T> can take any type with the IntoWasmAbi trait from wasm bindgen?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually with the Wasm Bindgen generics you could just make T: JsGeneric on the new JsGeneric trait.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JsGeneric is a bit awkward to work with here.

/// The event passed to a workflow's run method.
#[derive(Debug, Clone)]
pub struct WorkflowEvent<T = JsValue> {
    pub payload: T,
    pub timestamp: crate::Date,
    pub instance_id: String,
}

impl<T: JsGeneric> WorkflowEvent<T> {
    pub fn from_js(value: JsValue) -> Result<Self> {
        let payload = get_property(&value, "payload")?;
        Ok(Self {
            // SAFETY: JsGeneric guarantees T has ErasableGeneric<Repr = JsValue>,
            // so T and JsValue have an identical memory layout.
            payload: unsafe {
                core::mem::transmute_copy(&core::mem::ManuallyDrop::new(payload))
            },
            timestamp: get_timestamp_property(&value, "timestamp")?,
            instance_id: get_string_property(&value, "instanceId")?,
        })
    }
}

JsCast is slightly less awkward:

/// The event passed to a workflow's run method.
#[derive(Debug, Clone)]
pub struct WorkflowEvent<T = JsValue> {
    pub payload: T,
    pub timestamp: crate::Date,
    pub instance_id: String,
}

impl<T: JsCast> WorkflowEvent<T> {
    pub fn from_js(value: JsValue) -> Result<Self> {
        Ok(Self {
            payload: get_property(&value, "payload")?
                .dyn_into()
                .map_err(|_| crate::Error::JsError("payload is not the expected type".into()))?,
            timestamp: get_timestamp_property(&value, "timestamp")?,
            instance_id: get_string_property(&value, "instanceId")?,
        })
    }
}

thoughts/opinions? or am I missing something/doing something wrong with JsGeneric?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I likely need to properly follow the API design details here, I will aim to do a thorough review next week after this current release. Sorry for the delay and thanks for the patience as always!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All good! Thank you!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dog fooded this branch a bit more in another project and 30f2d4e is feeling pretty dang ergonomic to me. I know this doubles down on serde, when you were trying to not rely so heavily on it :-/

I'm still happy to try other directions.

step: WorkflowStep,
) -> Result<serde_json::Value> {
console_log!("Workflow started with instance ID: {}", event.instance_id);

let params: MyParams =
serde_json::from_value(event.payload).map_err(|e| Error::RustError(e.to_string()))?;

let email_for_validation = params.email.clone();
step.do_with_config(
"validate-params",
StepConfig {
retries: Some(RetryConfig {
limit: 3,
delay: "1 second".to_string(),
backoff: None,
}),
timeout: None,
},
move || {
let email = email_for_validation.clone();
async move {
if !email.contains('@') {
return Err(NonRetryableError::new("invalid email address").into());
}
Ok(serde_json::json!({ "valid": true }))
}
},
)
.await?;

let name_for_step1 = params.name.clone();
let step1_result = step
.do_("initial-processing", move || {
let name = name_for_step1.clone();
async move {
console_log!("Processing for user: {}", name);
Ok(serde_json::json!({
"processed": true,
"user": name
}))
}
})
.await?;

console_log!("Step 1 completed: {:?}", step1_result);

console_log!("Step 2: Sleeping for 10 seconds...");
step.sleep("wait-for-processing", "10 seconds").await?;

let email_for_step3 = params.email.clone();
let notification_result = step
.do_with_config(
"send-notification",
StepConfig {
retries: Some(RetryConfig {
limit: 3,
delay: "5 seconds".to_string(),
backoff: Some(Backoff::Exponential),
}),
timeout: Some("1 minute".to_string()),
},
move || {
let email = email_for_step3.clone();
async move {
console_log!("Sending notification to: {}", email);
if js_sys::Math::random() < 0.5 {
return Err("notification service temporarily unavailable".into());
}
Ok(serde_json::json!({
"notification_sent": true,
"email": email
}))
}
},
)
.await?;

console_log!("Step 3 completed: {:?}", notification_result);

let output = MyOutput {
message: format!("Workflow completed for {}", params.name),
steps_completed: 3,
};

Ok(serde_json::to_value(output).unwrap())
}
}

#[event(fetch)]
async fn fetch(mut req: Request, env: Env, _ctx: Context) -> Result<Response> {
let url = req.url()?;
let path = url.path();
let workflow = env.workflow("MY_WORKFLOW")?;

match (req.method(), path) {
(Method::Post, "/workflow") => {
let params: MyParams = req.json().await?;

let instance = workflow
.create(Some(CreateOptions {
params: Some(params),
..Default::default()
}))
.await?;

Response::from_json(&serde_json::json!({
"id": instance.id()?,
"message": "Workflow created"
}))
}

(Method::Get, path) if path.starts_with("/workflow/") => {
let id = path.trim_start_matches("/workflow/");
let instance = workflow.get(id).await?;
let status = instance.status().await?;

Response::from_json(&serde_json::json!({
"id": instance.id()?,
"status": format!("{:?}", status.status),
"error": status.error,
"output": status.output
}))
}

(Method::Post, path) if path.starts_with("/workflow/") && path.ends_with("/pause") => {
let id = path
.trim_start_matches("/workflow/")
.trim_end_matches("/pause");
let instance = workflow.get(id).await?;
instance.pause().await?;

Response::from_json(&serde_json::json!({
"id": instance.id()?,
"message": "Workflow paused"
}))
}

(Method::Post, path) if path.starts_with("/workflow/") && path.ends_with("/resume") => {
let id = path
.trim_start_matches("/workflow/")
.trim_end_matches("/resume");
let instance = workflow.get(id).await?;
instance.resume().await?;

Response::from_json(&serde_json::json!({
"id": instance.id()?,
"message": "Workflow resumed"
}))
}

_ => Response::error("Not Found", 404),
}
}
13 changes: 13 additions & 0 deletions examples/workflow/wrangler.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
name = "workflow-example"
main = "build/worker/shim.mjs"
compatibility_date = "2024-10-22"

[build]
# For development: use local worker-build binary
# For production: command = "cargo install -q worker-build && worker-build --release"
command = "RUSTFLAGS='--cfg=web_sys_unstable_apis' ../../target/release/worker-build --release"

[[workflows]]
name = "my-workflow"
binding = "MY_WORKFLOW"
class_name = "MyWorkflow"
Loading
Loading