Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions java/lance-jni/src/merge_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ fn inner_merge_insert<'local>(
let conflict_retries = extract_conflict_retries(env, &jparam)?;
let retry_timeout_ms = extract_retry_timeout_ms(env, &jparam)?;
let skip_auto_cleanup = extract_skip_auto_cleanup(env, &jparam)?;
let allow_external_blob_outside_bases =
extract_allow_external_blob_outside_bases(env, &jparam)?;

let (new_ds, merge_stats) = unsafe {
let dataset = env.get_rust_field::<_, _, BlockingDataset>(jdataset, NATIVE_DATASET)?;
Expand All @@ -65,6 +67,7 @@ fn inner_merge_insert<'local>(
.conflict_retries(conflict_retries)
.retry_timeout(Duration::from_millis(retry_timeout_ms as u64))
.skip_auto_cleanup(skip_auto_cleanup)
.with_allow_external_blob_outside_bases(allow_external_blob_outside_bases)
.try_build()?;

let stream_ptr = batch_address as *mut FFI_ArrowArrayStream;
Expand Down Expand Up @@ -229,6 +232,16 @@ fn extract_skip_auto_cleanup<'local>(env: &mut JNIEnv<'local>, jparam: &JObject)
Ok(skip_auto_cleanup)
}

/// Reads the `allowExternalBlobOutsideBases` flag from the Java parameter object.
///
/// Invokes the no-argument Java method `allowExternalBlobOutsideBases` (JNI
/// signature `()Z`) on `jparam` and converts the returned value to a Rust `bool`.
///
/// # Errors
///
/// Propagates any error from the JNI method call or from converting the
/// returned value to a boolean.
fn extract_allow_external_blob_outside_bases<'local>(
    env: &mut JNIEnv<'local>,
    jparam: &JObject,
) -> Result<bool> {
    let jvalue = env.call_method(jparam, "allowExternalBlobOutsideBases", "()Z", &[])?;
    let allowed = jvalue.z()?;
    Ok(allowed)
}

// Slash-separated (JNI-style) name of the Java class holding merge insert statistics.
const MERGE_STATS_CLASS: &str = "org/lance/merge/MergeInsertStats";
// JNI signature of the stats constructor: three `long`s, one `int`, two `long`s, returning void.
// NOTE(review): must match the Java constructor's parameter order — confirm when fields change.
const MERGE_STATS_CONSTRUCTOR_SIG: &str = "(JJJIJJ)V";
// Slash-separated (JNI-style) name of the Java class wrapping the merge insert result.
const MERGE_RESULT_CLASS: &str = "org/lance/merge/MergeInsertResult";
Expand Down
23 changes: 23 additions & 0 deletions java/src/main/java/org/lance/merge/MergeInsertParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class MergeInsertParams {
private int conflictRetries = 10;
private long retryTimeoutMs = 30 * 1000;
private boolean skipAutoCleanup = false;
private boolean allowExternalBlobOutsideBases = false;

public MergeInsertParams(List<String> on) {
this.on = on;
Expand Down Expand Up @@ -223,6 +224,23 @@ public MergeInsertParams withSkipAutoCleanup(boolean skipAutoCleanup) {
return this;
}

/**
 * Sets whether a merge insert may write external blob URIs that fall outside every registered
 * base.
 *
 * <p>The flag is {@code false} by default, in which case each external blob URI must resolve to
 * a registered non-dataset-root base path. When set to {@code true}, URIs that match no
 * registered base are stored as absolute references during merge insert writes.
 *
 * @param allowExternalBlobOutsideBases {@code true} to accept external blob URIs outside the
 *     registered bases, {@code false} to require a registered base.
 * @return This MergeInsertParams instance, to allow call chaining
 */
public MergeInsertParams withAllowExternalBlobOutsideBases(
    boolean allowExternalBlobOutsideBases) {
  this.allowExternalBlobOutsideBases = allowExternalBlobOutsideBases;
  return this;
}

public List<String> on() {
return on;
}
Expand Down Expand Up @@ -275,6 +293,10 @@ public boolean skipAutoCleanup() {
return skipAutoCleanup;
}

/**
 * Whether external blob URIs outside registered bases are allowed, as configured through
 * {@link #withAllowExternalBlobOutsideBases(boolean)}.
 *
 * <p>The native merge insert code looks this accessor up by name with the JNI signature
 * {@code ()Z}, so renaming it or changing its return type requires updating that lookup too.
 *
 * @return the configured flag; {@code false} unless it was changed
 */
public boolean allowExternalBlobOutsideBases() {
return allowExternalBlobOutsideBases;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
Expand All @@ -292,6 +314,7 @@ public String toString() {
.add("conflictRetries", conflictRetries)
.add("retryTimeoutMs", retryTimeoutMs)
.add("skipAutoCleanup", skipAutoCleanup)
.add("allowExternalBlobOutsideBases", allowExternalBlobOutsideBases)
.toString();
}

Expand Down
22 changes: 22 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,28 @@ def use_index(self, use_index: bool) -> "MergeInsertBuilder":
"""
return super(MergeInsertBuilder, self).use_index(use_index)

def allow_external_blob_outside_bases(self, allow: bool) -> "MergeInsertBuilder":
    """
    Set whether external blob URIs outside the registered bases are accepted.

    Unless this is enabled, every external blob URI must resolve to a
    registered non-dataset-root base path. Passing True makes merge insert
    writes store unmatched external URIs as absolute references instead.

    Parameters
    ----------
    allow : bool
        True to accept external blob URIs outside registered bases,
        False to require a registered base.

    Returns
    -------
    MergeInsertBuilder
        The builder instance, so calls can be chained.
    """
    # Delegate to the parent builder, which holds the actual setting.
    parent = super(MergeInsertBuilder, self)
    return parent.allow_external_blob_outside_bases(allow)

def explain_plan(
self, schema: Optional[pa.Schema] = None, verbose: bool = False
) -> str:
Expand Down
27 changes: 27 additions & 0 deletions python/python/tests/test_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,33 @@ def test_blob_extension_write_external(tmp_path):
assert f.read() == b"hello"


def test_blob_extension_merge_insert_external_outside_bases(tmp_path):
blob_path = tmp_path / "external_blob.bin"
blob_path.write_bytes(b"merge")
uri = blob_path.as_uri()

table = pa.table({"id": [1], "blob": lance.blob_array([b"initial"])})
ds = lance.write_dataset(
table,
tmp_path / "test_ds_v2_external_merge_insert",
data_storage_version="2.2",
)

source = pa.table({"id": [2], "blob": lance.blob_array([uri])})
stats = (
ds.merge_insert("id")
.allow_external_blob_outside_bases(True)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: This merge_insert call is missing a required action (for example when_not_matched_insert_all()), so execute(source) will raise ValueError instead of inserting the new row.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At python/python/tests/test_blob.py, line 553:

<comment>This merge_insert call is missing a required action (for example `when_not_matched_insert_all()`), so `execute(source)` will raise `ValueError` instead of inserting the new row.</comment>

<file context>
@@ -535,6 +535,33 @@ def test_blob_extension_write_external(tmp_path):
+    source = pa.table({"id": [2], "blob": lance.blob_array([uri])})
+    stats = (
+        ds.merge_insert("id")
+        .allow_external_blob_outside_bases(True)
+        .execute(source)
+    )
</file context>
Suggested change
.allow_external_blob_outside_bases(True)
.allow_external_blob_outside_bases(True)
.when_not_matched_insert_all()

.execute(source)
Comment on lines +551 to +554

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Python test fails because merge insert builder is not configured to do any work

The test test_blob_extension_merge_insert_external_outside_bases calls ds.merge_insert("id").allow_external_blob_outside_bases(True).execute(source) without ever calling .when_not_matched_insert_all(). The Python MergeInsertBuilder initializes both when_matched and when_not_matched to DoNothing (python/src/dataset.rs:214-216), which means insert_not_matched=false. When execute calls try_build() (rust/lance/src/dataset/write/merge_insert.rs:562-569), all three guard conditions (!insert_not_matched, when_matched == DoNothing, delete_not_matched_by_source == Keep) are true, so it returns an error: "The merge insert job is not configured to change the data in any way". The test will fail, leaving the feature without Python test coverage. Note: the Rust test test_merge_insert_allows_external_blobs_outside_bases does not have this problem because the Rust MergeInsertBuilder::try_new defaults insert_not_matched to true.

Suggested change
stats = (
ds.merge_insert("id")
.allow_external_blob_outside_bases(True)
.execute(source)
stats = (
ds.merge_insert("id")
.when_not_matched_insert_all()
.allow_external_blob_outside_bases(True)
.execute(source)
)
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

)

assert stats["num_inserted_rows"] == 1
payloads = []
for blob in ds.take_blobs("blob", indices=[0, 1]):
with blob as f:
payloads.append(f.read())
assert b"merge" in payloads


@pytest.mark.parametrize(
("position", "size"),
[
Expand Down
8 changes: 8 additions & 0 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,14 @@ impl MergeInsertBuilder {
Ok(slf)
}

/// Sets whether external blob URIs outside registered bases are allowed.
///
/// Forwards `allow` to the wrapped builder and hands the same Python-side
/// builder back so calls can be chained.
pub fn allow_external_blob_outside_bases(
    mut slf: PyRefMut<'_, Self>,
    allow: bool,
) -> PyResult<PyRefMut<'_, Self>> {
    // The inner setter returns a reference for chaining; it is not needed
    // here because the Python handle itself is returned.
    let _ = slf.builder.with_allow_external_blob_outside_bases(allow);
    Ok(slf)
}

pub fn execute(&mut self, new_data: &Bound<PyAny>) -> PyResult<Py<PyAny>> {
let py = new_data.py();
let new_data = convert_reader(new_data)?;
Expand Down
138 changes: 134 additions & 4 deletions rust/lance/src/dataset/write/merge_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,17 @@ struct MergeInsertParams {
source_dedupe_behavior: SourceDedupeBehavior,
// Number of inner commit retries for manifest version conflicts. Default is 20.
commit_retries: Option<u32>,
// Allow writing external blob URIs that cannot be mapped to a registered base.
allow_external_blob_outside_bases: bool,
}

impl MergeInsertParams {
fn write_params(&self) -> WriteParams {
WriteParams {
allow_external_blob_outside_bases: self.allow_external_blob_outside_bases,
..Default::default()
}
}
}

/// A MergeInsertJob inserts new rows, deletes old rows, and updates existing rows all as
Expand Down Expand Up @@ -459,6 +470,7 @@ impl MergeInsertBuilder {
use_index: true,
source_dedupe_behavior: SourceDedupeBehavior::Fail,
commit_retries: None,
allow_external_blob_outside_bases: false,
},
})
}
Expand Down Expand Up @@ -557,6 +569,16 @@ impl MergeInsertBuilder {
self
}

/// Sets whether external blob URIs outside registered bases are allowed.
///
/// When `allow` is false, external blob URIs must resolve to a registered
/// non-dataset-root base path. When it is true, external URIs that match no
/// registered base are stored as absolute references.
///
/// Returns the builder so calls can be chained.
pub fn with_allow_external_blob_outside_bases(&mut self, allow: bool) -> &mut Self {
    let params = &mut self.params;
    params.allow_external_blob_outside_bases = allow;
    self
}

/// Create a merge insert job
pub fn try_build(&mut self) -> Result<MergeInsertJob> {
if !self.params.insert_not_matched
Expand Down Expand Up @@ -894,6 +916,7 @@ impl MergeInsertJob {
dataset: Arc<Dataset>,
source: SendableRecordBatchStream,
current_version: u64,
write_params: WriteParams,
Comment thread
cursor[bot] marked this conversation as resolved.
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
) -> Result<(Vec<Fragment>, Vec<Fragment>, Vec<u32>)> {
// Expected source schema: _rowaddr, updated_cols*
use datafusion::logical_expr::{col, lit};
Expand Down Expand Up @@ -1154,6 +1177,7 @@ impl MergeInsertJob {
batches: Vec<RecordBatch>,
new_fragments: Arc<Mutex<Vec<Fragment>>>,
reservation_size: usize,
write_params: WriteParams,
) -> Result<usize> {
// Batches still have _rowaddr (used elsewhere to merge with existing data)
// We need to remove it before writing to Lance files.
Expand Down Expand Up @@ -1184,8 +1208,8 @@ impl MergeInsertJob {
&dataset.base,
write_schema,
stream,
Default::default(), // TODO: support write params.
None, // Merge insert doesn't use target_bases
write_params,
None, // Merge insert doesn't use target_bases
)
.await?;

Expand Down Expand Up @@ -1258,6 +1282,7 @@ impl MergeInsertJob {
batches,
new_fragments.clone(),
memory_size,
write_params.clone(),
);
Comment on lines 1290 to 1294

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Propagate blob write params to matched-update fragment rewrites

When allow_external_blob_outside_bases is enabled, the option is only forwarded on the insert/new-fragment branch, but matched-row updates in the update_fragments path still write via open_writer / Updater defaults. In merge-insert runs with a source subschema (the path that enters update_fragments), updating blob columns with external URIs still uses default blob validation and can reject URIs outside registered bases even though the caller explicitly enabled this option. This makes the new flag ineffective for a real merge-insert update scenario.

Useful? React with 👍 / 👎.

tasks.spawn(fut);
}
Expand Down Expand Up @@ -1707,6 +1732,7 @@ impl MergeInsertJob {
self.dataset.clone(),
Box::pin(stream),
self.dataset.manifest.version + 1,
self.params.write_params(),
)
.await?;

Expand All @@ -1731,7 +1757,7 @@ impl MergeInsertJob {
&self.dataset.base,
self.dataset.schema().clone(),
Box::pin(stream),
WriteParams::default(),
self.params.write_params(),
None, // Merge insert doesn't use target_bases
)
.await?;
Expand Down Expand Up @@ -2362,6 +2388,7 @@ impl Merger {
#[cfg(test)]
mod tests {
use super::*;
use crate::blob::{BlobArrayBuilder, blob_field};
use crate::dataset::scanner::ColumnOrdering;
use crate::dataset::write::merge_insert::inserted_rows::{
KeyExistenceFilter, KeyExistenceFilterBuilder, extract_key_value_from_batch,
Expand Down Expand Up @@ -2391,7 +2418,7 @@ mod tests {
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use futures::{FutureExt, StreamExt, TryStreamExt, future::try_join_all};
use lance_arrow::FixedSizeListArrayExt;
use lance_core::utils::tempfile::TempStrDir;
use lance_core::utils::tempfile::{TempDir, TempStrDir};
use lance_datafusion::{datagen::DatafusionDatagenExt, utils::reader_to_stream};
use lance_datagen::{BatchCount, Dimension, RowCount, Seed, array};
use lance_index::IndexType;
Expand All @@ -2409,6 +2436,39 @@ mod tests {
t
}

/// Schema shared by the blob merge insert tests: an Int32 `id` column
/// followed by a `blob` column created with `blob_field`.
fn blob_merge_schema() -> Arc<Schema> {
    let id = Field::new("id", DataType::Int32, false);
    let blob = blob_field("blob", true);
    Arc::new(Schema::new(vec![id, blob]))
}

/// Builds a single-row batch for `schema` from one `id` value and an
/// already-built blob column.
///
/// Panics if the columns do not form a valid batch for `schema`.
fn blob_batch(schema: Arc<Schema>, id: i32, blob_array: arrow_array::ArrayRef) -> RecordBatch {
    let id_column: arrow_array::ArrayRef = Arc::new(Int32Array::from(vec![id]));
    let columns = vec![id_column, blob_array];
    RecordBatch::try_new(schema, columns).unwrap()
}

/// Writes the starting dataset for the blob merge insert tests into
/// `dataset_dir`: one row with `id = 1` and an inline blob `b"initial"`,
/// stored with file version 2.2.
async fn blob_merge_dataset(dataset_dir: &TempDir) -> Dataset {
    let schema = blob_merge_schema();

    let mut initial_blobs = BlobArrayBuilder::new(1);
    initial_blobs.push_bytes(b"initial").unwrap();
    let seed_batch = blob_batch(schema.clone(), 1, initial_blobs.finish().unwrap());

    let write_params = WriteParams {
        data_storage_version: Some(LanceFileVersion::V2_2),
        ..Default::default()
    };
    let reader = RecordBatchIterator::new(vec![Ok(seed_batch)], schema);

    Dataset::write(reader, &dataset_dir.path_str(), Some(write_params))
        .await
        .unwrap()
}

async fn check_then_refresh_dataset(
new_data: RecordBatch,
mut job: MergeInsertJob,
Expand Down Expand Up @@ -2670,6 +2730,76 @@ mod tests {
}
}

/// With the flag left at its default, a merge insert whose source row points
/// at an external file outside any registered base must fail.
#[tokio::test]
async fn test_merge_insert_rejects_external_blobs_outside_bases_by_default() {
    let target_dir = TempDir::default();
    let outside_dir = TempDir::default();

    // External payload kept in a directory separate from the dataset.
    let blob_file = outside_dir.std_path().join("external.bin");
    std::fs::write(&blob_file, b"merge-external").unwrap();
    let blob_uri = format!("file://{}", blob_file.display());

    let dataset = Arc::new(blob_merge_dataset(&target_dir).await);
    let schema = blob_merge_schema();

    // Source row (id = 2) whose blob is a URI reference rather than inline bytes.
    let mut uri_blobs = BlobArrayBuilder::new(1);
    uri_blobs.push_uri(blob_uri).unwrap();
    let source_batch = blob_batch(schema.clone(), 2, uri_blobs.finish().unwrap());

    let mut builder = MergeInsertBuilder::try_new(dataset, vec!["id".to_string()]).unwrap();
    let job = builder.try_build().unwrap();
    let err = job
        .execute_reader(Box::new(RecordBatchIterator::new(
            [Ok(source_batch)],
            schema,
        )))
        .await
        .unwrap_err();

    let message = err.to_string();
    assert!(
        message.contains("outside registered external bases"),
        "{err:?}"
    );
}

/// With the flag enabled, a merge insert accepts a source row pointing at an
/// external file outside any registered base, and the payload can be read
/// back through the dataset.
#[tokio::test]
async fn test_merge_insert_allows_external_blobs_outside_bases() {
    let target_dir = TempDir::default();
    let outside_dir = TempDir::default();

    // External payload kept in a directory separate from the dataset.
    let blob_file = outside_dir.std_path().join("external.bin");
    std::fs::write(&blob_file, b"merge-external").unwrap();
    let blob_uri = format!("file://{}", blob_file.display());

    let dataset = Arc::new(blob_merge_dataset(&target_dir).await);
    let schema = blob_merge_schema();

    // Source row (id = 2) whose blob is a URI reference rather than inline bytes.
    let mut uri_blobs = BlobArrayBuilder::new(1);
    uri_blobs.push_uri(blob_uri).unwrap();
    let source_batch = blob_batch(schema.clone(), 2, uri_blobs.finish().unwrap());

    let mut builder = MergeInsertBuilder::try_new(dataset, vec!["id".to_string()]).unwrap();
    builder.with_allow_external_blob_outside_bases(true);
    let job = builder.try_build().unwrap();
    let (merged, stats) = job
        .execute_reader(Box::new(RecordBatchIterator::new(
            [Ok(source_batch)],
            schema,
        )))
        .await
        .unwrap();

    assert_eq!(stats.num_inserted_rows, 1);

    // Read both rows' blobs and check that the external payload is among them.
    let blobs = merged
        .take_blobs_by_indices(&[0, 1], "blob")
        .await
        .unwrap();
    let reads = blobs
        .into_iter()
        .map(|blob| async move { blob.read().await.map(|bytes| bytes.as_ref().to_vec()) });
    let payloads = try_join_all(reads).await.unwrap();

    let expected: &[u8] = b"merge-external";
    assert!(
        payloads.iter().any(|payload| payload.as_slice() == expected),
        "{payloads:?}"
    );
}

#[tokio::test]
async fn test_merge_insert_defaults_to_unenforced_primary_key() {
// Define a simple schema with an unenforced primary key on `id`.
Expand Down
Loading