Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,8 @@ private EndpointGroup apiGroup() {
"/catalogs/{catalog}/dbs/{db}/tables/{table}/optimizing-processes",
tableController::getOptimizingProcesses);
get(
"/catalogs/{catalog}/dbs/{db}/tables/{table}/optimizing-types",
tableController::getOptimizingTypes);
"/catalogs/{catalog}/dbs/{db}/tables/{table}/process-types",
tableController::getProcessTypes);
get(
"/catalogs/{catalog}/dbs/{db}/tables/{table}/optimizing-processes/{processId}/tasks",
tableController::getOptimizingProcessTasks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.IcebergActions;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
import org.apache.amoro.api.CommitMetaProducer;
Expand Down Expand Up @@ -67,12 +68,14 @@
import org.apache.amoro.table.descriptor.OptimizingTaskInfo;
import org.apache.amoro.table.descriptor.PartitionBaseInfo;
import org.apache.amoro.table.descriptor.PartitionFileBaseInfo;
import org.apache.amoro.table.descriptor.ProcessCategory;
import org.apache.amoro.table.descriptor.ServerTableMeta;
import org.apache.amoro.table.descriptor.TableSummary;
import org.apache.amoro.table.descriptor.TagOrBranchInfo;
import org.apache.amoro.utils.MixedDataFiles;
import org.apache.amoro.utils.MixedTableUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.FileScanTask;
Expand Down Expand Up @@ -118,6 +121,19 @@ public class MixedAndIcebergTableDescriptor extends PersistentBase

private static final Logger LOG = LoggerFactory.getLogger(MixedAndIcebergTableDescriptor.class);

// Process type names of the optimizing category: the name of every OptimizingType constant.
// Copied into an ImmutableList so this shared static constant cannot be modified after class
// initialization, consistent with the other type lists declared below.
private static final List<String> OPTIMIZING_TYPE_LIST =
    ImmutableList.copyOf(
        Arrays.stream(OptimizingType.values()).map(Enum::name).collect(Collectors.toList()));

// Process type names of the cleanup category, taken from the Iceberg action names.
private static final List<String> CLEANUP_TYPE_LIST =
    ImmutableList.of(
        IcebergActions.EXPIRE_SNAPSHOTS.getName(),
        IcebergActions.CLEAN_ORPHAN.getName(),
        IcebergActions.CLEAN_DANGLING_DELETE.getName(),
        IcebergActions.EXPIRE_DATA.getName());

// Process type names of the profiling category; currently only the auto-create-tags action.
private static final List<String> PROFILING_TYPE_LIST =
    ImmutableList.of(IcebergActions.AUTO_CREATE_TAGS.getName());

private ExecutorService executorService;

@Override
Expand Down Expand Up @@ -655,7 +671,12 @@ public List<ConsumerInfo> getTableConsumerInfos(AmoroTable<?> amoroTable) {

@Override
public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
AmoroTable<?> amoroTable, String type, ProcessStatus status, int limit, int offset) {
AmoroTable<?> amoroTable,
String type,
String processCategory,
ProcessStatus status,
int limit,
int offset) {
TableIdentifier tableIdentifier = amoroTable.id();
ServerTableIdentifier identifier =
getAs(
Expand All @@ -671,12 +692,35 @@ public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
int total = 0;
// page helper is 1-based
int pageNumber = (offset / limit) + 1;

// Only apply category filtering when type is not specified
final List<String> includeTypes;

if (StringUtils.isBlank(type)) {
if (processCategory == null) {
// No category specified: return empty results
return Pair.of(Collections.emptyList(), 0);
} else {
List<String> categoryTypes =
FormatTableDescriptor.resolveCategoryTypes(
processCategory, OPTIMIZING_TYPE_LIST, CLEANUP_TYPE_LIST, PROFILING_TYPE_LIST);
if (categoryTypes.isEmpty()) {
// Unknown category: return empty results to avoid exposing all processes
return Pair.of(Collections.emptyList(), 0);
}

includeTypes = categoryTypes;
}
} else {
includeTypes = null;
}

List<TableProcessMeta> processMetaList = Collections.emptyList();
try (Page<?> ignored = PageHelper.startPage(pageNumber, limit, true)) {
processMetaList =
getAs(
TableProcessMapper.class,
mapper -> mapper.listProcessMeta(identifier.getId(), type, status));
mapper -> mapper.listProcessMeta(identifier.getId(), type, includeTypes, status));
PageInfo<TableProcessMeta> pageInfo = new PageInfo<>(processMetaList);
total = (int) pageInfo.getTotal();
LOG.info(
Expand Down Expand Up @@ -716,6 +760,31 @@ public Map<String, String> getTableOptimizingTypes(AmoroTable<?> amoroTable) {
return types;
}

/**
 * Lists the process types that belong to the given process category.
 *
 * <p>The category name is matched case-insensitively against the {@link ProcessCategory} names.
 * For the optimizing category the result is delegated to {@link
 * #getTableOptimizingTypes(AmoroTable)}; for the cleanup and profiling categories a fixed set of
 * Iceberg action names is returned together with their display labels.
 *
 * @param amoroTable the table whose process types are requested; only used when the category is
 *     the optimizing one
 * @param processCategory the category name to look up; may be {@code null}
 * @return a map from process type name to display label, or an empty map when the category is
 *     {@code null} or matches none of the known categories
 */
@Override
public Map<String, String> getTableProcessTypes(
    AmoroTable<?> amoroTable, String processCategory) {
  final Map<String, String> labelsByType;

  if (ProcessCategory.OPTIMIZING.getName().equalsIgnoreCase(processCategory)) {
    labelsByType = getTableOptimizingTypes(amoroTable);
  } else if (ProcessCategory.CLEANUP.getName().equalsIgnoreCase(processCategory)) {
    // Maintenance actions that remove expired or unreferenced table content.
    labelsByType = Maps.newHashMap();
    labelsByType.put(IcebergActions.EXPIRE_SNAPSHOTS.getName(), "Expire Snapshots");
    labelsByType.put(IcebergActions.CLEAN_ORPHAN.getName(), "Clean Orphan Files");
    labelsByType.put(
        IcebergActions.CLEAN_DANGLING_DELETE.getName(), "Clean Dangling Delete Files");
    labelsByType.put(IcebergActions.EXPIRE_DATA.getName(), "Expire Data");
  } else if (ProcessCategory.PROFILING.getName().equalsIgnoreCase(processCategory)) {
    labelsByType = Maps.newHashMap();
    labelsByType.put(IcebergActions.AUTO_CREATE_TAGS.getName(), "Auto Create Tags");
  } else {
    // Null or unrecognized category: nothing to offer.
    labelsByType = Collections.emptyMap();
  }

  return labelsByType;
}

@Override
public List<OptimizingTaskInfo> getOptimizingTaskInfos(
AmoroTable<?> amoroTable, String processId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,16 @@ public List<ConsumerInfo> getTableConsumersInfos(TableIdentifier tableIdentifier
}

public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
TableIdentifier tableIdentifier, String type, ProcessStatus status, int limit, int offset) {
TableIdentifier tableIdentifier,
String type,
String processCategory,
ProcessStatus status,
int limit,
int offset) {
AmoroTable<?> amoroTable = loadTable(tableIdentifier);
FormatTableDescriptor formatTableDescriptor = formatDescriptorMap.get(amoroTable.format());
return formatTableDescriptor.getOptimizingProcessesInfo(
amoroTable, type, status, limit, offset);
amoroTable, type, processCategory, status, limit, offset);
}

public List<OptimizingTaskInfo> getOptimizingProcessTaskInfos(
Expand All @@ -143,10 +148,11 @@ public List<OptimizingTaskInfo> getOptimizingProcessTaskInfos(
return formatTableDescriptor.getOptimizingTaskInfos(amoroTable, processId);
}

public Map<String, String> getTableOptimizingTypes(TableIdentifier tableIdentifier) {
public Map<String, String> getTableProcessTypes(
TableIdentifier tableIdentifier, String processCategory) {
AmoroTable<?> amoroTable = loadTable(tableIdentifier);
FormatTableDescriptor formatTableDescriptor = formatDescriptorMap.get(amoroTable.format());
return formatTableDescriptor.getTableOptimizingTypes(amoroTable);
return formatTableDescriptor.getTableProcessTypes(amoroTable, processCategory);
}

private AmoroTable<?> loadTable(TableIdentifier identifier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.amoro.table.descriptor.OptimizingTaskInfo;
import org.apache.amoro.table.descriptor.PartitionBaseInfo;
import org.apache.amoro.table.descriptor.PartitionFileBaseInfo;
import org.apache.amoro.table.descriptor.ProcessCategory;
import org.apache.amoro.table.descriptor.ServerTableMeta;
import org.apache.amoro.table.descriptor.TableSummary;
import org.apache.amoro.table.descriptor.TagOrBranchInfo;
Expand Down Expand Up @@ -332,6 +333,11 @@ public void getOptimizingProcesses(Context ctx) {
type = null;
}

String processCategory = ctx.queryParam("processCategory");
if (StringUtils.isBlank(processCategory)) {
processCategory = null;
}

String status = ctx.queryParam("status");
Integer page = ctx.queryParamAsClass("page", Integer.class).getOrDefault(1);
Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20);
Expand All @@ -346,21 +352,32 @@ public void getOptimizingProcesses(Context ctx) {
StringUtils.isBlank(status) ? null : ProcessStatus.valueOf(status);
Pair<List<OptimizingProcessInfo>, Integer> optimizingProcessesInfo =
tableDescriptor.getOptimizingProcessesInfo(
tableIdentifier.buildTableIdentifier(), type, processStatus, limit, offset);
tableIdentifier.buildTableIdentifier(),
type,
processCategory,
processStatus,
limit,
offset);
List<OptimizingProcessInfo> result = optimizingProcessesInfo.getLeft();
int total = optimizingProcessesInfo.getRight();

ctx.json(OkResponse.of(PageResult.of(result, total)));
}

public void getOptimizingTypes(Context ctx) {
public void getProcessTypes(Context ctx) {
String catalog = ctx.pathParam("catalog");
String db = ctx.pathParam("db");
String table = ctx.pathParam("table");
String processCategory =
ctx.queryParamAsClass("processCategory", String.class).getOrDefault(null);
if (StringUtils.isBlank(processCategory)) {
processCategory = ProcessCategory.OPTIMIZING.getName();
}
TableIdentifier tableIdentifier = TableIdentifier.of(catalog, db, table);

Map<String, String> values =
tableDescriptor.getTableOptimizingTypes(tableIdentifier.buildTableIdentifier());
tableDescriptor.getTableProcessTypes(
tableIdentifier.buildTableIdentifier(), processCategory);
ctx.json(OkResponse.of(values));
}

Expand Down Expand Up @@ -671,7 +688,7 @@ public void getTableConsumerInfos(Context ctx) {
}

/**
* cancel the running optimizing process of one certain table.
* Cancel the running process of one certain table.
*
* @param ctx - context for handling the request and response
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,15 @@ void updateProcess(
+ "create_time, finish_time, fail_message, process_parameters, summary "
+ "FROM table_process WHERE table_id = #{tableId} "
+ " <if test='processType != null'> AND process_type = #{processType}</if>"
+ " <if test='processType == null and includeTypes != null and includeTypes.size() > 0'> AND process_type IN <foreach collection='includeTypes' item='type' open='(' separator=',' close=')'>#{type}</foreach></if>"
+ " <if test='status != null'> AND status = #{status}</if>"
+ " ORDER BY process_id desc"
+ "</script>")
@ResultMap("tableProcessMap")
List<TableProcessMeta> listProcessMeta(
@Param("tableId") long tableId,
@Param("processType") String processType,
@Param("includeTypes") List<String> includeTypes,
@Param("status") ProcessStatus optimizingStatus);

@Select(
Expand Down
Loading
Loading