-
Notifications
You must be signed in to change notification settings - Fork 415
feat: add Workflows support #918
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
connyay
wants to merge
29
commits into
cloudflare:main
Choose a base branch
from
connyay:cjh-workflow
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 19 commits
Commits
Show all changes
29 commits
Select commit
Hold shift + click to select a range
45c284d
feat: add Workflows support
connyay 93f3c5a
clippy?
connyay d19624f
backout toolchain bump
connyay 4536c0c
fix: wrap workflow JsFuture calls with SendFuture for axum compatibility
connyay b270455
Merge branch 'main' into cjh-workflow
guybedford 95c8f7e
fix: wrap workflow callback with AssertUnwindSafe for panic-unwind co…
guybedford 23c317e
fix: use JS NonRetryableError, async bindings, and retryable callbacks
connyay 8c2e668
Merge branch 'main' into cjh-workflow
connyay 446dc86
test NonRetryableError
connyay 55ab563
Simplify workflow code: fix __wf_ handler leak, deduplicate test help…
connyay 501c3c8
test: add wait_for_event/send_event workflow integration test
connyay e6a264b
Merge branch 'main' into cjh-workflow
connyay 99b52f9
add test coverage for pause(), resume(), restart(), and terminate()
connyay 92d6778
fix: resolve CI failures for clippy and panic-unwind
guybedford 66d6fb8
Merge branch 'main' into cjh-workflow
guybedford 84b0120
Merge branch 'main' into cjh-workflow
guybedford 44b2bfd
Merge branch 'main' into cjh-workflow
guybedford 7eaee7a
fix: wrap handler! async blocks in SendFuture to satisfy axum Send bound
guybedford 187dad2
Merge branch 'main' into cjh-workflow
guybedford c89bee9
pr comments
connyay 51c2b1d
use WorkflowSleepDuration everywhere
connyay 834db2c
thread through WorkflowStepContext
connyay f2f4926
feat(workflow): support step context and ReadableStream returns
connyay 44360ea
Merge branch 'main' into cjh-workflow
connyay b154754
tiny comment cleanup
connyay 30f2d4e
refactor(workflow): typed Input/Output, fix serializer round-trips
connyay c68b874
cleanup workflow macro rustdoc
connyay 16dba8e
Merge branch 'main' into cjh-workflow
connyay 7a88382
fmt
connyay File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| [package] | ||
| name = "workflow-example" | ||
| version = "0.1.0" | ||
| edition = "2021" | ||
|
|
||
| [package.metadata.release] | ||
| release = false | ||
|
|
||
| [lib] | ||
| crate-type = ["cdylib"] | ||
|
|
||
| [dependencies] | ||
| serde = { version = "1", features = ["derive"] } | ||
| serde_json = "1" | ||
| worker = { path = "../../worker", features = ["workflow"] } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,182 @@ | ||
| use serde::{Deserialize, Serialize}; | ||
| use worker::*; | ||
|
|
||
| #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| pub struct MyParams { | ||
| pub email: String, | ||
| pub name: String, | ||
| } | ||
|
|
||
| #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| pub struct MyOutput { | ||
| pub message: String, | ||
| pub steps_completed: u32, | ||
| } | ||
|
|
||
| #[workflow] | ||
| pub struct MyWorkflow { | ||
| #[allow(dead_code)] | ||
| env: Env, | ||
| } | ||
|
|
||
| impl WorkflowEntrypoint for MyWorkflow { | ||
| fn new(_ctx: Context, env: Env) -> Self { | ||
| Self { env } | ||
| } | ||
|
|
||
| async fn run( | ||
| &self, | ||
| event: WorkflowEvent<serde_json::Value>, | ||
| step: WorkflowStep, | ||
| ) -> Result<serde_json::Value> { | ||
| console_log!("Workflow started with instance ID: {}", event.instance_id); | ||
|
|
||
| let params: MyParams = | ||
| serde_json::from_value(event.payload).map_err(|e| Error::RustError(e.to_string()))?; | ||
|
|
||
| let email_for_validation = params.email.clone(); | ||
| step.do_with_config( | ||
| "validate-params", | ||
| StepConfig { | ||
| retries: Some(RetryConfig { | ||
| limit: 3, | ||
| delay: "1 second".to_string(), | ||
| backoff: None, | ||
| }), | ||
| timeout: None, | ||
| }, | ||
| move || { | ||
| let email = email_for_validation.clone(); | ||
| async move { | ||
| if !email.contains('@') { | ||
| return Err(NonRetryableError::new("invalid email address").into()); | ||
| } | ||
| Ok(serde_json::json!({ "valid": true })) | ||
| } | ||
| }, | ||
| ) | ||
| .await?; | ||
|
|
||
| let name_for_step1 = params.name.clone(); | ||
| let step1_result = step | ||
| .do_("initial-processing", move || { | ||
| let name = name_for_step1.clone(); | ||
| async move { | ||
| console_log!("Processing for user: {}", name); | ||
| Ok(serde_json::json!({ | ||
| "processed": true, | ||
| "user": name | ||
| })) | ||
| } | ||
| }) | ||
| .await?; | ||
|
|
||
| console_log!("Step 1 completed: {:?}", step1_result); | ||
|
|
||
| console_log!("Step 2: Sleeping for 10 seconds..."); | ||
| step.sleep("wait-for-processing", "10 seconds").await?; | ||
|
|
||
| let email_for_step3 = params.email.clone(); | ||
| let notification_result = step | ||
| .do_with_config( | ||
| "send-notification", | ||
| StepConfig { | ||
| retries: Some(RetryConfig { | ||
| limit: 3, | ||
| delay: "5 seconds".to_string(), | ||
| backoff: Some(Backoff::Exponential), | ||
| }), | ||
| timeout: Some("1 minute".to_string()), | ||
| }, | ||
| move || { | ||
| let email = email_for_step3.clone(); | ||
| async move { | ||
| console_log!("Sending notification to: {}", email); | ||
| if js_sys::Math::random() < 0.5 { | ||
| return Err("notification service temporarily unavailable".into()); | ||
| } | ||
| Ok(serde_json::json!({ | ||
| "notification_sent": true, | ||
| "email": email | ||
| })) | ||
| } | ||
| }, | ||
| ) | ||
| .await?; | ||
|
|
||
| console_log!("Step 3 completed: {:?}", notification_result); | ||
|
|
||
| let output = MyOutput { | ||
| message: format!("Workflow completed for {}", params.name), | ||
| steps_completed: 3, | ||
| }; | ||
|
|
||
| Ok(serde_json::to_value(output).unwrap()) | ||
| } | ||
| } | ||
|
|
||
| #[event(fetch)] | ||
| async fn fetch(mut req: Request, env: Env, _ctx: Context) -> Result<Response> { | ||
| let url = req.url()?; | ||
| let path = url.path(); | ||
| let workflow = env.workflow("MY_WORKFLOW")?; | ||
|
|
||
| match (req.method(), path) { | ||
| (Method::Post, "/workflow") => { | ||
| let params: MyParams = req.json().await?; | ||
|
|
||
| let instance = workflow | ||
| .create(Some(CreateOptions { | ||
| params: Some(params), | ||
| ..Default::default() | ||
| })) | ||
| .await?; | ||
|
|
||
| Response::from_json(&serde_json::json!({ | ||
| "id": instance.id()?, | ||
| "message": "Workflow created" | ||
| })) | ||
| } | ||
|
|
||
| (Method::Get, path) if path.starts_with("/workflow/") => { | ||
| let id = path.trim_start_matches("/workflow/"); | ||
| let instance = workflow.get(id).await?; | ||
| let status = instance.status().await?; | ||
|
|
||
| Response::from_json(&serde_json::json!({ | ||
| "id": instance.id()?, | ||
| "status": format!("{:?}", status.status), | ||
| "error": status.error, | ||
| "output": status.output | ||
| })) | ||
| } | ||
|
|
||
| (Method::Post, path) if path.starts_with("/workflow/") && path.ends_with("/pause") => { | ||
| let id = path | ||
| .trim_start_matches("/workflow/") | ||
| .trim_end_matches("/pause"); | ||
| let instance = workflow.get(id).await?; | ||
| instance.pause().await?; | ||
|
|
||
| Response::from_json(&serde_json::json!({ | ||
| "id": instance.id()?, | ||
| "message": "Workflow paused" | ||
| })) | ||
| } | ||
|
|
||
| (Method::Post, path) if path.starts_with("/workflow/") && path.ends_with("/resume") => { | ||
| let id = path | ||
| .trim_start_matches("/workflow/") | ||
| .trim_end_matches("/resume"); | ||
| let instance = workflow.get(id).await?; | ||
| instance.resume().await?; | ||
|
|
||
| Response::from_json(&serde_json::json!({ | ||
| "id": instance.id()?, | ||
| "message": "Workflow resumed" | ||
| })) | ||
| } | ||
|
|
||
| _ => Response::error("Not Found", 404), | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| name = "workflow-example" | ||
| main = "build/worker/shim.mjs" | ||
| compatibility_date = "2024-10-22" | ||
|
|
||
| [build] | ||
| # For development: use local worker-build binary | ||
| # For production: command = "cargo install -q worker-build && worker-build --release" | ||
| command = "RUSTFLAGS='--cfg=web_sys_unstable_apis' ../../target/release/worker-build --release" | ||
|
|
||
| [[workflows]] | ||
| name = "my-workflow" | ||
| binding = "MY_WORKFLOW" | ||
| class_name = "MyWorkflow" |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we come up with a better typing solution here? It is quite restrictive to rely so heavily on serde. Can we instead make this generic with the bound being something something as simple as IntoWasmAbi?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does JsValue work here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What I mean specifically is making this a generic argument
Tso thatrun<T>can take any type with the IntoWasmAbi trait from wasm bindgen?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually with the Wasm Bindgen generics you could just make
T: JsGenericon the newJsGenerictrait.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
JsGenericis a bit awkward to work with here.JsCastis slightly less awkward:thoughts/opinions? or am I missing something/doing something wrong with JsGeneric?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I likely need to properly follow the API design details here, I will aim to do a thorough review next week after this current release. Sorry for the delay and thanks for the patience as always!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All good! Thank you!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dog fooded this branch a bit more in another project and 30f2d4e is feeling pretty dang ergonomic to me. I know this doubles down on serde, when you were trying to not rely so heavily on it :-/
I'm still happy to try other directions.