Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ View the SDK documentation [here](https://surrealdb.com/docs/integration/librari
- Support of remote connection to SurrealDB.
- Mutable POJOs (Java 8+) and immutable `record` classes (JDK 16+) for `create` / `select`.
- All geometry types (Point, LineString, Polygon, their multi-variants, and GeometryCollection) for reading and writing.
- Live queries: subscribe to table changes with `selectLive` and terminate them by id with `kill`.
- Supported on JAVA JDK 8, 11, 17, 21, 25.
- Supported architectures:
- Linux (ARM) aarch64
Expand Down Expand Up @@ -250,10 +251,3 @@ On Windows:
cargo build
./gradlew.bat -i test
```

### Planned Features

- Futures
- Killing live queries by ID

[Open an issue for feature requests](https://github.com/surrealdb/surrealdb.java/issues)
24 changes: 23 additions & 1 deletion src/main/java/com/surrealdb/LiveStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,15 @@ public class LiveStream implements AutoCloseable {
*/
private volatile long handle;

LiveStream(long handle) {
/**
* The live query UUID. Available immediately (before the first notification)
* and unaffected by {@link #close()}.
*/
private final String queryId;

LiveStream(long handle, String queryId) {
this.handle = handle;
this.queryId = queryId;
}

/**
Expand Down Expand Up @@ -80,6 +87,21 @@ public void close() {
}
}

/**
* Returns the UUID of this live query.
*
* <p>
* The id is available immediately after {@link Surreal#selectLive(String)}
* returns — before any notification arrives — and can be passed to
* {@link Surreal#kill(String)} to stop the live query. It is also carried by
* every {@link LiveNotification} (see {@link LiveNotification#getQueryId()}).
*
* @return the live query UUID as a string
*/
public String getQueryId() {
return queryId;
}

private static native LiveNotification nextNative(long handle);

private static native void releaseNative(long handle);
Expand Down
44 changes: 42 additions & 2 deletions src/main/java/com/surrealdb/Surreal.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ private static native long upsertRecordIdRangeValue(long ptr, String table, long

private static native boolean importSql(long ptr, String path);

private static native long selectLive(long ptr, String table);
private static native LiveStream selectLive(long ptr, String table);

private static native void kill(long ptr, String queryId);

@Override
final String toString(long ptr) {
Expand Down Expand Up @@ -284,7 +286,45 @@ public boolean importSql(String path) {
* the subscription fails
*/
public LiveStream selectLive(String table) {
return new LiveStream(selectLive(getPtr(), table));
return selectLive(getPtr(), table);
}

/**
* Stops the live query with the given id. After the server processes the kill,
* the query delivers no further notifications.
*
* <p>
* The id is the UUID returned by {@link LiveStream#getQueryId()} (or carried by
* each {@link LiveNotification}). A kill does not by itself close a local
* {@link LiveStream}: a thread blocked in {@link LiveStream#next()} keeps
* waiting (no further notifications arrive, on both the embedded and WebSocket
* engines). Call {@link LiveStream#close()} to release a local stream and
* unblock {@code next()}. Use {@code kill} for a live query you only hold the
* id of (e.g. one started elsewhere or read from a notification).
*
* <p>
* This is best-effort: the SurrealDB client does not report a server-side KILL
* rejection (e.g. a live query owned by another session), so only an invalid
* UUID or a connection failure raises.
*
* @param queryId
* the live query UUID to terminate
* @throws SurrealException
* if the id is not a valid UUID or the request cannot be sent
*/
public void kill(String queryId) {
kill(getPtr(), queryId);
}

/**
* Stops the live query with the given id.
*
* @param queryId
* the live query UUID to terminate
* @see #kill(String)
*/
public void kill(java.util.UUID queryId) {
kill(getPtr(), queryId.toString());
}

/**
Expand Down
184 changes: 123 additions & 61 deletions src/main/rust/surreal.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::BTreeMap;
use std::path::Path;
use std::ptr::null_mut;
use std::str::FromStr;
use std::sync::Arc;

use crate::error::SurrealError;
Expand All @@ -22,7 +23,7 @@ use std::ops::Bound;
use std::result::Result as StdResult;
use surrealdb::engine::any::Any;
use surrealdb::opt::auth::{Database, Namespace, Record as AuthRecord, Root};
use surrealdb::types::{RecordId, RecordIdKey, RecordIdKeyRange, SurrealValue, ToSql, Value};
use surrealdb::types::{RecordId, RecordIdKey, RecordIdKeyRange, SurrealValue, ToSql, Uuid, Value};
use surrealdb::{IndexedResults, Result, Surreal};

#[no_mangle]
Expand Down Expand Up @@ -514,74 +515,100 @@ pub extern "system" fn Java_com_surrealdb_Surreal_run<'local>(

/// JNI implementation of `Surreal.selectLive(long ptr, String table)`.
///
/// Sets up a live query on `table` and returns a `LiveStreamChannel` handle
/// that the Java `LiveStream` wrapper reads from.
/// Starts a live query on `table` and returns a fully-constructed Java
/// `LiveStream` object carrying both the native `LiveStreamChannel` handle and
/// the live-query UUID.
///
/// ## Architecture
///
/// ```text
/// Java thread Background thread SurrealDB engine
/// ─────────── ───────────────── ────────────────
/// Java thread Background thread SurrealDB engine
/// ─────────── ───────────────── ────────────────
/// selectLive()
/// ├─ spawn ──────▶ .select(table).live().await ──▶ subscribe
/// │ │
/// │ ready_rx.recv() │ ready_tx.send(Ok/Err)
/// │◀──────────────────┤
/// │ │
/// ▼ ▼
/// return handle loop { select! { notification ──▶ tx_thread.send() } }
/// │
/// nextNative() │
/// rx.recv() ◀─────────────────┘
/// ├─ query("LIVE SELECT …").await ──────────────────────────▶ subscribe + UUID
/// ├─ res.take(0) ──▶ live-query UUID (surfaces errors eagerly)
/// ├─ res.stream(0) ──▶ QueryStream<Value>
/// ├─ spawn ─────────────────────────────▶ loop { select! { notif ─▶ tx_thread.send() } }
/// ▼ │
/// new LiveStream(handle, uuid) │
/// nextNative() │
/// rx.recv() ◀──────────────────────────────────────┘
/// ```
///
/// A dedicated OS thread runs `block_on` on the shared tokio runtime to drive
/// the live-query stream. Notifications are forwarded through an unbounded
/// `async_channel` to the Java side, which reads them via `nextNative`.
///
/// ## Readiness handshake
///
/// The background thread signals via a `std::sync::mpsc` channel once the
/// subscription is established (`Ok(())`) or has failed (`Err(e)`).
/// `selectLive` blocks on this signal so that errors (e.g. table does not
/// exist) are thrown at call site instead of being deferred to `next()`.
/// Unlike the `.select(table).live()` builder (whose query id is private), the
/// raw `LIVE SELECT` runs synchronously on the calling thread, so subscription
/// errors (e.g. the table does not exist) are surfaced eagerly via `take(0)`
/// rather than deferred to `next()`. The statement returns a `Value::Uuid`,
/// which we read up front so it is available from `LiveStream.getQueryId()`
/// before any notification arrives. A dedicated OS thread then drives the
/// notification stream on the shared tokio runtime, forwarding notifications
/// through an unbounded `async_channel` that the Java side reads via
/// `nextNative`. `take(0)` and `stream(0)` read independent maps, so reading
/// the UUID does not consume the stream.
#[no_mangle]
pub extern "system" fn Java_com_surrealdb_Surreal_selectLive<'local>(
mut env: EnvUnowned<'local>,
_class: JClass<'local>,
ptr: jlong,
table: JString<'local>,
) -> jlong {
) -> jobject {
with_env_body!(env, env, {
let surreal = get_surreal_ref!(env, ptr, || 0);
let table = get_rust_string!(env, &table, || 0);
let surreal = get_surreal_ref!(env, ptr, null_mut);
let table = get_rust_string!(env, &table, null_mut);

// Run the LIVE SELECT synchronously on the calling thread. type::table($tb)
// binds the table name safely (no SQL injection).
let mut params = BTreeMap::new();
params.insert("tb".to_string(), Value::String(table));
let res =
surrealdb_query::<Value>(surreal, "LIVE SELECT * FROM type::table($tb)", Some(params));
let mut res = check_query_result!(env, res, null_mut);

// The statement result at index 0 is the live-query UUID. Reading it via
// take(0) also surfaces subscription errors (e.g. the table does not exist).
let uuid_str = match res.take::<Value>(0) {
Ok(Value::Uuid(uuid)) => uuid.to_string(),
Comment thread
emmanuel-keller marked this conversation as resolved.
// Some servers/protocols return the live-query id wrapped in a one-element
// array; unwrap it, mirroring the SDK's `.select(table).live()` builder.
Ok(Value::Array(mut arr)) if arr.len() == 1 => match arr.pop() {
Some(Value::Uuid(uuid)) => uuid.to_string(),
other => {
return SurrealError::SurrealDBJni(format!(
"LIVE SELECT did not return a UUID: {other:?}"
))
.exception(env, null_mut);
}
},
Ok(other) => {
return SurrealError::SurrealDBJni(format!(
"LIVE SELECT did not return a UUID: {}",
other.to_sql()
))
.exception(env, null_mut);
}
Err(e) => return SurrealError::SurrealDB(e).exception(env, null_mut),
};

// The notification stream lives in a separate map from the results, so the
// take(0) above does not disturb it.
let qstream = match res.stream::<Value>(0) {
Ok(s) => s,
Err(e) => return SurrealError::SurrealDB(e).exception(env, null_mut),
};

// Notification channel: background thread produces, nextNative consumes.
let (tx, rx) = async_channel::unbounded();
// Shutdown channel: dropping shutdown_tx signals the background thread to exit.
let (shutdown_tx, shutdown_rx) = async_channel::bounded::<()>(1);
// Readiness channel: background thread confirms subscription before we return.
let (ready_tx, ready_rx) =
std::sync::mpsc::channel::<std::result::Result<(), surrealdb::Error>>();

let tx_thread = tx.clone();
let surreal_clone = surreal.clone();
let join_handle = std::thread::spawn(move || {
TOKIO_RUNTIME.block_on(async move {
let mut stream = match surreal_clone.select(table).live().await {
Ok(s) => {
let _ = ready_tx.send(Ok(()));
s
}
Err(e) => {
let _ = ready_tx.send(Err(e));
return;
}
};
let mut qstream = qstream;
loop {
tokio::select! {
_ = shutdown_rx.recv() => break,
item = stream.next() => match item {
item = qstream.next() => match item {
Some(i) => {
let _ = tx_thread.send(i).await;
Comment thread
emmanuel-keller marked this conversation as resolved.
}
Expand All @@ -592,29 +619,64 @@ pub extern "system" fn Java_com_surrealdb_Surreal_selectLive<'local>(
});
});

// Block until the subscription is confirmed or fails.
match ready_rx.recv() {
Ok(Ok(())) => {}
Ok(Err(e)) => {
let _ = join_handle.join();
return SurrealError::from(e).exception(env, || 0);
}
Err(_) => {
let _ = join_handle.join();
return SurrealError::SurrealDBJni(
"Live query background thread exited unexpectedly".to_string(),
)
.exception(env, || 0);
}
}

let recv_mutex = std::sync::Arc::new(parking_lot::Mutex::new(()));
JniTypes::new_live_stream((
let handle = JniTypes::new_live_stream((
recv_mutex,
parking_lot::Mutex::new(Some(join_handle)),
parking_lot::Mutex::new(Some(shutdown_tx)),
parking_lot::Mutex::new(Some(rx)),
))
));

// Construct and return a LiveStream(handle, queryId), mirroring how live.rs
// builds a LiveNotification.
let uuid_raw = new_string!(env, uuid_str, null_mut);
let uuid_jstr = unsafe { JObject::from_raw(env, uuid_raw) };
let class = match env.find_class(jni_str!("com/surrealdb/LiveStream")) {
Ok(c) => c,
Err(e) => return SurrealError::from(e).exception(env, null_mut),
};
let args = [JValue::Long(handle), JValue::Object(&uuid_jstr)];
match env.new_object(class, jni_sig!("(JLjava/lang/String;)V"), &args) {
Ok(obj) => obj.into_raw(),
Err(e) => SurrealError::from(e).exception(env, null_mut),
}
})
}

/// JNI implementation of `Surreal.kill(long ptr, String queryId)`.
///
/// Terminates the live query identified by `queryId` (a UUID string, e.g. from
/// `LiveStream.getQueryId()` or `LiveNotification.getQueryId()`) by running a
/// `KILL` statement on the same connection. After the server processes the
/// kill, the associated `LiveStream` ends and `LiveStream.next()` returns
/// empty. An invalid UUID or a connection failure is surfaced as a
/// `SurrealException`.
#[no_mangle]
pub extern "system" fn Java_com_surrealdb_Surreal_kill<'local>(
mut env: EnvUnowned<'local>,
_class: JClass<'local>,
ptr: jlong,
query_id: JString<'local>,
) {
with_env_body!(env, env, {
let surreal = get_surreal_ref!(env, ptr, || ());
let query_id = get_rust_string!(env, &query_id, || ());
let uuid = match Uuid::from_str(&query_id) {
Ok(uuid) => uuid,
Err(_) => {
return SurrealError::SurrealDBJni(format!("Invalid live query id: {query_id}"))
.exception(env, || ());
}
};
let mut params = BTreeMap::new();
params.insert("id".to_string(), Value::Uuid(uuid));
// Best-effort: the surrealdb client SDK discards the KILL statement result
// (`QueryType::Kill => {}` when building IndexedResults) and its typed kill is
// `pub(crate)`, so a server-side KILL rejection (e.g. a live query owned by
// another session) is not observable from the client. Only transport/connection
// errors surface here, via check_query_result!.
let res = surrealdb_query::<Value>(surreal, "KILL $id", Some(params));
let _ = check_query_result!(env, res, || ());
})
}

Expand Down
Loading
Loading