diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index aa051cd14..f3c102ae9 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -107,6 +107,7 @@ set(ICEBERG_SOURCES update/merging_snapshot_update.cc update/overwrite_files.cc update/pending_update.cc + update/rewrite_files.cc update/row_delta.cc update/set_snapshot.cc update/snapshot_manager.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 58d79a402..c45056e22 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -132,6 +132,7 @@ iceberg_sources = files( 'update/merging_snapshot_update.cc', 'update/overwrite_files.cc', 'update/pending_update.cc', + 'update/rewrite_files.cc', 'update/row_delta.cc', 'update/set_snapshot.cc', 'update/snapshot_manager.cc', diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 9dbc5acf7..0a2b54082 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -36,6 +36,7 @@ #include "iceberg/update/fast_append.h" #include "iceberg/update/merge_append.h" #include "iceberg/update/overwrite_files.h" +#include "iceberg/update/rewrite_files.h" #include "iceberg/update/row_delta.h" #include "iceberg/update/set_snapshot.h" #include "iceberg/update/snapshot_manager.h" @@ -245,6 +246,12 @@ Result> Table::NewOverwrite() { return OverwriteFiles::Make(name().name, std::move(ctx)); } +Result> Table::NewRewriteFiles() { + ICEBERG_ASSIGN_OR_RAISE( + auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); + return RewriteFiles::Make(name().name, std::move(ctx)); +} + Result> Table::NewUpdateStatistics() { ICEBERG_ASSIGN_OR_RAISE( auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); @@ -360,6 +367,10 @@ Result> StaticTable::NewOverwrite() { return NotSupported("Cannot create an overwrite for a static table"); } +Result> StaticTable::NewRewriteFiles() { + return NotSupported("Cannot create a rewrite files for a static table"); +} + Result> StaticTable::NewSnapshotManager() { return NotSupported("Cannot create a snapshot manager for a static table"); } diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 64ed21ef8..b52b5d29d 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -188,6 +188,10 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { /// \brief Create a new OverwriteFiles to overwrite data files and commit the changes. virtual Result> NewOverwrite(); + /// \brief Create a new RewriteFiles to replace files in this table and commit the + /// changes. + virtual Result> NewRewriteFiles(); + /// \brief Create a new SnapshotManager to manage snapshots and snapshot references. virtual Result> NewSnapshotManager(); @@ -263,6 +267,8 @@ class ICEBERG_EXPORT StaticTable : public Table { Result> NewOverwrite() override; + Result> NewRewriteFiles() override; + Result> NewSnapshotManager() override; private: diff --git a/src/iceberg/test/.meson.build.swp b/src/iceberg/test/.meson.build.swp deleted file mode 100644 index 58638fe24..000000000 Binary files a/src/iceberg/test/.meson.build.swp and /dev/null differ diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 0756c1eef..b53449319 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -230,6 +230,7 @@ if(ICEBERG_BUILD_BUNDLE) merge_append_test.cc merging_snapshot_update_test.cc name_mapping_update_test.cc + rewrite_files_test.cc row_delta_test.cc snapshot_manager_test.cc transaction_test.cc diff --git a/src/iceberg/test/merging_snapshot_update_test.cc b/src/iceberg/test/merging_snapshot_update_test.cc index 1b50a124e..5fab810ef 100644 --- a/src/iceberg/test/merging_snapshot_update_test.cc +++ b/src/iceberg/test/merging_snapshot_update_test.cc @@ -1405,6 +1405,43 @@ TEST_F(MergingSnapshotUpdateTest, IsError(ErrorKind::kValidationFailed)); } +TEST_F(MergingSnapshotUpdateTest, + ValidateNoNewDeletesForDataFilesIgnoresEqualityDeletesWhenFlagIsTrue) { + // This tests the behavior that RewriteFiles::SetDataSequenceNumber() and + // RewriteFiles::RewriteDataFiles() enable: when a data sequence number is + // set for rewritten data files, concurrent equality deletes at higher + // sequence numbers still apply to the new files and are NOT a conflict. + // Only position deletes should still fail (tested separately by + // ValidateNoNewDeletesForDataFilesFailsOnPositionDeleteWhenIgnoringEqualityDeletes). + CommitFileA(); + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot()); + + auto del_file = MakeEqualityDeleteFile("/delete/del_a.parquet", 1L); + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwriteUpdate()); + EXPECT_THAT(op->AddDelete(del_file), IsOk()); + const int64_t second_snapshot_id = op->GeneratedSnapshotId(); + ICEBERG_UNWRAP_OR_FAIL(auto manifests, op->Apply(*table_->metadata(), first_snapshot)); + ICEBERG_UNWRAP_OR_FAIL( + auto second_snapshot, + MakeSyntheticSnapshot(DataOperation::kOverwrite, second_snapshot_id, + first_snapshot->snapshot_id, + first_snapshot->sequence_number + 1, manifests)); + + auto metadata = std::make_shared(*table_->metadata()); + metadata->snapshots.push_back(second_snapshot); + metadata->current_snapshot_id = second_snapshot->snapshot_id; + metadata->last_sequence_number = second_snapshot->sequence_number; + + DataFileSet replaced_files; + replaced_files.insert(file_a_); + // With ignore_equality_deletes=true, concurrently-added equality deletes + // should NOT cause a conflict. + EXPECT_THAT(TestMergeAppend::ValidateNoNewDeletesForDataFilesForTest( + *metadata, first_snapshot->snapshot_id, replaced_files, second_snapshot, + file_io_, /*ignore_equality_deletes=*/true), + IsOk()); +} + TEST_F(MergingSnapshotUpdateTest, ValidateNoNewDeletesForDataFilesUsesConfiguredCaseSensitivity) { CommitFileA(); diff --git a/src/iceberg/test/rewrite_files_test.cc b/src/iceberg/test/rewrite_files_test.cc new file mode 100644 index 000000000..f3f6f2c61 --- /dev/null +++ b/src/iceberg/test/rewrite_files_test.cc @@ -0,0 +1,958 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/rewrite_files.h" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "iceberg/arrow/arrow_io_internal.h" +#include "iceberg/avro/avro_register.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/row/partition_values.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/table_properties.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/mock_catalog.h" +#include "iceberg/test/update_test_base.h" +#include "iceberg/update/fast_append.h" +#include "iceberg/update/row_delta.h" +#include "iceberg/update/update_properties.h" + +namespace iceberg { + +class RewriteFilesTest : public MinimalUpdateTestBase { + protected: + static void SetUpTestSuite() { avro::RegisterAll(); } + + void SetUp() override { + MinimalUpdateTestBase::SetUp(); + table_->metadata()->format_version = format_version(); + + ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec()); + ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema()); + + file_a_ = MakeDataFile("/data/file_a.parquet", /*partition_x=*/1L); + file_b_ = MakeDataFile("/data/file_b.parquet", /*partition_x=*/2L); + rewritten_file_a_ = + MakeDataFile("/data/file_a_rewritten.parquet", /*partition_x=*/1L); + rewritten_file_b_ = + MakeDataFile("/data/file_b_rewritten.parquet", /*partition_x=*/2L); + delete_file_a_ = MakeDeleteFile("/data/delete_a.parquet", /*partition_x=*/1L); + rewritten_delete_file_a_ = + MakeDeleteFile("/data/delete_a_rewritten.parquet", /*partition_x=*/1L); + eq_delete_file_ = + MakeEqualityDeleteFile("/data/eq_delete_a.parquet", /*partition_x=*/1L); + } + + virtual int8_t format_version() const { + return TableMetadata::kDefaultTableFormatVersion; + } + + /// \brief Skip the current parameterization if format version is below the given + /// minimum. + void AssumeFormatVersionAtLeast(int8_t min_version) { + if (format_version() < min_version) { + GTEST_SKIP() << "Requires format version >= " << static_cast(min_version); + } + } + + std::shared_ptr MakeDataFile(const std::string& path, int64_t partition_x) { + auto f = std::make_shared(); + f->content = DataFile::Content::kData; + f->file_path = table_location_ + path; + f->file_format = FileFormatType::kParquet; + f->partition = PartitionValues(std::vector{Literal::Long(partition_x)}); + f->file_size_in_bytes = 1024; + f->record_count = 100; + f->partition_spec_id = spec_->spec_id(); + return f; + } + + std::shared_ptr MakeDeleteFile(const std::string& path, int64_t partition_x) { + auto f = MakeDataFile(path, partition_x); + f->content = DataFile::Content::kPositionDeletes; + return f; + } + + std::shared_ptr MakeEqualityDeleteFile(const std::string& path, + int64_t partition_x) { + auto f = MakeDeleteFile(path, partition_x); + f->content = DataFile::Content::kEqualityDeletes; + f->equality_ids = {1}; + return f; + } + + Result> NewRewriteFiles() { + return table_->NewRewriteFiles(); + } + + /// \brief Commit file_a_ with FastAppend so the table has data to rewrite. + void CommitFileA() { + ICEBERG_UNWRAP_OR_FAIL(auto fa, table_->NewFastAppend()); + fa->AppendFile(file_a_); + EXPECT_THAT(fa->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + /// \brief Read all manifest entries from a set of manifests. + Result> ReadAllEntries( + std::span manifests) { + std::vector result; + for (const auto& manifest : manifests) { + ICEBERG_ASSIGN_OR_RAISE( + auto spec, table_->metadata()->PartitionSpecById(manifest.partition_spec_id)); + ICEBERG_ASSIGN_OR_RAISE(auto reader, + ManifestReader::Make(manifest, file_io_, schema_, spec)); + ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries()); + result.insert(result.end(), entries.begin(), entries.end()); + } + return result; + } + + /// \brief Get data manifests from a snapshot. + Result> DataManifests( + const std::shared_ptr& snapshot) { + SnapshotCache snapshot_cache(snapshot.get()); + ICEBERG_ASSIGN_OR_RAISE(auto manifests, snapshot_cache.DataManifests(file_io_)); + return std::vector(manifests.begin(), manifests.end()); + } + + /// \brief Get delete manifests from a snapshot. + Result> DeleteManifests( + const std::shared_ptr& snapshot) { + SnapshotCache snapshot_cache(snapshot.get()); + ICEBERG_ASSIGN_OR_RAISE(auto manifests, snapshot_cache.DeleteManifests(file_io_)); + return std::vector(manifests.begin(), manifests.end()); + } + + /// \brief List Avro metadata files in the mock filesystem. + std::vector MetadataAvroFiles() { + auto arrow_io = std::dynamic_pointer_cast(file_io_); + EXPECT_NE(arrow_io, nullptr); + if (arrow_io == nullptr) { + return {}; + } + + ::arrow::fs::FileSelector selector; + selector.base_dir = table_location_ + "/metadata"; + selector.recursive = false; + + auto maybe_infos = arrow_io->fs()->GetFileInfo(selector); + EXPECT_TRUE(maybe_infos.ok()) << maybe_infos.status().ToString(); + if (!maybe_infos.ok()) { + return {}; + } + + std::vector files; + for (const auto& info : maybe_infos.ValueOrDie()) { + if (info.type() == ::arrow::fs::FileType::File && info.path().ends_with(".avro")) { + files.push_back(info.path()); + } + } + std::ranges::sort(files); + return files; + } + + /// \brief Set commit retry properties on the table. + void SetCommitRetryProperties(int32_t retries) { + ICEBERG_UNWRAP_OR_FAIL(auto props, table_->NewUpdateProperties()); + props->Set(std::string(TableProperties::kCommitNumRetries.key()), + std::to_string(retries)); + props->Set(std::string(TableProperties::kCommitMinRetryWaitMs.key()), "1"); + props->Set(std::string(TableProperties::kCommitMaxRetryWaitMs.key()), "1"); + props->Set(std::string(TableProperties::kCommitTotalRetryTimeMs.key()), "1000"); + EXPECT_THAT(props->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + /// \brief Re-bind table_ to use a mock catalog that fails UpdateTable N times, + /// then delegates to the real catalog. The call_count pointer is incremented on + /// each UpdateTable call (successful or not). + void BindTableWithFailingCommits(int failures, int* update_call_count = nullptr) { + auto mock_catalog = std::make_shared<::testing::NiceMock>(); + std::weak_ptr<::testing::NiceMock> weak_catalog = mock_catalog; + + ON_CALL(*mock_catalog, LoadTable(::testing::_)) + .WillByDefault([this, weak_catalog](const TableIdentifier& identifier) + -> Result> { + ICEBERG_ASSIGN_OR_RAISE(auto loaded, catalog_->LoadTable(identifier)); + auto catalog = weak_catalog.lock(); + ICEBERG_PRECHECK(catalog != nullptr, "Mock catalog expired"); + return Table::Make(loaded->name(), loaded->metadata(), + std::string(loaded->metadata_file_location()), loaded->io(), + catalog); + }); + + ON_CALL(*mock_catalog, UpdateTable(::testing::_, ::testing::_, ::testing::_)) + .WillByDefault( + [this, weak_catalog, failures, update_call_count]( + const TableIdentifier& identifier, + const std::vector>& requirements, + const std::vector>& updates) mutable + -> Result> { + if (update_call_count != nullptr) { + ++*update_call_count; + } + if (failures-- > 0) { + return CommitFailed("Injected failure"); + } + ICEBERG_ASSIGN_OR_RAISE( + auto updated, catalog_->UpdateTable(identifier, requirements, updates)); + auto catalog = weak_catalog.lock(); + ICEBERG_PRECHECK(catalog != nullptr, "Mock catalog expired"); + return Table::Make(updated->name(), updated->metadata(), + std::string(updated->metadata_file_location()), + updated->io(), catalog); + }); + + ICEBERG_UNWRAP_OR_FAIL(auto bound_table, + Table::Make(table_->name(), table_->metadata(), + std::string(table_->metadata_file_location()), + table_->io(), mock_catalog)); + table_ = std::move(bound_table); + mock_catalogs_.push_back(std::move(mock_catalog)); + } + + std::shared_ptr spec_; + std::shared_ptr schema_; + std::shared_ptr file_a_; + std::shared_ptr file_b_; + std::shared_ptr rewritten_file_a_; + std::shared_ptr rewritten_file_b_; + std::shared_ptr delete_file_a_; + std::shared_ptr rewritten_delete_file_a_; + std::shared_ptr eq_delete_file_; + + /// \brief Mock catalogs kept alive during the test (for failure injection). + std::vector>> mock_catalogs_; +}; + +class RewriteFilesFormatVersionTest : public RewriteFilesTest, + public ::testing::WithParamInterface { + protected: + int8_t format_version() const override { return GetParam(); } +}; + +// ============================================================================ +// Tests that run on all format versions (v1+) +// ============================================================================ + +// Rewrite a single data file: replace file_a_ with rewritten_file_a_. +TEST_P(RewriteFilesFormatVersionTest, AddAndDelete) { + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->DeleteDataFile(file_a_); + rw->AddDataFile(rewritten_file_a_); + EXPECT_THAT(rw->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), + DataOperation::kReplace); + EXPECT_EQ(std::stoll(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles)), + 1); + EXPECT_EQ(std::stoll(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles)), 1); +} + +TEST_P(RewriteFilesFormatVersionTest, DeleteDataFileCopiesCallerFile) { + CommitFileA(); + + const std::string original_path = file_a_->file_path; + + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->DeleteDataFile(file_a_); + rw->AddDataFile(rewritten_file_a_); + + file_a_->file_path = file_b_->file_path; + + EXPECT_THAT(rw->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto rw_missing_original, NewRewriteFiles()); + auto missing_file = std::make_shared(*file_a_); + missing_file->file_path = original_path; + rw_missing_original->DeleteDataFile(missing_file); + rw_missing_original->AddDataFile(file_b_); + auto result = rw_missing_original->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Missing required files to delete")); +} + +// Rewrite one of several data files, verifying only the target is affected. +TEST_P(RewriteFilesFormatVersionTest, AddAndDeletePartialRewrite) { + CommitFileA(); + + { + ICEBERG_UNWRAP_OR_FAIL(auto fa, table_->NewFastAppend()); + fa->AppendFile(file_b_); + EXPECT_THAT(fa->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->DeleteDataFile(file_a_); + rw->AddDataFile(rewritten_file_a_); + EXPECT_THAT(rw->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), + DataOperation::kReplace); + EXPECT_EQ(std::stoll(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles)), + 1); + EXPECT_EQ(std::stoll(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles)), 1); +} + +// Rewrite via the 4-set Rewrite() API replacing data files only. +TEST_P(RewriteFilesFormatVersionTest, Rewrite) { + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->Rewrite({file_a_}, {}, {rewritten_file_a_}, {}); + EXPECT_THAT(rw->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), + DataOperation::kReplace); +} + +// Limiting validation scope to after a given snapshot avoids spurious conflicts. +TEST_P(RewriteFilesFormatVersionTest, ValidateFromSnapshot) { + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + auto snapshot_id = snapshot->snapshot_id; + + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->ValidateFromSnapshot(snapshot_id); + rw->DeleteDataFile(file_a_); + rw->AddDataFile(rewritten_file_a_); + EXPECT_THAT(rw->Commit(), IsOk()); +} + +// Committing a rewrite to the main branch via ToBranch. +TEST_P(RewriteFilesFormatVersionTest, ToBranch) { + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->ToBranch("main"); + rw->DeleteDataFile(file_a_); + rw->AddDataFile(rewritten_file_a_); + EXPECT_THAT(rw->Commit(), IsOk()); +} + +// Null check on DeleteDataFile. +TEST_P(RewriteFilesFormatVersionTest, DeleteDataFileNullCheck) { + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->DeleteDataFile(nullptr); + EXPECT_THAT(rw->Commit(), IsError(ErrorKind::kValidationFailed)); +} + +// Null check on AddDataFile. +TEST_P(RewriteFilesFormatVersionTest, AddDataFileNullCheck) { + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->DeleteDataFile(file_a_); + rw->AddDataFile(nullptr); + EXPECT_THAT(rw->Commit(), IsError(ErrorKind::kValidationFailed)); +} + +// Null checks on AddDeleteFile +TEST_P(RewriteFilesFormatVersionTest, AddDeleteFileNullCheck) { + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->AddDeleteFile(nullptr); + EXPECT_THAT(rw->Commit(), IsError(ErrorKind::kValidationFailed)); +} + +// Adding a data file after deleting one — the basic RewriteFiles pattern. +TEST_P(RewriteFilesFormatVersionTest, AddDataFile) { + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->DeleteDataFile(file_a_); + rw->AddDataFile(rewritten_file_a_); + EXPECT_THAT(rw->Commit(), IsOk()); +} + +// Deleting a file that was never added fails with missing required files. +TEST_P(RewriteFilesFormatVersionTest, DeleteNonExistentFile) { + CommitFileA(); // table now has file_a_ + + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + // file_b_ was never added — deleting it should fail with missing required files + rw->DeleteDataFile(file_b_); + rw->AddDataFile(rewritten_file_b_); + auto result = rw->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Missing required files to delete")); +} + +// Rewriting a file that was already deleted in a prior commit must fail. +TEST_P(RewriteFilesFormatVersionTest, AlreadyDeletedFile) { + CommitFileA(); // table now has file_a_ + + // First rewrite: file_a_ → rewritten_file_a_ + { + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->DeleteDataFile(file_a_); + rw->AddDataFile(rewritten_file_a_); + EXPECT_THAT(rw->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + // Second rewrite: try to delete file_a_ again (already deleted) + { + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->DeleteDataFile(file_a_); + rw->AddDataFile(file_b_); + auto result = rw->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Missing required files to delete")); + } +} + +// Inject commit failures that exhaust all retries, then verify the commit +// ultimately fails with CommitFailed. +TEST_P(RewriteFilesFormatVersionTest, Failure) { + CommitFileA(); + + constexpr int32_t kRetries = 3; + constexpr int32_t kInjectedFailures = kRetries + 1; // more failures than retries + int call_count = 0; + + SetCommitRetryProperties(kRetries); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot_before, table_->current_snapshot()); + const auto sequence_number_before = table_->metadata()->last_sequence_number; + const auto metadata_avro_files_before = MetadataAvroFiles(); + + BindTableWithFailingCommits(kInjectedFailures, &call_count); + + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->DeleteDataFile(file_a_); + rw->AddDataFile(rewritten_file_a_); + + auto result = rw->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); + // We expect 1 (initial attempt) + kRetries (retries) = kRetries + 1 calls, + // all of which fail. + EXPECT_EQ(call_count, kRetries + 1); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot_after, table_->current_snapshot()); + EXPECT_EQ(snapshot_after->snapshot_id, snapshot_before->snapshot_id); + EXPECT_EQ(table_->metadata()->last_sequence_number, sequence_number_before); + EXPECT_EQ(MetadataAvroFiles(), metadata_avro_files_before); +} + +// Inject transient commit failures that stay within the retry budget, then +// verify the commit eventually succeeds with the correct state. +TEST_P(RewriteFilesFormatVersionTest, Recovery) { + CommitFileA(); + + constexpr int32_t kRetries = 4; + constexpr int32_t kInjectedFailures = 2; // fewer failures than retries + int call_count = 0; + + SetCommitRetryProperties(kRetries); + BindTableWithFailingCommits(kInjectedFailures, &call_count); + + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->DeleteDataFile(file_a_); + rw->AddDataFile(rewritten_file_a_); + + EXPECT_THAT(rw->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + + // We expect 1 initial attempt + kInjectedFailures retries = + // kInjectedFailures + 1 calls, with the last one succeeding. + EXPECT_EQ(call_count, kInjectedFailures + 1); + + // Verify the rewrite actually succeeded + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), + DataOperation::kReplace); + EXPECT_EQ(std::stoll(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles)), + 1); + EXPECT_EQ(std::stoll(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles)), 1); +} + +// ============================================================================ +// Tests that require format version >= 2 +// ============================================================================ + +// Rewrite with an explicit data sequence number via SetDataSequenceNumber. +TEST_P(RewriteFilesFormatVersionTest, DataSequenceNumber) { + AssumeFormatVersionAtLeast(2); + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->SetDataSequenceNumber(5); + rw->DeleteDataFile(file_a_); + rw->AddDataFile(rewritten_file_a_); + EXPECT_THAT(rw->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), + DataOperation::kReplace); +} + +// Bulk rewrite with sequence number via the RewriteDataFiles convenience method. +TEST_P(RewriteFilesFormatVersionTest, RewriteDataFiles) { + AssumeFormatVersionAtLeast(2); + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->RewriteDataFiles({file_a_}, {rewritten_file_a_}, /*sequence_number=*/3); + EXPECT_THAT(rw->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), + DataOperation::kReplace); + EXPECT_EQ(std::stoll(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles)), + 1); + EXPECT_EQ(std::stoll(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles)), 1); +} + +// Deleting a file from an empty table fails with missing required files. +TEST_P(RewriteFilesFormatVersionTest, EmptyTable) { + AssumeFormatVersionAtLeast(2); + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->DeleteDataFile(file_a_); + rw->AddDataFile(rewritten_file_a_); + auto result = rw->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Missing required files to delete")); +} + +// Only adding files without any deletions must fail validation. +TEST_P(RewriteFilesFormatVersionTest, DeleteOnly) { + AssumeFormatVersionAtLeast(2); + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->AddDataFile(rewritten_file_a_); // no FileToDelete → must fail + EXPECT_THAT(rw->Commit(), IsError(ErrorKind::kValidationFailed)); +} + +// Adding data files without deleting data, or adding delete files without deleting +// delete files, must fail validation. +TEST_P(RewriteFilesFormatVersionTest, AddOnly) { + AssumeFormatVersionAtLeast(2); + // Sub-case 1: adding data files without deleting any data files should fail + { + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->DeleteDeleteFile(delete_file_a_); + rw->AddDataFile(rewritten_file_a_); + EXPECT_THAT(rw->Commit(), IsError(ErrorKind::kValidationFailed)); + } + + // Sub-case 2: adding delete files without deleting any delete files should fail + { + CommitFileA(); + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->DeleteDataFile(file_a_); + rw->AddDeleteFile(rewritten_delete_file_a_); + EXPECT_THAT(rw->Commit(), IsError(ErrorKind::kValidationFailed)); + } +} + +// Rewrite both data and delete files in a single commit, verifying that the +// manifest entries have the correct statuses (DELETED for replaced files, +// ADDED for new files). ValidateFromSnapshot scopes conflict detection to +// after the RowDelta commit so the delete being rewritten is not flagged as +// a concurrent addition. +TEST_P(RewriteFilesFormatVersionTest, RewriteDataAndDeleteFiles) { + AssumeFormatVersionAtLeast(2); + // Create data file via FastAppend + CommitFileA(); + + // Create delete file via RowDelta + { + ICEBERG_UNWRAP_OR_FAIL(auto row_delta, table_->NewRowDelta()); + row_delta->AddDeletes(delete_file_a_); + EXPECT_THAT(row_delta->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + // Snapshot after RowDelta: the delete file exists from this point onward. + ICEBERG_UNWRAP_OR_FAIL(auto after_delta_snapshot, table_->current_snapshot()); + + // Rewrite both data and delete files + { + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->ValidateFromSnapshot(after_delta_snapshot->snapshot_id); + rw->DeleteDataFile(file_a_); + rw->DeleteDeleteFile(delete_file_a_); + rw->AddDataFile(rewritten_file_a_); + rw->AddDeleteFile(rewritten_delete_file_a_); + EXPECT_THAT(rw->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + // Verify snapshot summary + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), + DataOperation::kReplace); + EXPECT_EQ(std::stoll(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles)), + 1); + EXPECT_EQ(std::stoll(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles)), 1); + EXPECT_EQ(std::stoll(snapshot->summary.at(SnapshotSummaryFields::kRemovedDeleteFiles)), + 1); + EXPECT_EQ(std::stoll(snapshot->summary.at(SnapshotSummaryFields::kAddedDeleteFiles)), + 1); + + // Verify manifest entry statuses + ICEBERG_UNWRAP_OR_FAIL(auto data_manifests, DataManifests(snapshot)); + ICEBERG_UNWRAP_OR_FAIL(auto data_entries, ReadAllEntries(data_manifests)); + ASSERT_EQ(data_entries.size(), 2); + for (const auto& entry : data_entries) { + if (entry.data_file->file_path == file_a_->file_path) { + EXPECT_EQ(entry.status, ManifestStatus::kDeleted); + } else if (entry.data_file->file_path == rewritten_file_a_->file_path) { + EXPECT_EQ(entry.status, ManifestStatus::kAdded); + EXPECT_EQ(entry.snapshot_id, snapshot->snapshot_id); + } else { + FAIL() << "Unexpected data file: " << entry.data_file->file_path; + } + } + + ICEBERG_UNWRAP_OR_FAIL(auto delete_manifests, DeleteManifests(snapshot)); + ICEBERG_UNWRAP_OR_FAIL(auto delete_entries, ReadAllEntries(delete_manifests)); + ASSERT_EQ(delete_entries.size(), 2); + for (const auto& entry : delete_entries) { + if (entry.data_file->file_path == delete_file_a_->file_path) { + EXPECT_EQ(entry.status, ManifestStatus::kDeleted); + } else if (entry.data_file->file_path == rewritten_delete_file_a_->file_path) { + EXPECT_EQ(entry.status, ManifestStatus::kAdded); + EXPECT_EQ(entry.snapshot_id, snapshot->snapshot_id); + } else { + FAIL() << "Unexpected delete file: " << entry.data_file->file_path; + } + } +} + +// Rewrite data files with an explicit old data sequence number, then verify +// that the rewritten manifest entry carries the assigned sequence number. +TEST_P(RewriteFilesFormatVersionTest, RewriteDataAndAssignOldSequenceNumber) { + AssumeFormatVersionAtLeast(2); + CommitFileA(); + + constexpr int64_t kOldSequenceNumber = 1; + + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->SetDataSequenceNumber(kOldSequenceNumber); + rw->DeleteDataFile(file_a_); + rw->AddDataFile(rewritten_file_a_); + EXPECT_THAT(rw->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + ICEBERG_UNWRAP_OR_FAIL(auto data_manifests, DataManifests(snapshot)); + ICEBERG_UNWRAP_OR_FAIL(auto data_entries, ReadAllEntries(data_manifests)); + ASSERT_EQ(data_entries.size(), 2); + bool found_rewritten = false; + for (const auto& entry : data_entries) { + if (entry.data_file->file_path == rewritten_file_a_->file_path) { + found_rewritten = true; + EXPECT_EQ(entry.status, ManifestStatus::kAdded); + ASSERT_TRUE(entry.sequence_number.has_value()); + EXPECT_EQ(entry.sequence_number.value(), kOldSequenceNumber); + } else if (entry.data_file->file_path != file_a_->file_path) { + FAIL() << "Unexpected data file: " << entry.data_file->file_path; + } + } + EXPECT_TRUE(found_rewritten) << "Rewritten data file should be present"; +} + +// Create equality deletes via RowDelta then rewrite them as position deletes +// in a single RewriteFiles commit. +TEST_P(RewriteFilesFormatVersionTest, ReplaceEqualityDeletesWithPositionDeletes) { + AssumeFormatVersionAtLeast(2); + CommitFileA(); + + // Add an equality delete via RowDelta + { + ICEBERG_UNWRAP_OR_FAIL(auto row_delta, table_->NewRowDelta()); + row_delta->AddDeletes(eq_delete_file_); + EXPECT_THAT(row_delta->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + ICEBERG_UNWRAP_OR_FAIL(auto after_delta_snapshot, table_->current_snapshot()); + + // Replace the equality delete with a position delete + { + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->ValidateFromSnapshot(after_delta_snapshot->snapshot_id); + rw->DeleteDataFile(file_a_); + rw->DeleteDeleteFile(eq_delete_file_); + rw->AddDataFile(rewritten_file_a_); + rw->AddDeleteFile(rewritten_delete_file_a_); + EXPECT_THAT(rw->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), + DataOperation::kReplace); + EXPECT_EQ( + std::stoll(snapshot->summary.at(SnapshotSummaryFields::kRemovedEqDeleteFiles)), 1); + EXPECT_EQ(std::stoll(snapshot->summary.at(SnapshotSummaryFields::kAddedPosDeleteFiles)), + 1); + + // Verify the delete manifest shows the eq delete as DELETED and pos delete as ADDED + ICEBERG_UNWRAP_OR_FAIL(auto delete_manifests, DeleteManifests(snapshot)); + ICEBERG_UNWRAP_OR_FAIL(auto delete_entries, ReadAllEntries(delete_manifests)); + bool found_deleted_eq = false; + bool found_added_pos = false; + for (const auto& entry : delete_entries) { + if (entry.status == ManifestStatus::kDeleted && + entry.data_file->content == DataFile::Content::kEqualityDeletes) { + found_deleted_eq = true; + } + if (entry.status == ManifestStatus::kAdded && + entry.data_file->content == DataFile::Content::kPositionDeletes) { + found_added_pos = true; + } + } + EXPECT_TRUE(found_deleted_eq) << "Equality delete should be marked DELETED"; + EXPECT_TRUE(found_added_pos) << "Position delete should be marked ADDED"; +} + +// Remove all deletes: create a data file and an associated equality delete, +// then rewrite the data file while removing the delete file entirely (empty +// delete add set). +TEST_P(RewriteFilesFormatVersionTest, RemoveAllDeletes) { + AssumeFormatVersionAtLeast(2); + CommitFileA(); + + // Add an equality delete via RowDelta + { + ICEBERG_UNWRAP_OR_FAIL(auto row_delta, table_->NewRowDelta()); + row_delta->AddDeletes(eq_delete_file_); + EXPECT_THAT(row_delta->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + // Verify delete file exists before rewrite + { + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + ICEBERG_UNWRAP_OR_FAIL(auto delete_manifests, DeleteManifests(snapshot)); + EXPECT_GT(delete_manifests.size(), 0); + } + + ICEBERG_UNWRAP_OR_FAIL(auto after_delta_snapshot, table_->current_snapshot()); + + // Rewrite: delete the data file and the equality delete, add rewritten data, + // add no new delete files (empty delete add set). + { + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->ValidateFromSnapshot(after_delta_snapshot->snapshot_id); + rw->DeleteDataFile(file_a_); + rw->DeleteDeleteFile(eq_delete_file_); + rw->AddDataFile(rewritten_file_a_); + // no AddDeleteFile call — delete add set is empty + EXPECT_THAT(rw->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), + DataOperation::kReplace); + EXPECT_EQ(std::stoll(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles)), + 1); + EXPECT_EQ(std::stoll(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles)), 1); + EXPECT_EQ(std::stoll(snapshot->summary.at(SnapshotSummaryFields::kRemovedDeleteFiles)), + 1); + // No added delete files expected + EXPECT_EQ(snapshot->summary.count(SnapshotSummaryFields::kAddedDeleteFiles), 0); + + // Verify delete manifests show the eq delete as DELETED and no added deletes + ICEBERG_UNWRAP_OR_FAIL(auto delete_manifests, DeleteManifests(snapshot)); + ICEBERG_UNWRAP_OR_FAIL(auto delete_entries, ReadAllEntries(delete_manifests)); + bool found_deleted_delete = false; + for (const auto& entry : delete_entries) { + if (entry.status == ManifestStatus::kDeleted) { + found_deleted_delete = true; + } + EXPECT_NE(entry.status, ManifestStatus::kAdded) + << "No new delete files should be added"; + } + EXPECT_TRUE(found_deleted_delete) << "Original delete file should be marked DELETED"; +} + +// Verify that RewriteFiles detects new delete files that were committed after +// the validation snapshot boundary, preventing data loss. +TEST_P(RewriteFilesFormatVersionTest, NewDeleteFile) { + AssumeFormatVersionAtLeast(2); + CommitFileA(); + ICEBERG_UNWRAP_OR_FAIL(auto starting_snapshot, table_->current_snapshot()); + + // Concurrently add an equality delete targeting the data file we plan to rewrite + { + ICEBERG_UNWRAP_OR_FAIL(auto row_delta, table_->NewRowDelta()); + row_delta->AddDeletes(eq_delete_file_); + EXPECT_THAT(row_delta->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + // Try to rewrite the data file, validating from before the delete was added + { + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->ValidateFromSnapshot(starting_snapshot->snapshot_id); + rw->DeleteDataFile(file_a_); + rw->AddDataFile(rewritten_file_a_); + + auto result = rw->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("found new delete for replaced data file")); + EXPECT_THAT(result, HasErrorMessage(file_a_->file_path)); + } +} + +// Inject commit failures that exhaust retries when rewriting both data and +// delete files. Verify the commit fails with CommitFailed after exhausting +// retries. +TEST_P(RewriteFilesFormatVersionTest, FailureWhenRewriteBothDataAndDeleteFiles) { + AssumeFormatVersionAtLeast(2); + CommitFileA(); + + // Create delete file via RowDelta first + { + ICEBERG_UNWRAP_OR_FAIL(auto row_delta, table_->NewRowDelta()); + row_delta->AddDeletes(delete_file_a_); + EXPECT_THAT(row_delta->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + ICEBERG_UNWRAP_OR_FAIL(auto after_delta_snapshot, table_->current_snapshot()); + + constexpr int32_t kRetries = 2; + constexpr int32_t kInjectedFailures = kRetries + 1; + int call_count = 0; + + SetCommitRetryProperties(kRetries); + const auto sequence_number_before = table_->metadata()->last_sequence_number; + const auto metadata_avro_files_before = MetadataAvroFiles(); + + BindTableWithFailingCommits(kInjectedFailures, &call_count); + + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->ValidateFromSnapshot(after_delta_snapshot->snapshot_id); + rw->DeleteDataFile(file_a_); + rw->DeleteDeleteFile(delete_file_a_); + rw->AddDataFile(rewritten_file_a_); + rw->AddDeleteFile(rewritten_delete_file_a_); + + auto result = rw->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); + EXPECT_EQ(call_count, kRetries + 1); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot_after, table_->current_snapshot()); + EXPECT_EQ(snapshot_after->snapshot_id, after_delta_snapshot->snapshot_id); + EXPECT_EQ(table_->metadata()->last_sequence_number, sequence_number_before); + EXPECT_EQ(MetadataAvroFiles(), metadata_avro_files_before); +} + +// Inject transient commit failures that stay within the retry budget when +// rewriting both data and delete files, then verify success. +TEST_P(RewriteFilesFormatVersionTest, RecoverWhenRewriteBothDataAndDeleteFiles) { + AssumeFormatVersionAtLeast(2); + CommitFileA(); + + // Create delete file via RowDelta + { + ICEBERG_UNWRAP_OR_FAIL(auto row_delta, table_->NewRowDelta()); + row_delta->AddDeletes(delete_file_a_); + EXPECT_THAT(row_delta->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + ICEBERG_UNWRAP_OR_FAIL(auto after_delta_snapshot, table_->current_snapshot()); + + constexpr int32_t kRetries = 4; + constexpr int32_t kInjectedFailures = 2; + int call_count = 0; + + SetCommitRetryProperties(kRetries); + BindTableWithFailingCommits(kInjectedFailures, &call_count); + + ICEBERG_UNWRAP_OR_FAIL(auto rw, NewRewriteFiles()); + rw->ValidateFromSnapshot(after_delta_snapshot->snapshot_id); + rw->DeleteDataFile(file_a_); + rw->DeleteDeleteFile(delete_file_a_); + rw->AddDataFile(rewritten_file_a_); + rw->AddDeleteFile(rewritten_delete_file_a_); + + EXPECT_THAT(rw->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + + EXPECT_EQ(call_count, kInjectedFailures + 1); + + // Verify the rewrite succeeded and data is correct + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), + DataOperation::kReplace); + EXPECT_EQ(std::stoll(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles)), + 1); + EXPECT_EQ(std::stoll(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles)), 1); + EXPECT_EQ(std::stoll(snapshot->summary.at(SnapshotSummaryFields::kRemovedDeleteFiles)), + 1); + EXPECT_EQ(std::stoll(snapshot->summary.at(SnapshotSummaryFields::kAddedDeleteFiles)), + 1); +} + +// ============================================================================ +// TODO(WZhuo): Tests blocked on missing infrastructure in iceberg-cpp. +// ============================================================================ +// +// TODO(RemovingDataFileAlsoRemovesDV): +// Blocked by: format v3 DV auto-cleanup not yet supported. +// Creates data+delete files via RowDelta (v3), rewrites with deleteFile. +// Verifies the DV for the removed data file is automatically cleaned up. +// Java guard: assumeThat(formatVersion).isGreaterThanOrEqualTo(3) +// +// TODO(DeleteWithDuplicateEntriesInManifest): +// Blocked by: cannot yet append the same file twice to create duplicate manifest +// entries. Appends FILE_A twice, then rewrites one copy. Verifies manifest entry +// statuses (DELETED for the rewritten copy, EXISTING for the other). +// Java guard: none (runs on all versions) + +INSTANTIATE_TEST_SUITE_P(FormatVersions, RewriteFilesFormatVersionTest, + ::testing::Values(int8_t{1}, int8_t{2})); + +} // namespace iceberg diff --git a/src/iceberg/test/table_test.cc b/src/iceberg/test/table_test.cc index ef21f12d6..716d92967 100644 --- a/src/iceberg/test/table_test.cc +++ b/src/iceberg/test/table_test.cc @@ -164,6 +164,7 @@ TEST(StaticTableTest, NewMutatingOperationsAreNotSupported) { EXPECT_THAT(table->NewDeleteFiles(), IsError(ErrorKind::kNotSupported)); EXPECT_THAT(table->NewRowDelta(), IsError(ErrorKind::kNotSupported)); EXPECT_THAT(table->NewOverwrite(), IsError(ErrorKind::kNotSupported)); + EXPECT_THAT(table->NewRewriteFiles(), IsError(ErrorKind::kNotSupported)); EXPECT_THAT(table->NewSnapshotManager(), IsError(ErrorKind::kNotSupported)); } diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 169e7ec90..7abce27cb 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -38,6 +38,7 @@ #include "iceberg/update/merge_append.h" #include "iceberg/update/overwrite_files.h" #include "iceberg/update/pending_update.h" +#include "iceberg/update/rewrite_files.h" #include "iceberg/update/row_delta.h" #include "iceberg/update/set_snapshot.h" #include "iceberg/update/snapshot_manager.h" @@ -521,6 +522,13 @@ Result> Transaction::NewOverwrite() { return overwrite; } +Result> Transaction::NewRewriteFiles() { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr rewrite_files, + RewriteFiles::Make(ctx_->table->name().name, ctx_)); + ICEBERG_RETURN_UNEXPECTED(AddUpdate(rewrite_files)); + return rewrite_files; +} + Result> Transaction::NewUpdateStatistics() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_statistics, UpdateStatistics::Make(ctx_)); diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 49b607d60..47714ba70 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -118,6 +118,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewOverwrite(); + /// \brief Create a new RewriteFiles to replace files in this table and commit the + /// changes. + Result> NewRewriteFiles(); + /// \brief Create a new SnapshotManager to manage snapshots. Result> NewSnapshotManager(); diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 784b3e03b..ce11821a2 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -244,6 +244,7 @@ class FastAppend; class MergeAppend; class OverwriteFiles; class PendingUpdate; +class RewriteFiles; class RowDelta; class SetSnapshot; class SnapshotManager; diff --git a/src/iceberg/update/merging_snapshot_update.h b/src/iceberg/update/merging_snapshot_update.h index d11a9b1f9..dbfb79937 100644 --- a/src/iceberg/update/merging_snapshot_update.h +++ b/src/iceberg/update/merging_snapshot_update.h @@ -141,6 +141,11 @@ class ICEBERG_EXPORT MergingSnapshotUpdate : public SnapshotUpdate { /// \brief Override the data sequence number assigned to all newly-added data files. void SetNewDataFilesDataSequenceNumber(int64_t sequence_number); + /// \brief Returns true if SetNewDataFilesDataSequenceNumber was called. + bool HasDataSequenceNumber() const { + return new_data_files_data_seq_number_.has_value(); + } + /// \brief Set case sensitivity for row filter and expression evaluation. void CaseSensitive(bool case_sensitive); diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index 4ba4168d4..83c64f363 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -24,6 +24,7 @@ install_headers( 'merging_snapshot_update.h', 'overwrite_files.h', 'pending_update.h', + 'rewrite_files.h', 'row_delta.h', 'set_snapshot.h', 'snapshot_manager.h', diff --git a/src/iceberg/update/rewrite_files.cc b/src/iceberg/update/rewrite_files.cc new file mode 100644 index 000000000..71943b629 --- /dev/null +++ b/src/iceberg/update/rewrite_files.cc @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/rewrite_files.h" + +#include +#include +#include + +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/transaction.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +RewriteFiles::RewriteFiles(std::string table_name, + std::shared_ptr ctx) + : MergingSnapshotUpdate(std::move(table_name), std::move(ctx)) { + // Replace files must fail if any of the deleted paths is missing and cannot be deleted + FailMissingDeletePaths(); +} + +Result> RewriteFiles::Make( + std::string table_name, std::shared_ptr ctx) { + ICEBERG_PRECHECK(!table_name.empty(), "Table name cannot be empty"); + ICEBERG_PRECHECK(ctx != nullptr, "Cannot create RewriteFiles without a context"); + return std::unique_ptr( + new RewriteFiles(std::move(table_name), std::move(ctx))); +} + +RewriteFiles& RewriteFiles::DeleteDataFile(const std::shared_ptr& data_file) { + auto staged_file = + data_file == nullptr ? nullptr : std::make_shared(*data_file); + ICEBERG_BUILDER_RETURN_IF_ERROR(MergingSnapshotUpdate::DeleteDataFile(staged_file)); + replaced_data_files_.insert(std::move(staged_file)); + return *this; +} + +RewriteFiles& RewriteFiles::DeleteDeleteFile( + const std::shared_ptr& delete_file) { + ICEBERG_BUILDER_RETURN_IF_ERROR(MergingSnapshotUpdate::DeleteDeleteFile(delete_file)); + return *this; +} + +RewriteFiles& RewriteFiles::AddDataFile(const std::shared_ptr& file) { + ICEBERG_BUILDER_RETURN_IF_ERROR(MergingSnapshotUpdate::AddDataFile(file)); + return *this; +} + +RewriteFiles& RewriteFiles::AddDeleteFile(const std::shared_ptr& delete_file) { + ICEBERG_BUILDER_RETURN_IF_ERROR(MergingSnapshotUpdate::AddDeleteFile(delete_file)); + return *this; +} + +RewriteFiles& RewriteFiles::AddDeleteFile(const std::shared_ptr& delete_file, + int64_t data_sequence_number) { + ICEBERG_BUILDER_RETURN_IF_ERROR( + MergingSnapshotUpdate::AddDeleteFile(delete_file, data_sequence_number)); + return *this; +} + +RewriteFiles& RewriteFiles::SetDataSequenceNumber(int64_t sequence_number) { + SetNewDataFilesDataSequenceNumber(sequence_number); + return *this; +} + +RewriteFiles& RewriteFiles::RewriteDataFiles( + const std::vector>& files_to_delete, + const std::vector>& files_to_add, int64_t sequence_number) { + SetNewDataFilesDataSequenceNumber(sequence_number); + Rewrite(files_to_delete, {}, files_to_add, {}); + return *this; +} + +RewriteFiles& RewriteFiles::Rewrite( + const std::vector>& data_files_to_replace, + const std::vector>& delete_files_to_replace, + const std::vector>& data_files_to_add, + const std::vector>& delete_files_to_add) { + for (const auto& data_file : data_files_to_replace) { + DeleteDataFile(data_file); + } + + for (const auto& delete_file : delete_files_to_replace) { + DeleteDeleteFile(delete_file); + } + + for (const auto& data_file : data_files_to_add) { + AddDataFile(data_file); + } + + for (const auto& delete_file : delete_files_to_add) { + AddDeleteFile(delete_file); + } + + return *this; +} + +RewriteFiles& RewriteFiles::ValidateFromSnapshot(int64_t snapshot_id) { + starting_snapshot_id_ = snapshot_id; + return *this; +} + +RewriteFiles& RewriteFiles::ToBranch(const std::string& branch) { + SetTargetBranch(branch); + return *this; +} + +std::string RewriteFiles::operation() { return DataOperation::kReplace; } + +void RewriteFiles::ValidateReplacedAndAddedFiles() { + // 1. Files to delete cannot be empty + if (!DeletesDataFiles() && !DeletesDeleteFiles()) { + AddError(ErrorKind::kValidationFailed, "Files to delete cannot be empty"); + return; + } + + // 2. Data files to add must be empty because there's no data file to be rewritten + if (!DeletesDataFiles() && AddsDataFiles()) { + AddError(ErrorKind::kValidationFailed, + "Data files to add must be empty because there's no data file to be " + "rewritten"); + return; + } + + // 3. Delete files to add must be empty because there's no delete file to be rewritten + if (!DeletesDeleteFiles() && AddsDeleteFiles()) { + AddError(ErrorKind::kValidationFailed, + "Delete files to add must be empty because there's no delete file to be " + "rewritten"); + return; + } +} + +Status RewriteFiles::Validate(const TableMetadata& current_metadata, + const std::shared_ptr& snapshot) { + // Step 1: Validate the replaced and added files invariants + ValidateReplacedAndAddedFiles(); + ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + + // Step 2: If there are replaced data files, validate no new row-level deletes + // for those data files have been added concurrently + if (!replaced_data_files_.empty()) { + // The instance method automatically determines ignore_equality_deletes + // based on whether SetDataSequenceNumber was called: + // - If data sequence number was set, equality deletes at higher sequence + // numbers still correctly apply to the new files → ignore them. + // - If no sequence number was set, ALL new deletes are conflicts. + auto io = ctx_->table->io(); + ICEBERG_RETURN_UNEXPECTED(MergingSnapshotUpdate::ValidateNoNewDeletesForDataFiles( + current_metadata, starting_snapshot_id_, replaced_data_files_, snapshot, + std::move(io), HasDataSequenceNumber())); + } + + return {}; +} + +} // namespace iceberg diff --git a/src/iceberg/update/rewrite_files.h b/src/iceberg/update/rewrite_files.h new file mode 100644 index 000000000..72fa84f4e --- /dev/null +++ b/src/iceberg/update/rewrite_files.h @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/update/rewrite_files.h +/// RewriteFiles operation for replacing files in a table. + +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/merging_snapshot_update.h" +#include "iceberg/util/data_file_set.h" + +namespace iceberg { + +/// \brief API for replacing files in a table. +/// +/// This operation accumulates file additions and deletions, produces a new +/// Snapshot of the changes, and commits that snapshot as the current. +/// +/// When committing, these changes will be applied to the latest table snapshot. +/// Commit conflicts will be resolved by applying the changes to the new latest +/// snapshot and reattempting the commit. If any of the deleted files are no +/// longer in the latest snapshot when reattempting, the commit will throw a +/// ValidationException. +/// +/// Note that the new state of the table after each rewrite must be logically +/// equivalent to the original table state. +class ICEBERG_EXPORT RewriteFiles : public MergingSnapshotUpdate { + public: + /// \brief Create a new RewriteFiles operation. + /// + /// \param table_name The name of the table + /// \param ctx The transaction context + /// \return A unique pointer to the new RewriteFiles operation + static Result> Make( + std::string table_name, std::shared_ptr ctx); + + ~RewriteFiles() override = default; + + /// \brief Remove a data file from the current table state. + /// + /// This rewrite operation may change the size or layout of the data files. + /// The set of live data records must never change. + /// + /// \param data_file a rewritten data file + /// \return this for method chaining + RewriteFiles& DeleteDataFile(const std::shared_ptr& data_file); + + /// \brief Remove a delete file from the table state. + /// + /// This rewrite operation may change the size or layout of the delete files. + /// The set of applicable delete records must never change. + /// + /// \param delete_file a rewritten delete file + /// \return this for method chaining + RewriteFiles& DeleteDeleteFile(const std::shared_ptr& delete_file); + + /// \brief Add a new data file or delete file. + /// + /// This rewrite operation may change the size or layout of the files. + /// The set of live data records must never change. + /// + /// \param file a new file (data or delete) + /// \return this for method chaining + RewriteFiles& AddDataFile(const std::shared_ptr& file); + + /// \brief Add a new delete file. + /// + /// \param delete_file a new delete file + /// \return this for method chaining + RewriteFiles& AddDeleteFile(const std::shared_ptr& delete_file); + + /// \brief Add a new delete file with the given data sequence number. + /// + /// To ensure equivalence in the set of applicable delete records, the + /// sequence number of the delete file must be the max sequence number of + /// the delete files that it is replacing. + /// + /// \param delete_file a new delete file + /// \param data_sequence_number data sequence number to append on the file + /// \return this for method chaining + RewriteFiles& AddDeleteFile(const std::shared_ptr& delete_file, + int64_t data_sequence_number); + + /// \brief Configure the data sequence number for this rewrite operation. + /// + /// This data sequence number will be used for all new data files that are + /// added in this rewrite. This is helpful to avoid commit conflicts between + /// data compaction and adding equality deletes. + /// + /// \param sequence_number a data sequence number + /// \return this for method chaining + RewriteFiles& SetDataSequenceNumber(int64_t sequence_number); + + /// \brief Add a rewrite that replaces one set of data files with another set + /// that contains the same data. The sequence number provided will be used for + /// all the data files added. + /// + /// \param files_to_delete files that will be replaced (deleted) + /// \param files_to_add files that will be added + /// \param sequence_number sequence number to use for all data files added + /// \return this for method chaining + RewriteFiles& RewriteDataFiles( + const std::vector>& files_to_delete, + const std::vector>& files_to_add, + int64_t sequence_number); + + /// \brief Add a rewrite that replaces one set of files with another set that + /// contains the same data. + /// + /// \param data_files_to_replace data files that will be replaced (deleted) + /// \param delete_files_to_replace delete files that will be replaced (deleted) + /// \param data_files_to_add data files that will be added + /// \param delete_files_to_add delete files that will be added + /// \return this for method chaining + RewriteFiles& Rewrite( + const std::vector>& data_files_to_replace, + const std::vector>& delete_files_to_replace, + const std::vector>& data_files_to_add, + const std::vector>& delete_files_to_add); + + /// \brief Set the snapshot ID used in any reads for this operation. + /// + /// Validations will check changes after this snapshot ID. If this is not + /// called, all ancestor snapshots through the table's initial snapshot are + /// validated. + /// + /// \param snapshot_id a snapshot ID + /// \return this for method chaining + RewriteFiles& ValidateFromSnapshot(int64_t snapshot_id); + + /// \brief Perform operations on a particular branch. + /// + /// \param branch name of a SnapshotRef of type branch + /// \return this for method chaining + RewriteFiles& ToBranch(const std::string& branch); + + std::string operation() override; + + protected: + Status Validate(const TableMetadata& current_metadata, + const std::shared_ptr& snapshot) override; + + explicit RewriteFiles(std::string table_name, std::shared_ptr ctx); + + private: + /// \brief Validate the replaced and added files invariants. + /// + /// Ensures that: + /// - Files to delete cannot be empty + /// - Data files to add must be empty if there's no data file to be rewritten + /// - Delete files to add must be empty if there's no delete file to be rewritten + void ValidateReplacedAndAddedFiles(); + + /// \brief Tracks which data files are being replaced, for conflict detection. + DataFileSet replaced_data_files_; + + /// \brief Optional snapshot ID boundary for validation scope. + std::optional starting_snapshot_id_; +}; + +} // namespace iceberg