Skip to content
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ public String showLogFileCommits(
defaultValue = "false") final boolean headerOnly)
throws IOException {

HoodieStorage storage = HoodieCLI.getTableMetaClient().getStorage();
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
HoodieStorage storage = metaClient.getStorage();
List<String> logFilePaths = FSUtils.getGlobStatusExcludingMetaFolder(
storage, new StoragePath(logFilePathPattern)).stream()
.map(status -> status.getPath().toString()).collect(Collectors.toList());
Expand All @@ -116,7 +117,7 @@ storage, new StoragePath(logFilePathPattern)).stream()
fileName = path.getName();
}
HoodieSchema writerSchema = TableSchemaResolver.readSchemaFromLogFile(storage, path);
try (Reader reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(path), writerSchema)) {
try (Reader reader = HoodieLogFormat.newReader(storage, metaClient, new HoodieLogFile(path), writerSchema)) {

// read the avro blocks
while (reader.hasNext()) {
Expand Down Expand Up @@ -266,7 +267,7 @@ storage, new StoragePath(logFilePathPattern)).stream()
HoodieSchema writerSchema = TableSchemaResolver.readSchemaFromLogFile(
client.getStorage(), new StoragePath(logFile));
try (HoodieLogFormat.Reader reader =
HoodieLogFormat.newReader(storage, new HoodieLogFile(new StoragePath(logFile)), writerSchema)) {
HoodieLogFormat.newReader(storage, client, new HoodieLogFile(new StoragePath(logFile)), writerSchema)) {
// read the avro blocks
while (reader.hasNext()) {
HoodieLogBlock n = reader.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ public class HoodieWriteConfig extends HoodieConfig {
.withValidValues(
String.valueOf(HoodieTableVersion.SIX.versionCode()),
String.valueOf(HoodieTableVersion.EIGHT.versionCode()),
String.valueOf(HoodieTableVersion.NINE.versionCode())
String.valueOf(HoodieTableVersion.NINE.versionCode()),
String.valueOf(HoodieTableVersion.TEN.versionCode())
)
.sinceVersion("1.0.0")
.withDocumentation("The table version this writer is storing the table in. This should match the current table version.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.util.CommonClientUtils;

import java.util.Iterator;

Expand All @@ -33,7 +34,7 @@ public HoodieAppendHandle<T, I, K, O> create(final HoodieWriteConfig hoodieConfi
final String fileIdPrefix, final TaskContextSupplier sparkTaskContextSupplier) {

String fileId = getNextFileId(fileIdPrefix);
if (hoodieTable.getMetaClient().getTableConfig().isLSMTreeStorageLayout()) {
if (CommonClientUtils.shouldWriteNativeLogFormat(hoodieConfig)) {
return new HoodieNativeLogAppendHandle<>(hoodieConfig, commitTime, hoodieTable, partitionPath,
fileId, sparkTaskContextSupplier);
}
Expand All @@ -45,7 +46,7 @@ public HoodieAppendHandle<T, I, K, O> create(final HoodieWriteConfig hoodieConfi
final HoodieTable<T, I, K, O> hoodieTable, final String partitionPath,
final String fileId, final Iterator<HoodieRecord<T>> recordItr,
final TaskContextSupplier sparkTaskContextSupplier) {
if (hoodieTable.getMetaClient().getTableConfig().isLSMTreeStorageLayout()) {
if (CommonClientUtils.shouldWriteNativeLogFormat(hoodieConfig)) {
return new HoodieNativeLogAppendHandle<>(hoodieConfig, commitTime, hoodieTable, partitionPath,
fileId, recordItr, sparkTaskContextSupplier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.read.BaseFileUpdateCallback;
import org.apache.hudi.common.table.read.BufferedRecord;
Expand Down Expand Up @@ -181,11 +182,12 @@ private void initRecordTypeAndCdcLogger(HoodieRecord.HoodieRecordType enginRecor
}

private HoodieCDCLogWriter<?> createCDCLogWriter() {
if (HoodieCDCLogWriterFactory.shouldWriteNativeCDCLogs(hoodieTable.getMetaClient().getTableConfig())) {
HoodieTableConfig tableConfig = hoodieTable.getMetaClient().getTableConfig();
if (HoodieCDCLogWriterFactory.shouldWriteNativeCDCLogs(config, tableConfig)) {
return new HoodieNativeCDCLogger(
instantTime,
config,
hoodieTable.getMetaClient().getTableConfig(),
tableConfig,
partitionPath,
storage,
getWriterSchema(),
Expand All @@ -200,7 +202,7 @@ private HoodieCDCLogWriter<?> createCDCLogWriter() {
return new HoodieCDCLogger(
instantTime,
config,
hoodieTable.getMetaClient().getTableConfig(),
tableConfig,
partitionPath,
storage,
getWriterSchema(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public List<WriteStatus> close() {

if (isSecondaryIndexStatsStreamingWritesEnabled && !statuses.isEmpty()) {
SecondaryIndexStreamingTracker.trackSecondaryIndexStats(partitionPath, fileId, getReadFileSlice(),
statuses.stream().map(status -> status.getStat().getPath()).collect(Collectors.toList()),
SecondaryIndexStreamingTracker.collectNewLogFilesForSecondaryIndexStats(statuses),
statuses.get(statuses.size() - 1), hoodieTable, secondaryIndexDefns, config, instantTime, writeSchemaWithMetaFields);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@

import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.LogFileCreationCallback;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.util.CommonClientUtils;

import org.apache.avro.generic.IndexedRecord;

Expand All @@ -53,7 +55,7 @@ static <T, I, K, O> HoodieCDCLogWriter<IndexedRecord> createAvroCDCLogWriter(
TaskContextSupplier taskContextSupplier,
Supplier<HoodieLogFormat.Writer> logWriterSupplier) {
HoodieTableConfig tableConfig = hoodieTable.getMetaClient().getTableConfig();
if (shouldWriteNativeCDCLogs(tableConfig)) {
if (shouldWriteNativeCDCLogs(config, tableConfig)) {
return new HoodieAvroNativeCDCLogger(
instantTime,
config,
Expand All @@ -78,7 +80,10 @@ static <T, I, K, O> HoodieCDCLogWriter<IndexedRecord> createAvroCDCLogWriter(
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}

static boolean shouldWriteNativeCDCLogs(HoodieTableConfig tableConfig) {
return tableConfig.isLSMTreeStorageLayout();
/**
 * Decides whether CDC logs should be written with the native CDC logger.
 *
 * @param writeConfig write config consulted via {@code CommonClientUtils.shouldWriteNativeLogFormat}
 * @param tableConfig table config supplying the base file format
 * @return {@code false} when the base file format is LANCE; otherwise the result of
 *         {@code CommonClientUtils.shouldWriteNativeLogFormat(writeConfig)}
 */
static boolean shouldWriteNativeCDCLogs(HoodieWriteConfig writeConfig, HoodieTableConfig tableConfig) {
  // Native CDC goes through the table's base-file writer. Lance tables stay on Avro CDC logs until
  // native CDC Lance writer semantics are enabled consistently across engines.
  if (tableConfig.getBaseFileFormat() == HoodieFileFormat.LANCE) {
    return false;
  }
  return CommonClientUtils.shouldWriteNativeLogFormat(writeConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ private HoodieNativeLogAppendHandle(HoodieWriteConfig config, String instantTime
@Override
protected void createLogWriterForAppend(String instantTime, Option<FileSlice> fileSliceOpt) {
try {
String[] orderingFields = ConfigUtils.getOrderingFields(recordProperties);
this.writer = new HoodieNativeLogFormatWriter(
storage.getDefaultBufferSize(),
storage,
Expand All @@ -105,7 +106,7 @@ protected void createLogWriterForAppend(String instantTime, Option<FileSlice> fi
writeSchemaWithMetaFields,
taskContextSupplier,
hoodieTable.getReaderContextFactoryForWrite().getContext().getRecordContext(),
Arrays.stream(ConfigUtils.getOrderingFields(recordProperties)).collect(Collectors.toList()));
orderingFields == null ? Collections.emptyList() : Arrays.asList(orderingFields));
} catch (IOException e) {
throw new HoodieException("Creating native log writer with fileId: " + fileId + ", "
+ "delta commit time: " + instantTime + " error", e);
Expand Down Expand Up @@ -182,8 +183,17 @@ public boolean canWrite(HoodieRecord record) {
@Override
protected void updateLogFiles(HoodieDeltaWriteStat stat) {
for (AppendResult appendResult : writer.getLastAppendResults()) {
if (!stat.getLogFiles().contains(appendResult.logFile().getFileName())) {
stat.addLogFiles(appendResult.logFile().getFileName());
String fileName = appendResult.logFile().getFileName();
if (!stat.getLogFiles().contains(fileName)) {
stat.addLogFiles(fileName);
}
// A native flush can produce a separate delete file that is not captured by stat.getPath(). Record it (with
// its size) so metadata table file listing and marker reconciliation account for it.
if (FSUtils.isNativeDeleteLogFile(fileName)) {
String deleteFilePath = partitionPath.isEmpty()
? new StoragePath(fileName).toString()
: new StoragePath(partitionPath, fileName).toString();
stat.addDeleteFileStat(deleteFilePath, appendResult.size());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import org.apache.hudi.common.table.log.NativeLogFooterMetadata;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.table.read.BufferedRecords;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.OrderingValues;
import org.apache.hudi.common.util.collection.ArrayComparable;
Expand Down Expand Up @@ -166,13 +164,7 @@ public void appendDeleteRecord(HoodieRecord record, HoodieSchema recordSchema, S
recordSchema, recordProperties, orderingFieldNames.toArray(new String[0]));
Object deleteEngineRecord = recordContext.constructEngineRecord(
deleteLogSchema, createDeleteLogFieldValues(recordKey, orderingValue));
// Keep isDelete=false here so RecordContext constructs a data-bearing HoodieRecord
// with the native delete-log row. The delete semantics come from the delete log file
// itself; setting isDelete=true would create a HoodieEmptyRecord and lose the row.
BufferedRecord deleteRecord = BufferedRecords.fromEngineRecord(
deleteEngineRecord, deleteLogSchema, recordContext, orderingValue, recordKey, false);
deleteFileWriter.write(recordKey, recordContext.constructHoodieRecord(deleteRecord, record.getPartitionPath()),
deleteLogSchema, recordProperties);
deleteFileWriter.writeRow(recordKey, deleteEngineRecord);
}

private Object[] createDeleteLogFieldValues(String recordKey, Comparable orderingValue) {
Expand Down Expand Up @@ -213,6 +205,9 @@ private void addFooterMetadata(Map<HeaderMetadataType, String> header) throws IO
if (dataFileWriter != null) {
dataFileWriter.addFooterMetadata(NativeLogFooterMetadata.toFooterMetadata(header));
}
if (deleteFileWriter != null) {
deleteFileWriter.addFooterMetadata(NativeLogFooterMetadata.toFooterMetadata(header));
}
}

private void ensureDataFileWriter(HoodieSchema recordSchema) throws IOException {
Expand All @@ -230,9 +225,13 @@ private void ensureDeleteFileWriter() throws IOException {
if (deleteFileWriter == null) {
deleteLogFile = createNativeLogFile(currentAppendVersion, DELETE_LOG_EXTENSION);
deleteLogSchema = HoodieSchemas.createDeleteLogSchema(tableSchema, orderingFieldNames);
// The delete records are built through recordContext#constructEngineRecord (see #appendDeleteRecord), so the
// writer must match the record context's engine type rather than the merger's record type: on Spark executors
// the reader context for write degrades to Avro (no engine context available), and using the merger record
// type here (e.g. SPARK) would create an engine-native writer that fails on the Avro delete records.
deleteFileWriter = HoodieFileWriterFactory.getFileWriter(
instantTime, deleteLogFile.getPath(), storage, writeConfig, deleteLogSchema, taskContextSupplier,
writeConfig.getRecordMerger().getRecordType());
recordContext.getRecordType());
}
}

Expand All @@ -245,7 +244,8 @@ private void ensureAppendVersion() throws IOException {
private void closeFileWriters() throws IOException {
if (dataFileWriter != null) {
dataFileWriter.close();
lastDataFileFormatMetadata = Option.ofNullable(dataFileWriter.getFileFormatMetadata());
lastDataFileFormatMetadata = writeConfig.isMetadataColumnStatsIndexEnabled()
? Option.ofNullable(dataFileWriter.getFileFormatMetadata()) : Option.empty();
dataFileWriter = null;
}
if (deleteFileWriter != null) {
Expand All @@ -272,10 +272,12 @@ private AppendResult completeAppend(int appendVersion) {
return new AppendResult(logFile, 0, 0);
}

long totalSize = lastAppendResults.stream().mapToLong(AppendResult::size).sum();
// The returned result represents the primary (data) file recorded as the write stat's path. Its size must be the
// data file's own size only: a delete file flushed in the same shot is a separate physical file whose size is
// tracked independently (HoodieDeltaWriteStat#getDeleteFileStats).
AppendResult firstResult = lastAppendResults.get(0);
this.logFile = firstResult.logFile();
return new AppendResult(firstResult.logFile(), 0, totalSize);
return firstResult;
}

private int nextAvailableVersion() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.engine.RecordContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieKey;
Expand All @@ -30,6 +31,7 @@
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.keygen.BaseKeyGenerator;
Expand All @@ -40,10 +42,13 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -127,6 +132,29 @@ static void trackSecondaryIndexStats(String partitionPath, String fileId, Option
});
}

/**
 * Collects the paths of the new log files (including native delete files) written by an append handle,
 * so that they can be added to the file slice when generating secondary index stats.
 *
 * <p>For each write status, the stat's path is added first (when non-null), followed by the keys of its
 * delete-file stats. Delete-file stats are only read from stats that are {@link HoodieDeltaWriteStat}
 * instances; any other stat type contributes its path only instead of failing with a
 * {@link ClassCastException}.
 *
 * @param statuses Write statuses produced by the append handle
 * @return De-duplicated list of new log file paths in insertion order
 */
@VisibleForTesting
static List<String> collectNewLogFilesForSecondaryIndexStats(List<WriteStatus> statuses) {
  // LinkedHashSet de-duplicates while keeping first-seen order.
  Set<String> newLogFiles = new LinkedHashSet<>();
  for (WriteStatus status : statuses) {
    String dataFilePath = status.getStat().getPath();
    if (dataFilePath != null) {
      newLogFiles.add(dataFilePath);
    }
    // Guard the downcast: only delta write stats expose delete-file stats.
    if (status.getStat() instanceof HoodieDeltaWriteStat) {
      Map<String, Long> deleteFileStats = ((HoodieDeltaWriteStat) status.getStat()).getDeleteFileStats();
      if (deleteFileStats != null) {
        newLogFiles.addAll(deleteFileStats.keySet());
      }
    }
  }
  return new ArrayList<>(newLogFiles);
}

/**
* Utility function used by HoodieCreateHandle to generate secondary index stats for the corresponding hoodie record.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,7 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeExpressionIndexPartiti
partitionFilePathSizeTriplet.add(Pair.of(entry.getKey(), Pair.of(entry.getValue().getBaseFile().get().getPath(), entry.getValue().getBaseFile().get().getFileSize())));
}
entry.getValue().getLogFiles()
.filter(hoodieLogFile -> !FSUtils.isNativeDeleteLogFile(hoodieLogFile.getFileName()))
.forEach(hoodieLogFile -> partitionFilePathSizeTriplet.add(Pair.of(entry.getKey(), Pair.of(hoodieLogFile.getPath().toString(), hoodieLogFile.getFileSize()))));
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
Expand Down Expand Up @@ -611,7 +612,10 @@ public static Set<String> getFilesToFetchColumnStats(List<HoodieWriteStat> parti
.flatMap(fileSlice -> Stream.concat(
Stream.of(fileSlice.getBaseFile().map(HoodieBaseFile::getFileName).orElse(null)),
fileSlice.getLogFiles().map(HoodieLogFile::getFileName)))
.filter(e -> Objects.nonNull(e) && !filesWithColumnStats.contains(e) && !fileGroupIdsToReplace.contains(e))
.filter(e -> Objects.nonNull(e)
&& !FSUtils.isNativeDeleteLogFile(e)
&& !filesWithColumnStats.contains(e)
&& !fileGroupIdsToReplace.contains(e))
.collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
import org.apache.hudi.common.fs.OptimisticConsistencyGuard;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieIndexDefinition;
Expand Down Expand Up @@ -818,10 +819,20 @@ void reconcileAgainstMarkers(HoodieEngineContext context,
.filter(Objects::nonNull)
.flatMap(cdcStat -> cdcStat.keySet().stream())
.collect(Collectors.toSet());
// A native log append can flush both a data file and a delete file in one shot, but the write stat only records
// the data file as its path (delete files are tracked separately in HoodieDeltaWriteStat#getDeleteFileStats).
// Collect the delete files here so their CREATE markers are not mistaken for partially written files and deleted.
Set<String> validDeletePaths = stats.stream()
.filter(stat -> stat instanceof HoodieDeltaWriteStat)
.map(stat -> ((HoodieDeltaWriteStat) stat).getDeleteFileStats())
.filter(Objects::nonNull)
.flatMap(deleteFileStats -> deleteFileStats.keySet().stream())
.collect(Collectors.toSet());

// Contains the list of partially created files. These need to be cleaned up.
invalidDataPaths.removeAll(validDataPaths);
invalidDataPaths.removeAll(validCdcDataPaths);
invalidDataPaths.removeAll(validDeletePaths);

if (!invalidDataPaths.isEmpty()) {
if (shouldFailOnDuplicateDataFileDetection) {
Expand Down
Loading
Loading