Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 217 additions & 0 deletions c/include/arrow-adbc/adbc.h
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,8 @@ typedef void (*AdbcWarningHandler)(const struct AdbcError* warning, void* user_d
/// driver and the driver manager.
/// @{

struct AdbcSerializableHandle;

/// \brief An instance of an initialized database driver.
///
/// This provides a common interface for vendor-specific driver
Expand Down Expand Up @@ -1516,6 +1518,23 @@ struct ADBC_EXPORT AdbcDriver {
AdbcStatusCode (*StatementExecuteMulti)(struct AdbcStatement*,
struct AdbcMultiResultSet*, struct AdbcError*);

AdbcStatusCode (*ConnectionBeginIngestPartitions)(struct AdbcConnection*,
struct ArrowSchema*,
struct AdbcSerializableHandle*,
struct AdbcError*);
AdbcStatusCode (*ConnectionWriteIngestPartition)(struct AdbcConnection*, const uint8_t*,
size_t, struct ArrowArrayStream*,
struct AdbcSerializableHandle*,
struct AdbcError*);
AdbcStatusCode (*ConnectionCompleteIngestPartitions)(struct AdbcConnection*,

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lidavidm I think we need to distinguish between 3 possible groups of status codes of a Complete call:

  • success: ADBC_STATUS_OK
  • retryable: Complete failed, but a client should retry a complete call w/o restaging the data or regenerating receipts. The example is delta/iceberg concurrency conflict error.
  • terminal: something else went wrong. start from scratch.

I couldn't really map retryable status to any existing status code. I'm thinking of adding either ADBC_STATUS_CONFLICT or ADBC_STATUS_RETRY. wdyt?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just an out parameter to indicate whether it was success-or-retryable? A new status code would be a big change

@tokoko tokoko Jun 18, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let me push back a little. out param feels clean for C interface, but replicating the same for language apis will be a patchwork of different solutions, they would have to either go into exceptions or have additional output in the signature depending on the language. btw, what makes a new status code a big change, would something break? are there code paths that rely on exhaustive checks?

Another alternative is to put some additional metadata inside an Error message. That's probably abuse, but also could work.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hear you, but my worry is that a new error code could be in principle returned from any API function, which would break existing users.

If we say this code is only used for this particular API function, then I think it makes more sense for an out parameter than an error code (which has a non-local impact).

Also language bindings could use different strategies, e.g. Rust would return an Result<PartitionedIngestStatus> or something and not an out parameter, and deal with the messiness at the FFI layer. This is already the case for various APIs (e.g. Java treats bulk ingest itself differently).

const uint8_t*, size_t, size_t,
const uint8_t**, const size_t*,
int64_t*, struct AdbcError*);
AdbcStatusCode (*ConnectionAbortIngestPartitions)(struct AdbcConnection*,
const uint8_t*, size_t, size_t,
const uint8_t**, const size_t*,
struct AdbcError*);

/// @}
};

Expand Down Expand Up @@ -2398,6 +2417,204 @@ AdbcStatusCode AdbcConnectionReadPartition(struct AdbcConnection* connection,

/// @}

/// \defgroup adbc-connection-ingest-partition Partitioned Bulk Ingest
/// @{

/// \brief A driver-owned opaque byte buffer with a release callback.
///
/// Used by partitioned bulk ingest for both handles (returned by
/// AdbcConnectionBeginIngestPartitions) and receipts (returned by
/// AdbcConnectionWriteIngestPartition).
///
/// The bytes are opaque and serializable: the caller may copy
/// `bytes[0..length)` and ship that copy across processes or hosts.
///
/// The struct itself is owned by the driver. Call `release` exactly
/// once to free it.
///
/// \since ADBC API revision 1.2.0
struct AdbcSerializableHandle {
/// \brief The length of `bytes`.
size_t length;

/// \brief The serialized bytes (driver-owned).
const uint8_t* bytes;

/// \brief Private driver state.
void* private_data;

/// \brief Release the memory. Sets `release` to NULL.
void (*release)(struct AdbcSerializableHandle* self);
};
/// @}

/// \addtogroup adbc-connection-ingest-partition
/// Some drivers can accept bulk writes from a distributed writer: a
/// coordinator configures an ingest, many workers write partitions in
/// parallel (possibly from different processes or hosts), and the
/// coordinator commits or aborts atomically.
///
/// This mirrors the read-side partitioned execution model. The
/// coordinator calls AdbcConnectionBeginIngestPartitions to obtain an
/// opaque, serializable handle. The handle is shipped to workers by
/// the caller (e.g. a Spark driver sending it to executors). Workers
/// call AdbcConnectionWriteIngestPartition on their own connections —
/// the connection does not have to be the same one that created the
/// handle. Each write returns an opaque receipt. The coordinator
/// collects receipts and calls AdbcConnectionCompleteIngestPartitions
/// (or AdbcConnectionAbortIngestPartitions on failure).
///
/// Handles and receipts are driver-defined opaque byte strings. They
/// are safe to transmit between processes and to use concurrently
/// from multiple connections.
///
/// Drivers are not required to support partitioned ingest.
///
/// \since ADBC API revision 1.2.0
///
/// @{

/// \brief Begin a partitioned bulk ingest.
///
/// The target table, mode, and optional catalog/schema are configured
/// via the ADBC_INGEST_OPTION_* connection options before calling
/// this function. The same option keys used for single-writer
/// statement-level ingest apply here at the connection level:
/// ADBC_INGEST_OPTION_TARGET_TABLE (required),
/// ADBC_INGEST_OPTION_MODE (required),
/// ADBC_INGEST_OPTION_TARGET_CATALOG (optional),
/// ADBC_INGEST_OPTION_TARGET_DB_SCHEMA (optional).
///
/// For ADBC_INGEST_OPTION_MODE_CREATE,
/// ADBC_INGEST_OPTION_MODE_CREATE_APPEND, and
/// ADBC_INGEST_OPTION_MODE_REPLACE, `schema` is required and the
/// driver creates (or recreates) the target table at this call. For
/// ADBC_INGEST_OPTION_MODE_APPEND, `schema` is optional; if provided,
/// the driver validates it against the target and returns
/// ADBC_STATUS_ALREADY_EXISTS on mismatch.
///
/// The returned handle is opaque, serializable, and usable from any
/// connection that can open the same database. The caller releases
/// it via `out_handle->release`; the bytes can be copied and shipped
/// to workers before release.
///
/// \since ADBC API revision 1.2.0
/// \param[in] connection The coordinator's connection.
/// \param[in] schema Arrow schema of the data to be written.
/// Required for create/replace/create_append modes; optional for
/// append.
/// \param[out] out_handle Driver-owned handle. Must be released by
/// the caller via `out_handle->release`.
/// \param[out] error Error details, if any.
/// \return ADBC_STATUS_INVALID_ARGUMENT if mode requires a schema
/// but none was provided, or if required options are missing.
/// \return ADBC_STATUS_ALREADY_EXISTS if append mode is requested
/// and the target schema disagrees with the provided schema.
/// \return ADBC_STATUS_NOT_IMPLEMENTED if the driver does not
/// support partitioned ingest.
ADBC_EXPORT
AdbcStatusCode AdbcConnectionBeginIngestPartitions(
struct AdbcConnection* connection, struct ArrowSchema* schema,
struct AdbcSerializableHandle* out_handle, struct AdbcError* error);

/// \brief Write one partition of a partitioned bulk ingest.
///
/// Called by a worker, typically on a different connection than the
/// one that created the handle. The driver reads the bound stream
/// to completion, writes its contents to driver-specific staging
/// (per-call: a unique staging table, unique object-store path, etc.
/// — never shared across concurrent writes), and returns an opaque
/// receipt.
///
/// The stream's schema should be compatible with the target table's
/// schema. Drivers may validate this at any point during the write;
/// on mismatch the call fails and produces no receipt. The exact
/// validation mechanism is driver-specific (e.g., RDBMS drivers may
/// rely on the staging table DDL to enforce compatibility).
///
/// On error of any kind, `out_receipt` is left with `release ==
/// NULL` and the caller should retry the whole partition. Partial
/// receipts are never produced. The driver may, however, leave
/// partial server-side state (for example, a per-call staging
/// table); the caller must still call `AbortIngestPartitions` for
/// the handle (with no receipt for this failed write) to release
/// any staging resources, or rely on driver housekeeping.
///
/// This call is safe to invoke concurrently from many connections
/// using the same handle.
///
/// \since ADBC API revision 1.2.0
/// \param[in] connection The worker's connection.
/// \param[in] handle The handle bytes from Begin.
/// \param[in] handle_len Length of handle.
/// \param[in] data Arrow stream of partition data. The driver
/// consumes the stream and releases it.
/// \param[out] out_receipt Driver-owned receipt. Must be released
/// by the caller via `out_receipt->release`.
/// \param[out] error Error details, if any.
ADBC_EXPORT
AdbcStatusCode AdbcConnectionWriteIngestPartition(
struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len,
struct ArrowArrayStream* data, struct AdbcSerializableHandle* out_receipt,
struct AdbcError* error);

/// \brief Complete a partitioned bulk ingest.
///
/// Atomically promotes all writes named by `receipts` into the
/// target table. Semantics of "atomic" are driver-specific: RDBMS
/// drivers typically swap staging into the target in a transaction;
/// table-format drivers (Iceberg, Delta) write a catalog or
/// transaction-log entry referencing the data files in the
/// receipts.
///
/// After Complete returns successfully, the handle is consumed and
/// must not be used again.
///
/// Receipts from failed writes, or writes whose receipts were never
/// observed by the coordinator, are not included in the commit.
/// Their staging data is orphaned and is cleaned up as described in
/// AdbcConnectionAbortIngestPartitions.
///
/// \since ADBC API revision 1.2.0
/// \param[in] connection A connection — typically the coordinator's,
/// but any connection that can open the same database works.
/// \param[in] handle The handle from Begin.
/// \param[in] handle_len Length of handle.
/// \param[in] num_receipts Number of receipts in the batch.
/// \param[in] receipts Array of receipt byte-pointers.
/// \param[in] receipt_lens Array of receipt lengths.
/// \param[out] rows_affected Number of rows committed, or -1 if
/// unknown. Pass NULL if not wanted.
/// \param[out] error Error details, if any.
ADBC_EXPORT
AdbcStatusCode AdbcConnectionCompleteIngestPartitions(
struct AdbcConnection* connection, const uint8_t* handle, size_t handle_len,
size_t num_receipts, const uint8_t** receipts, const size_t* receipt_lens,
int64_t* rows_affected, struct AdbcError* error);

/// \brief Abort a partitioned bulk ingest.
///
/// Best-effort: the driver may clean up any resources associated
/// with the handle. The handle is consumed.
///
/// \since ADBC API revision 1.2.0
/// \param[in] connection A connection.
/// \param[in] handle The handle from Begin.
/// \param[in] handle_len Length of handle.
/// \param[in] num_receipts Number of receipts, or 0.
/// \param[in] receipts Array of receipt byte-pointers, or NULL.
/// \param[in] receipt_lens Array of receipt lengths, or NULL.
/// \param[out] error Error details, if any.
ADBC_EXPORT
AdbcStatusCode AdbcConnectionAbortIngestPartitions(struct AdbcConnection* connection,
const uint8_t* handle,
size_t handle_len, size_t num_receipts,
const uint8_t** receipts,
const size_t* receipt_lens,
struct AdbcError* error);

/// @}

/// \defgroup adbc-connection-transaction Transaction Semantics
///
/// Connections start out in auto-commit mode by default (if
Expand Down
Loading
Loading