Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 58 additions & 2 deletions src/iceberg/json_serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ constexpr std::string_view kSequenceNumber = "sequence-number";
constexpr std::string_view kTimestampMs = "timestamp-ms";
constexpr std::string_view kManifestList = "manifest-list";
constexpr std::string_view kSummary = "summary";
constexpr std::string_view kFirstRowId = "first-row-id";
constexpr std::string_view kAddedRows = "added-rows";
constexpr std::string_view kMinSnapshotsToKeep = "min-snapshots-to-keep";
constexpr std::string_view kMaxSnapshotAgeMs = "max-snapshot-age-ms";
constexpr std::string_view kMaxRefAgeMs = "max-ref-age-ms";
Expand Down Expand Up @@ -459,9 +461,21 @@ nlohmann::json ToJson(const Snapshot& snapshot) {
json[kManifestList] = snapshot.manifest_list;
// If there is an operation, write the summary map
if (snapshot.Operation().has_value()) {
json[kSummary] = snapshot.summary;
nlohmann::json summary_json;
for (const auto& [key, value] : snapshot.summary) {
if (key == SnapshotSummaryFields::kFirstRowId ||
key == SnapshotSummaryFields::kAddedRows) {
continue;
}
summary_json[key] = value;
}
json[kSummary] = std::move(summary_json);
}
SetOptionalField(json, kSchemaId, snapshot.schema_id);
SetOptionalField(json, kFirstRowId, snapshot.first_row_id);
if (snapshot.first_row_id.has_value()) {
SetOptionalField(json, kAddedRows, snapshot.added_rows);
}
return json;
}

Expand Down Expand Up @@ -808,12 +822,54 @@ Result<std::unique_ptr<Snapshot>> SnapshotFromJson(const nlohmann::json& json) {
}
}

auto parse_summary_int64 =
Comment thread
wgtmac marked this conversation as resolved.
Outdated
[&summary](const std::string& key) -> Result<std::optional<int64_t>> {
auto it = summary.find(key);
if (it == summary.end()) {
return std::nullopt;
}
ICEBERG_ASSIGN_OR_RAISE(auto value, StringUtils::ParseNumber<int64_t>(it->second));
return value;
};

ICEBERG_ASSIGN_OR_RAISE(auto first_row_id,
GetJsonValueOptional<int64_t>(json, kFirstRowId));
if (!first_row_id.has_value()) {
ICEBERG_ASSIGN_OR_RAISE(first_row_id,
parse_summary_int64(SnapshotSummaryFields::kFirstRowId));
}

ICEBERG_ASSIGN_OR_RAISE(auto added_rows,
GetJsonValueOptional<int64_t>(json, kAddedRows));
if (!added_rows.has_value()) {
ICEBERG_ASSIGN_OR_RAISE(added_rows,
parse_summary_int64(SnapshotSummaryFields::kAddedRows));
}

summary.erase(SnapshotSummaryFields::kFirstRowId);
Comment thread
wgtmac marked this conversation as resolved.
Outdated
summary.erase(SnapshotSummaryFields::kAddedRows);

if (first_row_id.has_value() && first_row_id.value() < 0) {
return JsonParseError("Invalid first-row-id (cannot be negative): {}",
first_row_id.value());
}
if (added_rows.has_value() && added_rows.value() < 0) {
return JsonParseError("Invalid added-rows (cannot be negative): {}",
added_rows.value());
}
if (first_row_id.has_value() && !added_rows.has_value()) {
return JsonParseError("Invalid added-rows (required when first-row-id is set): null");
}
if (!first_row_id.has_value()) {
added_rows = std::nullopt;
}

ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValueOptional<int32_t>(json, kSchemaId));

return std::make_unique<Snapshot>(
snapshot_id, parent_snapshot_id,
sequence_number.value_or(TableMetadata::kInitialSequenceNumber), timestamp_ms,
manifest_list, std::move(summary), schema_id);
manifest_list, std::move(summary), schema_id, first_row_id, added_rows);
}

nlohmann::json ToJson(const BlobMetadata& blob_metadata) {
Expand Down
19 changes: 12 additions & 7 deletions src/iceberg/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ std::optional<std::string_view> Snapshot::Operation() const {
}

Result<std::optional<int64_t>> Snapshot::FirstRowId() const {
if (first_row_id.has_value()) {
return first_row_id;
}

auto it = summary.find(SnapshotSummaryFields::kFirstRowId);
if (it == summary.end()) {
return std::nullopt;
Expand All @@ -171,6 +175,10 @@ Result<std::optional<int64_t>> Snapshot::FirstRowId() const {
}

Result<std::optional<int64_t>> Snapshot::AddedRows() const {
if (added_rows.has_value()) {
return added_rows;
}

auto it = summary.find(SnapshotSummaryFields::kAddedRows);
if (it == summary.end()) {
return std::nullopt;
Expand All @@ -186,7 +194,8 @@ bool Snapshot::Equals(const Snapshot& other) const {
return snapshot_id == other.snapshot_id &&
parent_snapshot_id == other.parent_snapshot_id &&
sequence_number == other.sequence_number && timestamp_ms == other.timestamp_ms &&
schema_id == other.schema_id;
schema_id == other.schema_id && first_row_id == other.first_row_id &&

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks odd if two snapshots with different row lineage attributes are deemed identical. I would rather think this is an oversight from apache/iceberg#11948. Both iceberg-rust and iceberg-python enforce all fields to be equal so I think it is fine to include them here.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Opened apache/iceberg#17015 to fix at Java side.

added_rows == other.added_rows;
}

Result<std::unique_ptr<Snapshot>> Snapshot::Make(
Expand All @@ -203,12 +212,6 @@ Result<std::unique_ptr<Snapshot>> Snapshot::Make(
ICEBERG_PRECHECK(!first_row_id.has_value() || added_rows.has_value(),
"Missing added-rows when first-row-id is set");
summary[SnapshotSummaryFields::kOperation] = operation;
if (first_row_id.has_value()) {
summary[SnapshotSummaryFields::kFirstRowId] = std::to_string(first_row_id.value());
}
if (added_rows.has_value()) {
summary[SnapshotSummaryFields::kAddedRows] = std::to_string(added_rows.value());
}
return std::make_unique<Snapshot>(Snapshot{
.snapshot_id = snapshot_id,
.parent_snapshot_id = parent_snapshot_id,
Expand All @@ -217,6 +220,8 @@ Result<std::unique_ptr<Snapshot>> Snapshot::Make(
.manifest_list = std::move(manifest_list),
.summary = std::move(summary),
.schema_id = schema_id,
.first_row_id = first_row_id,
.added_rows = first_row_id.has_value() ? added_rows : std::nullopt,
});
}

Expand Down
4 changes: 4 additions & 0 deletions src/iceberg/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,10 @@ struct ICEBERG_EXPORT Snapshot {
std::unordered_map<std::string, std::string> summary;
/// ID of the table's current schema when the snapshot was created.
std::optional<int32_t> schema_id;
/// The row-id of the first newly added row in this snapshot.
std::optional<int64_t> first_row_id;
/// The upper bound of rows with assigned row IDs in this snapshot.
std::optional<int64_t> added_rows;

/// \brief Create a new Snapshot instance with validation on the inputs.
static Result<std::unique_ptr<Snapshot>> Make(
Expand Down
85 changes: 77 additions & 8 deletions src/iceberg/test/json_serde_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,73 @@ TEST(JsonInternalTest, Snapshot) {
TestJsonConversion(snapshot, expected_json);
}

TEST(JsonInternalTest, SnapshotRowLineageSerializesTopLevelFields) {
ICEBERG_UNWRAP_OR_FAIL(
auto snapshot,
Snapshot::Make(/*sequence_number=*/99, /*snapshot_id=*/1234567890,
/*parent_snapshot_id=*/9876543210,
TimePointMsFromUnixMs(1234567890123), DataOperation::kAppend,
{{SnapshotSummaryFields::kAddedDataFiles, "50"}},
/*schema_id=*/42, "/path/to/manifest_list",
/*first_row_id=*/100, /*added_rows=*/25));

auto json = ToJson(*snapshot);
EXPECT_EQ(json["first-row-id"], 100);
EXPECT_EQ(json["added-rows"], 25);
EXPECT_FALSE(json["summary"].contains("first-row-id"));
EXPECT_FALSE(json["summary"].contains("added-rows"));
}

TEST(JsonInternalTest, SnapshotFromJsonReadsTopLevelRowLineageFields) {
nlohmann::json snapshot_json =
R"({"snapshot-id":1234567890,
"parent-snapshot-id":9876543210,
"sequence-number":99,
"timestamp-ms":1234567890123,
"manifest-list":"/path/to/manifest_list",
"summary":{
"operation":"append",
"added-data-files":"50"
},
"schema-id":42,
"first-row-id":100,
"added-rows":25})"_json;

ICEBERG_UNWRAP_OR_FAIL(auto snapshot, SnapshotFromJson(snapshot_json));
ICEBERG_UNWRAP_OR_FAIL(auto first_row_id, snapshot->FirstRowId());
ICEBERG_UNWRAP_OR_FAIL(auto added_rows, snapshot->AddedRows());
EXPECT_EQ(first_row_id, 100);
EXPECT_EQ(added_rows, 25);

auto json = ToJson(*snapshot);
EXPECT_EQ(json["first-row-id"], 100);
EXPECT_EQ(json["added-rows"], 25);
EXPECT_FALSE(json["summary"].contains("first-row-id"));
EXPECT_FALSE(json["summary"].contains("added-rows"));
}

TEST(JsonInternalTest, SnapshotFromJsonReadsLegacySummaryOnlyRowLineageFields) {
nlohmann::json snapshot_json =
R"({"snapshot-id":1234567890,
"parent-snapshot-id":9876543210,
"sequence-number":99,
"timestamp-ms":1234567890123,
"manifest-list":"/path/to/manifest_list",
"summary":{
"operation":"append",
"added-data-files":"50",
"first-row-id":"100",
"added-rows":"25"
},
"schema-id":42})"_json;

ICEBERG_UNWRAP_OR_FAIL(auto snapshot, SnapshotFromJson(snapshot_json));
ICEBERG_UNWRAP_OR_FAIL(auto first_row_id, snapshot->FirstRowId());
ICEBERG_UNWRAP_OR_FAIL(auto added_rows, snapshot->AddedRows());
EXPECT_EQ(first_row_id, 100);
EXPECT_EQ(added_rows, 25);
}

// FIXME: disable it for now since Iceberg Spark plugin generates
// custom summary keys.
TEST(JsonInternalTest, DISABLED_SnapshotFromJsonWithInvalidSummary) {
Expand Down Expand Up @@ -480,19 +547,21 @@ TEST(JsonInternalTest, TableUpdateSetDefaultSortOrder) {
}

TEST(JsonInternalTest, TableUpdateAddSnapshot) {
auto snapshot = std::make_shared<Snapshot>(
Snapshot{.snapshot_id = 123456789,
.parent_snapshot_id = 987654321,
.sequence_number = 5,
.timestamp_ms = TimePointMsFromUnixMs(1234567890000),
.manifest_list = "/path/to/manifest-list.avro",
.summary = {{SnapshotSummaryFields::kOperation, DataOperation::kAppend}},
.schema_id = 1});
ICEBERG_UNWRAP_OR_FAIL(
auto snapshot_unique,
Snapshot::Make(/*sequence_number=*/5, /*snapshot_id=*/123456789,
/*parent_snapshot_id=*/987654321,
TimePointMsFromUnixMs(1234567890000), DataOperation::kAppend,
/*summary=*/{}, /*schema_id=*/1, "/path/to/manifest-list.avro",
/*first_row_id=*/100, /*added_rows=*/25));
std::shared_ptr<Snapshot> snapshot(std::move(snapshot_unique));
table::AddSnapshot update(snapshot);

ICEBERG_UNWRAP_OR_FAIL(auto json, ToJson(update));
EXPECT_EQ(json["action"], "add-snapshot");
EXPECT_TRUE(json.contains("snapshot"));
EXPECT_EQ(json["snapshot"]["first-row-id"], 100);
EXPECT_EQ(json["snapshot"]["added-rows"], 25);

auto parsed = TableUpdateFromJson(json);
ASSERT_THAT(parsed, IsOk());
Expand Down
Loading