From 362a06e15c4521f5cae2666cff0fef92ae3812d4 Mon Sep 17 00:00:00 2001 From: Emmanuel Keller Date: Fri, 19 Jun 2026 11:52:01 -0300 Subject: [PATCH 1/6] Add live query kill() and expose the live-query UUID up front MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `selectLive` now starts the live query via the public `LIVE SELECT` query path instead of the `.select(table).live()` builder (whose query id is `pub(crate)`). This lets the driver capture the live-query UUID eagerly from the statement result and expose it via the new `LiveStream.getQueryId()` — available before any notification arrives. Subscription errors (e.g. unknown table) are still surfaced eagerly. Add `Surreal.kill(String)` / `Surreal.kill(UUID)` to terminate a live query by id via a validated `KILL` statement, and enable the previously `@Disabled` `killLiveQuery_byQueryId` test plus new embedded and WebSocket coverage. Note: `kill()` terminates the query (notifications stop) but does not close a client `LiveStream` on either engine — the KILL `Killed` notification carries `session: None` and is dropped by the routers — so `LiveStream.close()` is what releases a local stream. Documented in the `kill` javadoc and asserted by the tests. --- src/main/java/com/surrealdb/LiveStream.java | 24 ++- src/main/java/com/surrealdb/Surreal.java | 39 +++- src/main/rust/surreal.rs | 168 +++++++++++------- .../java/com/surrealdb/LiveQueryTests.java | 84 ++++++++- .../surrealdb/LiveQueryWebSocketTests.java | 53 ++++++ 5 files changed, 295 insertions(+), 73 deletions(-) diff --git a/src/main/java/com/surrealdb/LiveStream.java b/src/main/java/com/surrealdb/LiveStream.java index 534582b2..2b84c84d 100644 --- a/src/main/java/com/surrealdb/LiveStream.java +++ b/src/main/java/com/surrealdb/LiveStream.java @@ -39,8 +39,15 @@ public class LiveStream implements AutoCloseable { */ private volatile long handle; - LiveStream(long handle) { + /** + * The live query UUID. Available immediately (before the first notification) + * and unaffected by {@link #close()}. + */ + private final String queryId; + + LiveStream(long handle, String queryId) { this.handle = handle; + this.queryId = queryId; } /** @@ -80,6 +87,21 @@ public void close() { } } + /** + * Returns the UUID of this live query. + * + *

+ * The id is available immediately after {@link Surreal#selectLive(String)} + * returns — before any notification arrives — and can be passed to + * {@link Surreal#kill(String)} to stop the live query. It is also carried by + * every {@link LiveNotification} (see {@link LiveNotification#getQueryId()}). + * + * @return the live query UUID as a string + */ + public String getQueryId() { + return queryId; + } + private static native LiveNotification nextNative(long handle); private static native void releaseNative(long handle); diff --git a/src/main/java/com/surrealdb/Surreal.java b/src/main/java/com/surrealdb/Surreal.java index 615288c3..a65730c0 100644 --- a/src/main/java/com/surrealdb/Surreal.java +++ b/src/main/java/com/surrealdb/Surreal.java @@ -147,7 +147,9 @@ private static native long upsertRecordIdRangeValue(long ptr, String table, long private static native boolean importSql(long ptr, String path); - private static native long selectLive(long ptr, String table); + private static native LiveStream selectLive(long ptr, String table); + + private static native void kill(long ptr, String queryId); @Override final String toString(long ptr) { @@ -284,7 +286,40 @@ public boolean importSql(String path) { * the subscription fails */ public LiveStream selectLive(String table) { - return new LiveStream(selectLive(getPtr(), table)); + return selectLive(getPtr(), table); + } + + /** + * Stops the live query with the given id. After the server processes the kill, + * the query delivers no further notifications. + * + *

+ * The id is the UUID returned by {@link LiveStream#getQueryId()} (or carried by + * each {@link LiveNotification}). A kill does not by itself close a local + * {@link LiveStream}: a thread blocked in {@link LiveStream#next()} keeps + * waiting (no further notifications arrive, on both the embedded and WebSocket + * engines). Call {@link LiveStream#close()} to release a local stream and + * unblock {@code next()}. Use {@code kill} for a live query you only hold the + * id of (e.g. one started elsewhere or read from a notification). + * + * @param queryId + * the live query UUID to terminate + * @throws SurrealException + * if the id is not a valid UUID or the kill fails + */ + public void kill(String queryId) { + kill(getPtr(), queryId); + } + + /** + * Stops the live query with the given id. + * + * @param queryId + * the live query UUID to terminate + * @see #kill(String) + */ + public void kill(java.util.UUID queryId) { + kill(getPtr(), queryId.toString()); } /** diff --git a/src/main/rust/surreal.rs b/src/main/rust/surreal.rs index 451770eb..3d12400c 100644 --- a/src/main/rust/surreal.rs +++ b/src/main/rust/surreal.rs @@ -1,6 +1,7 @@ use std::collections::BTreeMap; use std::path::Path; use std::ptr::null_mut; +use std::str::FromStr; use std::sync::Arc; use crate::error::SurrealError; @@ -22,7 +23,7 @@ use std::ops::Bound; use std::result::Result as StdResult; use surrealdb::engine::any::Any; use surrealdb::opt::auth::{Database, Namespace, Record as AuthRecord, Root}; -use surrealdb::types::{RecordId, RecordIdKey, RecordIdKeyRange, SurrealValue, ToSql, Value}; +use surrealdb::types::{RecordId, RecordIdKey, RecordIdKeyRange, SurrealValue, ToSql, Uuid, Value}; use surrealdb::{IndexedResults, Result, Surreal}; #[no_mangle] @@ -514,74 +515,89 @@ pub extern "system" fn Java_com_surrealdb_Surreal_run<'local>( /// JNI implementation of `Surreal.selectLive(long ptr, String table)`. /// -/// Sets up a live query on `table` and returns a `LiveStreamChannel` handle -/// that the Java `LiveStream` wrapper reads from. +/// Starts a live query on `table` and returns a fully-constructed Java +/// `LiveStream` object carrying both the native `LiveStreamChannel` handle and +/// the live-query UUID. /// /// ## Architecture /// /// ```text -/// Java thread Background thread SurrealDB engine -/// ─────────── ───────────────── ──────────────── +/// Java thread Background thread SurrealDB engine +/// ─────────── ───────────────── ──────────────── /// selectLive() -/// ├─ spawn ──────▶ .select(table).live().await ──▶ subscribe -/// │ │ -/// │ ready_rx.recv() │ ready_tx.send(Ok/Err) -/// │◀──────────────────┤ -/// │ │ -/// ▼ ▼ -/// return handle loop { select! { notification ──▶ tx_thread.send() } } -/// │ -/// nextNative() │ -/// rx.recv() ◀─────────────────┘ +/// ├─ query("LIVE SELECT …").await ──────────────────────────▶ subscribe + UUID +/// ├─ res.take(0) ──▶ live-query UUID (surfaces errors eagerly) +/// ├─ res.stream(0) ──▶ QueryStream +/// ├─ spawn ─────────────────────────────▶ loop { select! { notif ─▶ tx_thread.send() } } +/// ▼ │ +/// new LiveStream(handle, uuid) │ +/// nextNative() │ +/// rx.recv() ◀──────────────────────────────────────┘ /// ``` /// -/// A dedicated OS thread runs `block_on` on the shared tokio runtime to drive -/// the live-query stream. Notifications are forwarded through an unbounded -/// `async_channel` to the Java side, which reads them via `nextNative`. -/// -/// ## Readiness handshake -/// -/// The background thread signals via a `std::sync::mpsc` channel once the -/// subscription is established (`Ok(())`) or has failed (`Err(e)`). -/// `selectLive` blocks on this signal so that errors (e.g. table does not -/// exist) are thrown at call site instead of being deferred to `next()`. +/// Unlike the `.select(table).live()` builder (whose query id is private), the +/// raw `LIVE SELECT` runs synchronously on the calling thread, so subscription +/// errors (e.g. the table does not exist) are surfaced eagerly via `take(0)` +/// rather than deferred to `next()`. The statement returns a `Value::Uuid`, +/// which we read up front so it is available from `LiveStream.getQueryId()` +/// before any notification arrives. A dedicated OS thread then drives the +/// notification stream on the shared tokio runtime, forwarding notifications +/// through an unbounded `async_channel` that the Java side reads via +/// `nextNative`. `take(0)` and `stream(0)` read independent maps, so reading +/// the UUID does not consume the stream. #[no_mangle] pub extern "system" fn Java_com_surrealdb_Surreal_selectLive<'local>( mut env: EnvUnowned<'local>, _class: JClass<'local>, ptr: jlong, table: JString<'local>, -) -> jlong { +) -> jobject { with_env_body!(env, env, { - let surreal = get_surreal_ref!(env, ptr, || 0); - let table = get_rust_string!(env, &table, || 0); + let surreal = get_surreal_ref!(env, ptr, null_mut); + let table = get_rust_string!(env, &table, null_mut); + + // Run the LIVE SELECT synchronously on the calling thread. type::table($tb) + // binds the table name safely (no SQL injection). + let mut params = BTreeMap::new(); + params.insert("tb".to_string(), Value::String(table)); + let res = + surrealdb_query::(surreal, "LIVE SELECT * FROM type::table($tb)", Some(params)); + let mut res = check_query_result!(env, res, null_mut); + + // The statement result at index 0 is the live-query UUID. Reading it via + // take(0) also surfaces subscription errors (e.g. the table does not exist). + let uuid_str = match res.take::(0) { + Ok(Value::Uuid(uuid)) => uuid.to_string(), + Ok(other) => { + return SurrealError::SurrealDBJni(format!( + "LIVE SELECT did not return a UUID: {}", + other.to_sql() + )) + .exception(env, null_mut); + } + Err(e) => return SurrealError::SurrealDB(e).exception(env, null_mut), + }; + + // The notification stream lives in a separate map from the results, so the + // take(0) above does not disturb it. + let qstream = match res.stream::(0) { + Ok(s) => s, + Err(e) => return SurrealError::SurrealDB(e).exception(env, null_mut), + }; // Notification channel: background thread produces, nextNative consumes. let (tx, rx) = async_channel::unbounded(); // Shutdown channel: dropping shutdown_tx signals the background thread to exit. let (shutdown_tx, shutdown_rx) = async_channel::bounded::<()>(1); - // Readiness channel: background thread confirms subscription before we return. - let (ready_tx, ready_rx) = - std::sync::mpsc::channel::>(); let tx_thread = tx.clone(); - let surreal_clone = surreal.clone(); let join_handle = std::thread::spawn(move || { TOKIO_RUNTIME.block_on(async move { - let mut stream = match surreal_clone.select(table).live().await { - Ok(s) => { - let _ = ready_tx.send(Ok(())); - s - } - Err(e) => { - let _ = ready_tx.send(Err(e)); - return; - } - }; + let mut qstream = qstream; loop { tokio::select! { _ = shutdown_rx.recv() => break, - item = stream.next() => match item { + item = qstream.next() => match item { Some(i) => { let _ = tx_thread.send(i).await; } @@ -592,29 +608,59 @@ pub extern "system" fn Java_com_surrealdb_Surreal_selectLive<'local>( }); }); - // Block until the subscription is confirmed or fails. - match ready_rx.recv() { - Ok(Ok(())) => {} - Ok(Err(e)) => { - let _ = join_handle.join(); - return SurrealError::from(e).exception(env, || 0); - } - Err(_) => { - let _ = join_handle.join(); - return SurrealError::SurrealDBJni( - "Live query background thread exited unexpectedly".to_string(), - ) - .exception(env, || 0); - } - } - let recv_mutex = std::sync::Arc::new(parking_lot::Mutex::new(())); - JniTypes::new_live_stream(( + let handle = JniTypes::new_live_stream(( recv_mutex, parking_lot::Mutex::new(Some(join_handle)), parking_lot::Mutex::new(Some(shutdown_tx)), parking_lot::Mutex::new(Some(rx)), - )) + )); + + // Construct and return a LiveStream(handle, queryId), mirroring how live.rs + // builds a LiveNotification. + let uuid_raw = new_string!(env, uuid_str, null_mut); + let uuid_jstr = unsafe { JObject::from_raw(env, uuid_raw) }; + let class = match env.find_class(jni_str!("com/surrealdb/LiveStream")) { + Ok(c) => c, + Err(e) => return SurrealError::from(e).exception(env, null_mut), + }; + let args = [JValue::Long(handle), JValue::Object(&uuid_jstr)]; + match env.new_object(class, jni_sig!("(JLjava/lang/String;)V"), &args) { + Ok(obj) => obj.into_raw(), + Err(e) => SurrealError::from(e).exception(env, null_mut), + } + }) +} + +/// JNI implementation of `Surreal.kill(long ptr, String queryId)`. +/// +/// Terminates the live query identified by `queryId` (a UUID string, e.g. from +/// `LiveStream.getQueryId()` or `LiveNotification.getQueryId()`) by running a +/// `KILL` statement on the same connection. After the server processes the +/// kill, the associated `LiveStream` ends and `LiveStream.next()` returns +/// empty. An invalid UUID or a connection failure is surfaced as a +/// `SurrealException`. +#[no_mangle] +pub extern "system" fn Java_com_surrealdb_Surreal_kill<'local>( + mut env: EnvUnowned<'local>, + _class: JClass<'local>, + ptr: jlong, + query_id: JString<'local>, +) { + with_env_body!(env, env, { + let surreal = get_surreal_ref!(env, ptr, || ()); + let query_id = get_rust_string!(env, &query_id, || ()); + let uuid = match Uuid::from_str(&query_id) { + Ok(uuid) => uuid, + Err(_) => { + return SurrealError::SurrealDBJni(format!("Invalid live query id: {query_id}")) + .exception(env, || ()); + } + }; + let mut params = BTreeMap::new(); + params.insert("id".to_string(), Value::Uuid(uuid)); + let res = surrealdb_query::(surreal, "KILL $id", Some(params)); + let _ = check_query_result!(env, res, || ()); }) } diff --git a/src/test/java/com/surrealdb/LiveQueryTests.java b/src/test/java/com/surrealdb/LiveQueryTests.java index 2a84db0b..2d54999f 100644 --- a/src/test/java/com/surrealdb/LiveQueryTests.java +++ b/src/test/java/com/surrealdb/LiveQueryTests.java @@ -5,6 +5,7 @@ import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -13,7 +14,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import com.surrealdb.pojos.Person; @@ -279,15 +279,81 @@ void selectLiveOnNonExistentTableThrowsImmediately() { } /** - * Placeholder for future Surreal.kill(liveQueryId) support. The query ID is - * available from {@link LiveNotification#getQueryId()}, but the Java client - * does not yet expose kill(). Use {@link LiveStream#close()} to stop a live - * query. + * The live query UUID is available from {@link LiveStream#getQueryId()} + * immediately, before any notification arrives, and is a valid UUID. */ @Test - @Disabled("Surreal.kill(liveQueryId) not yet in Java API") - void killLiveQuery_byQueryId() { - // When kill(uuid) is added: start live query, get queryId from first - // notification or API, call surreal.kill(queryId), assert stream ends. + void selectLiveExposesQueryIdUpFront() { + try (Surreal surreal = new Surreal()) { + surreal.connect("memory").useNs("test").useDb("test"); + surreal.query("DEFINE TABLE person SCHEMALESS"); + try (LiveStream stream = surreal.selectLive("person")) { + String queryId = stream.getQueryId(); + assertNotNull(queryId, "Live query id should be available before any notification"); + // Round-trips through java.util.UUID, i.e. it is a canonical UUID string. + assertEquals(queryId, java.util.UUID.fromString(queryId).toString()); + } + } + } + + /** + * {@link Surreal#kill(String)} terminates a live query by its id, after which + * the query delivers no further notifications. + * + *

+ * The kill does not push a "killed" event to an existing client stream (on + * either the embedded or WebSocket engine), so {@link LiveStream#next()} keeps + * blocking rather than returning empty; use {@link LiveStream#close()} to + * release a local stream. Here we assert the contract that matters: no + * notifications are delivered after the kill. + */ + @Test + void killLiveQuery_byQueryId() throws Exception { + AtomicReference error = new AtomicReference<>(); + AtomicInteger count = new AtomicInteger(); + CountDownLatch consuming = new CountDownLatch(1); + + try (Surreal surreal = new Surreal()) { + surreal.connect("memory").useNs("test").useDb("test"); + surreal.query("DEFINE TABLE person SCHEMALESS"); + + try (LiveStream stream = surreal.selectLive("person")) { + String queryId = stream.getQueryId(); + assertNotNull(queryId, "Live query id should be available up front"); + + Thread consumer = new Thread(() -> { + try { + consuming.countDown(); + Optional n; + while ((n = stream.next()).isPresent()) { + count.incrementAndGet(); + } + } catch (Throwable t) { + error.set(t); + } + }); + consumer.setDaemon(true); + consumer.start(); + + assertTrue(consuming.await(2, TimeUnit.SECONDS), "Consumer thread did not start in time"); + Thread.sleep(500); + + // A create before the kill is delivered. + surreal.create(new RecordId("person", 1), Helpers.tobie); + Thread.sleep(500); + assertEquals(1, count.get(), "Notification before kill should be delivered"); + + // After the kill, a further create must NOT be delivered. + surreal.kill(queryId); + Thread.sleep(300); + surreal.create(new RecordId("person", 2), Helpers.jaime); + Thread.sleep(800); + assertEquals(1, count.get(), "No notifications should be delivered after kill"); + } + + if (error.get() != null) { + fail("next() threw an exception: " + error.get()); + } + } } } diff --git a/src/test/java/com/surrealdb/LiveQueryWebSocketTests.java b/src/test/java/com/surrealdb/LiveQueryWebSocketTests.java index 5e6b18a9..a60a8e73 100644 --- a/src/test/java/com/surrealdb/LiveQueryWebSocketTests.java +++ b/src/test/java/com/surrealdb/LiveQueryWebSocketTests.java @@ -15,6 +15,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assumptions.assumeTrue; import org.junit.jupiter.api.Test; @@ -228,4 +229,56 @@ void selectLive_overWebSocket_close_unblocksNext() throws Exception { assertFalse(got.get().isPresent(), "next() should return empty after close()"); } } + + /** + * {@link Surreal#kill(String)} over a WebSocket connection terminates the live + * query: a create before the kill is delivered, one after is not. (As on the + * embedded engine, the kill does not close the client stream; {@code close()} + * does that.) + */ + @Test + void selectLive_overWebSocket_kill_stopsNotifications() throws Exception { + Surreal surreal = tryConnectWs(); + assumeTrue(surreal != null, "SurrealDB not reachable at " + WS_URL); + java.util.concurrent.atomic.AtomicInteger count = new java.util.concurrent.atomic.AtomicInteger(); + AtomicReference err = new AtomicReference<>(); + CountDownLatch started = new CountDownLatch(1); + try (Surreal s = surreal) { + s.query("REMOVE TABLE IF EXISTS ws_person; DEFINE TABLE ws_person SCHEMALESS"); + try (LiveStream stream = s.selectLive("ws_person")) { + String queryId = stream.getQueryId(); + assertNotNull(queryId, "Live query id should be available up front"); + Thread consumer = new Thread(() -> { + try { + started.countDown(); + Optional n; + while ((n = stream.next()).isPresent()) { + count.incrementAndGet(); + } + } catch (Throwable t) { + err.set(t); + } + }); + consumer.setDaemon(true); + consumer.start(); + started.await(2, TimeUnit.SECONDS); + Thread.sleep(300); + + // A create before the kill is delivered. + s.create(new RecordId("ws_person", 1), Helpers.tobie); + Thread.sleep(700); + assertEquals(1, count.get(), "Notification before kill should be delivered over WS"); + + // After the kill, a further create must NOT be delivered. + s.kill(queryId); + Thread.sleep(300); + s.create(new RecordId("ws_person", 2), Helpers.jaime); + Thread.sleep(900); + assertEquals(1, count.get(), "No notifications should be delivered after kill over WS"); + } + if (err.get() != null) { + fail("next() threw an exception: " + err.get()); + } + } + } } From 54c29e067dda0bcb96457e204584af98f3e5cb92 Mon Sep 17 00:00:00 2001 From: Emmanuel Keller Date: Fri, 19 Jun 2026 12:02:58 -0300 Subject: [PATCH 2/6] Surface per-statement KILL failures in kill() kill() previously only checked the outer query result and discarded the response, so a server-rejected KILL (e.g. an unknown or not-owned live query, which the KILL statement raises as a per-statement error stored at index 0) was swallowed and kill() returned normally. Take/check the index-0 result via take_one_result! so the failure surfaces as a SurrealException. --- src/main/rust/surreal.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/rust/surreal.rs b/src/main/rust/surreal.rs index 3d12400c..3fb5dee1 100644 --- a/src/main/rust/surreal.rs +++ b/src/main/rust/surreal.rs @@ -660,7 +660,10 @@ pub extern "system" fn Java_com_surrealdb_Surreal_kill<'local>( let mut params = BTreeMap::new(); params.insert("id".to_string(), Value::Uuid(uuid)); let res = surrealdb_query::(surreal, "KILL $id", Some(params)); - let _ = check_query_result!(env, res, || ()); + let mut res = check_query_result!(env, res, || ()); + // Surface per-statement KILL failures (e.g. unknown or not-owned live query), + // which are stored at index 0 even when the outer query result is Ok. + let _ = take_one_result!(env, res, || ()); }) } From a03b2cc1ddae580926bddbfc77d7260536d7c38c Mon Sep 17 00:00:00 2001 From: Emmanuel Keller Date: Fri, 19 Jun 2026 12:22:20 -0300 Subject: [PATCH 3/6] Document kill() as best-effort; the SDK discards the KILL result The surrealdb client SDK builds IndexedResults with `QueryType::Kill => {}`, discarding the KILL statement result (including any per-statement error), and its typed kill is `pub(crate)`. There is therefore no public path to observe a server-side KILL rejection (e.g. a live query owned by another session): take(0) on the KILL response only ever yields Value::None. Revert the ineffective take_one_result check added in 54c29e0 (keep check_query_result! for transport/connection errors) and document kill() as best-effort in both the javadoc and the code. --- src/main/java/com/surrealdb/Surreal.java | 7 ++++++- src/main/rust/surreal.rs | 10 ++++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/surrealdb/Surreal.java b/src/main/java/com/surrealdb/Surreal.java index a65730c0..187f26f5 100644 --- a/src/main/java/com/surrealdb/Surreal.java +++ b/src/main/java/com/surrealdb/Surreal.java @@ -302,10 +302,15 @@ public LiveStream selectLive(String table) { * unblock {@code next()}. Use {@code kill} for a live query you only hold the * id of (e.g. one started elsewhere or read from a notification). * + *

+ * This is best-effort: the SurrealDB client does not report a server-side KILL + * rejection (e.g. a live query owned by another session), so only an invalid + * UUID or a connection failure raises. + * * @param queryId * the live query UUID to terminate * @throws SurrealException - * if the id is not a valid UUID or the kill fails + * if the id is not a valid UUID or the request cannot be sent */ public void kill(String queryId) { kill(getPtr(), queryId); diff --git a/src/main/rust/surreal.rs b/src/main/rust/surreal.rs index 3fb5dee1..b485aac7 100644 --- a/src/main/rust/surreal.rs +++ b/src/main/rust/surreal.rs @@ -659,11 +659,13 @@ pub extern "system" fn Java_com_surrealdb_Surreal_kill<'local>( }; let mut params = BTreeMap::new(); params.insert("id".to_string(), Value::Uuid(uuid)); + // Best-effort: the surrealdb client SDK discards the KILL statement result + // (`QueryType::Kill => {}` when building IndexedResults) and its typed kill is + // `pub(crate)`, so a server-side KILL rejection (e.g. a live query owned by + // another session) is not observable from the client. Only transport/connection + // errors surface here, via check_query_result!. let res = surrealdb_query::(surreal, "KILL $id", Some(params)); - let mut res = check_query_result!(env, res, || ()); - // Surface per-statement KILL failures (e.g. unknown or not-owned live query), - // which are stored at index 0 even when the outer query result is Ok. - let _ = take_one_result!(env, res, || ()); + let _ = check_query_result!(env, res, || ()); }) } From 2d8be84a05539f246736fbacbfff91ed79359e21 Mon Sep 17 00:00:00 2001 From: Emmanuel Keller Date: Fri, 19 Jun 2026 12:37:01 -0300 Subject: [PATCH 4/6] selectLive: accept array-wrapped live-query UUID (builder parity) The `.select(table).live()` builder unwraps a one-element `Value::Array([Value::Uuid])` result shape (method/live.rs). Mirror that in the raw-query path's UUID extraction so selectLive() preserves the builder's result-shape handling across server/protocol versions. With surrealdb 3.1.4 the `QueryType::Live` path normalizes the id to a bare `Value::Uuid` via `value.into_uuid()` before storing it in IndexedResults, so `take(0)` yields a scalar there (covered by the existing tests); the array arm is defensive parity for any path/version that stores the raw shape. --- src/main/rust/surreal.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/main/rust/surreal.rs b/src/main/rust/surreal.rs index b485aac7..6cbceb9e 100644 --- a/src/main/rust/surreal.rs +++ b/src/main/rust/surreal.rs @@ -568,6 +568,17 @@ pub extern "system" fn Java_com_surrealdb_Surreal_selectLive<'local>( // take(0) also surfaces subscription errors (e.g. the table does not exist). let uuid_str = match res.take::(0) { Ok(Value::Uuid(uuid)) => uuid.to_string(), + // Some servers/protocols return the live-query id wrapped in a one-element + // array; unwrap it, mirroring the SDK's `.select(table).live()` builder. + Ok(Value::Array(mut arr)) if arr.len() == 1 => match arr.pop() { + Some(Value::Uuid(uuid)) => uuid.to_string(), + other => { + return SurrealError::SurrealDBJni(format!( + "LIVE SELECT did not return a UUID: {other:?}" + )) + .exception(env, null_mut); + } + }, Ok(other) => { return SurrealError::SurrealDBJni(format!( "LIVE SELECT did not return a UUID: {}", From 515aafaebde82dde6406b0f284096f5626981fe2 Mon Sep 17 00:00:00 2001 From: Emmanuel Keller Date: Fri, 19 Jun 2026 14:39:01 -0300 Subject: [PATCH 5/6] Update README for live queries; drop obsolete planned items Add a Features bullet documenting live queries (selectLive + kill). Remove the two Planned Features entries: "Killing live queries by ID" (implemented in this PR) and "Futures" (no longer a SurrealDB feature). --- README.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/README.md b/README.md index ee2f5d08..7426aae5 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,7 @@ View the SDK documentation [here](https://surrealdb.com/docs/integration/librari - Support of remote connection to SurrealDB. - Mutable POJOs (Java 8+) and immutable `record` classes (JDK 16+) for `create` / `select`. - All geometry types (Point, LineString, Polygon, their multi-variants, and GeometryCollection) for reading and writing. +- Live queries: subscribe to table changes with `selectLive` and terminate them by id with `kill`. - Supported on JAVA JDK 8, 11, 17, 21, 25. - Supported architectures: - Linux (ARM) aarch64 @@ -253,7 +254,4 @@ cargo build ### Planned Features -- Futures -- Killing live queries by ID - [Open an issue for feature requests](https://github.com/surrealdb/surrealdb.java/issues) From a913ba81380f0a69c29763efb78efe4134dc131f Mon Sep 17 00:00:00 2001 From: Emmanuel Keller Date: Fri, 19 Jun 2026 14:41:47 -0300 Subject: [PATCH 6/6] Remove empty Planned Features section from README Both items were removed (one shipped, one no longer a SurrealDB feature), leaving the section empty; drop the heading and the lone feature-request link. --- README.md | 4 ---- 1 file changed, 4 deletions(-) diff --git a/README.md b/README.md index 7426aae5..c171d0cc 100644 --- a/README.md +++ b/README.md @@ -251,7 +251,3 @@ On Windows: cargo build ./gradlew.bat -i test ``` - -### Planned Features - -[Open an issue for feature requests](https://github.com/surrealdb/surrealdb.java/issues)