Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,10 @@ public interface MemoryReservationManager {
* @param bytesAlreadyReserved the amount of memory that has already been reserved
*/
void reserveMemoryVirtually(final long bytesToBeReserved, final long bytesAlreadyReserved);

/**
* Mark this manager as highest-priority (e.g. SHOW QUERIES). When operators memory is
* insufficient, allocation will fall back to zero bytes instead of failing.
*/
void setHighestPriority(boolean isHighestPriority);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,9 @@ public boolean isHighestPriority() {

public void setHighestPriority(boolean highestPriority) {
this.highestPriority = highestPriority;
if (memoryReservationManager != null) {
memoryReservationManager.setHighestPriority(highestPriority);
}
}

public boolean isSingleSourcePath() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ public SessionInfo getSessionInfo() {
return getInstanceContext().getSessionInfo();
}

public boolean isHighestPriority() {
FragmentInstanceContext instanceContext = getInstanceContext();
return instanceContext != null && instanceContext.isHighestPriority();
}

@Override
public void recordScanAggregationFromRawDataCost(long costTimeInNanos) {
if (driverContext != null && driverContext.getFragmentInstanceContext() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public CteScanOperator(
requireNonNull(dataStore, "dataStore is null");
this.operatorContext = operatorContext;
this.sourceId = sourceId;
this.dataReader = new MemoryReader(dataStore, queryId);
this.dataReader = new MemoryReader(dataStore, queryId, operatorContext.isHighestPriority());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iotdb.commons.path.IFullPath;
import org.apache.iotdb.commons.queryengine.common.SqlDialect;
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.DataNodeMemoryConfig;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
Expand Down Expand Up @@ -118,7 +119,7 @@ public List<PipelineDriverFactory> plan(
context.invalidateParentPlanNodeIdToMemoryEstimator();

// check whether current free memory is enough to execute current query
long estimatedMemorySize = checkMemory(memoryEstimator, instanceContext.getStateMachine());
long estimatedMemorySize = checkMemory(memoryEstimator, instanceContext);

context.addPipelineDriverFactory(root, context.getDriverContext(), estimatedMemorySize);

Expand Down Expand Up @@ -157,7 +158,7 @@ public List<PipelineDriverFactory> plan(
context.invalidateParentPlanNodeIdToMemoryEstimator();

// check whether current free memory is enough to execute current query
checkMemory(memoryEstimator, instanceContext.getStateMachine());
checkMemory(memoryEstimator, instanceContext);

context.addPipelineDriverFactory(root, context.getDriverContext(), 0);

Expand Down Expand Up @@ -193,7 +194,7 @@ private Operator generateOperator(
}

private long checkMemory(
final PipelineMemoryEstimator memoryEstimator, FragmentInstanceStateMachine stateMachine)
final PipelineMemoryEstimator memoryEstimator, FragmentInstanceContext instanceContext)
throws MemoryNotEnoughException {

// if it is disabled, just return
Expand All @@ -206,37 +207,65 @@ private long checkMemory(

QueryRelatedResourceMetricSet.getInstance().updateEstimatedMemory(estimatedMemorySize);

if (OPERATORS_MEMORY_BLOCK.allocate(estimatedMemorySize)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"[ConsumeMemory] consume: {}, current remaining memory: {}",
estimatedMemorySize,
OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes());
}
} else {
long reservedBytes =
allocateOperatorsMemory(estimatedMemorySize, instanceContext.isHighestPriority());
if (reservedBytes < 0) {
throw new MemoryNotEnoughException(
String.format(
"There is not enough memory to execute current fragment instance, "
+ "current remaining free memory is %dB, "
+ "estimated memory usage for current fragment instance is %dB",
OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes(), estimatedMemorySize));
}
stateMachine.addStateChangeListener(
newState -> {
if (newState.isDone()) {
try (SetThreadName fragmentInstanceName =
new SetThreadName(stateMachine.getFragmentInstanceId().getFullId())) {
OPERATORS_MEMORY_BLOCK.release(estimatedMemorySize);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"[ReleaseMemory] release: {}, current remaining memory: {}",
estimatedMemorySize,
OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes());
FragmentInstanceStateMachine stateMachine = instanceContext.getStateMachine();
if (reservedBytes > 0) {
stateMachine.addStateChangeListener(
newState -> {
if (newState.isDone()) {
try (SetThreadName fragmentInstanceName =
new SetThreadName(stateMachine.getFragmentInstanceId().getFullId())) {
OPERATORS_MEMORY_BLOCK.release(reservedBytes);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"[ReleaseMemory] release: {}, current remaining memory: {}",
reservedBytes,
OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes());
}
}
}
}
});
return estimatedMemorySize;
});
}
return reservedBytes;
}

/**
* Try to reserve bytes from the operators memory block.
*
* @return allocated bytes on success ({@code > 0}), {@code 0} if nothing to allocate or
* highest-priority fallback applies, {@code -1} if allocation failed
*/
private long allocateOperatorsMemory(final long memoryInBytes, final boolean isHighestPriority) {
if (memoryInBytes <= 0) {
return 0L;
}
if (OPERATORS_MEMORY_BLOCK.allocate(memoryInBytes)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"[ConsumeMemory] consume: {}, current remaining memory: {}",
memoryInBytes,
OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes());
}
return memoryInBytes;
}
if (isHighestPriority) {
return 0L;
}
return -1L;
}

@TestOnly
long allocateOperatorsMemoryForTest(final long memoryInBytes, final boolean isHighestPriority) {
return allocateOperatorsMemory(memoryInBytes, isHighestPriority);
}

private QueryDataSourceType getQueryDataSourceType(DataDriverContext dataDriverContext) {
Expand Down Expand Up @@ -291,23 +320,19 @@ public synchronized long tryAllocateFreeMemory4Load(final long memoryInBytes) {
}
}

public void reserveFromFreeMemoryForOperators(
public long reserveFromFreeMemoryForOperators(
final long memoryInBytes,
final long reservedBytes,
final String queryId,
final String contextHolder) {
final String contextHolder,
final boolean isHighestPriority)
throws MemoryNotEnoughException {
if (memoryInBytes <= 0) {
throw new IllegalArgumentException(
"Bytes to reserve from free memory for operators should be larger than 0");
}
if (OPERATORS_MEMORY_BLOCK.allocate(memoryInBytes)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"[ConsumeMemory] consume: {}, current remaining memory: {}",
memoryInBytes,
OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes());
}
} else {
long allocated = allocateOperatorsMemory(memoryInBytes, isHighestPriority);
if (allocated < 0) {
throw new MemoryNotEnoughException(
String.format(
"There is not enough memory for Query %s, the contextHolder is %s,"
Expand All @@ -320,6 +345,7 @@ public void reserveFromFreeMemoryForOperators(
reservedBytes,
memoryInBytes));
}
return allocated;
}

public void releaseToFreeMemoryForOperators(final long memoryInBytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,7 @@
@Override
public void reserveMemoryVirtually(
final long bytesToBeReserved, final long bytesAlreadyReserved) {}

@Override
public void setHighestPriority(boolean isHighestPriority) {}

Check failure on line 53 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a nested comment explaining why this method is empty, throw an UnsupportedOperationException or complete the implementation.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8UTq5uyn1O-w91dq6D&open=AZ8UTq5uyn1O-w91dq6D&pullRequest=18052
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
package org.apache.iotdb.db.queryengine.plan.planner.memory;

import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;

import org.apache.tsfile.utils.Pair;

import javax.annotation.concurrent.NotThreadSafe;

import static com.google.common.base.Preconditions.checkState;

@NotThreadSafe
public class NotThreadSafeMemoryReservationManager implements MemoryReservationManager {
// To avoid reserving memory too frequently, we choose to do it in batches. This is the lower
Expand All @@ -39,8 +42,16 @@ public class NotThreadSafeMemoryReservationManager implements MemoryReservationM

private final String contextHolder;

private boolean isHighestPriority;

private long reservedBytesInTotal = 0;

/**
* Bytes logically reserved but not taken from the operators pool due to highest-priority
* fallback.
*/
private long fallbackBytesInTotal = 0;

private long bytesToBeReserved = 0;

private long bytesToBeReleased = 0;
Expand All @@ -50,6 +61,21 @@ public NotThreadSafeMemoryReservationManager(final QueryId queryId, final String
this.contextHolder = contextHolder;
}

@Override
public void setHighestPriority(boolean isHighestPriority) {
this.isHighestPriority = isHighestPriority;
}

@TestOnly
public long getReservedBytesInTotalForTest() {
return reservedBytesInTotal;
}

@TestOnly
public long getFallbackBytesInTotalForTest() {
return fallbackBytesInTotal;
}

@Override
public void reserveMemoryCumulatively(final long size) {
bytesToBeReserved += size;
Expand All @@ -61,61 +87,92 @@ public void reserveMemoryCumulatively(final long size) {
@Override
public void reserveMemoryImmediately() {
if (bytesToBeReserved != 0) {
LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators(
bytesToBeReserved, reservedBytesInTotal, queryId.getId(), contextHolder);
reservedBytesInTotal += bytesToBeReserved;
long actualReserved =
LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators(
bytesToBeReserved,
reservedBytesInTotal,
queryId.getId(),
contextHolder,
isHighestPriority);
if (actualReserved == 0) {
fallbackBytesInTotal += bytesToBeReserved;
} else {
reservedBytesInTotal += actualReserved;
}
bytesToBeReserved = 0;
}
}

@Override
public void reserveMemoryImmediately(final long size) {
if (size != 0) {
LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators(
size, reservedBytesInTotal, queryId.getId(), contextHolder);
reservedBytesInTotal += size;
long actualReserved =
LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators(
size, reservedBytesInTotal, queryId.getId(), contextHolder, isHighestPriority);
if (actualReserved == 0) {
fallbackBytesInTotal += size;
} else {
reservedBytesInTotal += actualReserved;
}
}
}

@Override
public void releaseMemoryCumulatively(final long size) {
if (size <= 0) {
return;
}
bytesToBeReleased += size;
if (bytesToBeReleased >= MEMORY_BATCH_THRESHOLD) {
long bytesToRelease;
if (bytesToBeReleased <= bytesToBeReserved) {
bytesToBeReserved -= bytesToBeReleased;
} else {
bytesToRelease = bytesToBeReleased - bytesToBeReserved;
bytesToBeReserved = 0;
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(bytesToRelease);
reservedBytesInTotal -= bytesToRelease;
}
releaseBytesImmediately(bytesToBeReleased);
bytesToBeReleased = 0;
}
}

private void releaseBytesImmediately(final long size) {
long poolBytes = deductReleaseAccounting(size);
if (poolBytes > 0) {
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(poolBytes);
}
}

/** Deduct release size from pending reserve, fallback quota, then pool reservation in order. */
private long deductReleaseAccounting(final long size) {
long remaining = size;
if (remaining <= bytesToBeReserved) {
bytesToBeReserved -= remaining;
return 0L;
}
remaining -= bytesToBeReserved;
bytesToBeReserved = 0;

if (remaining <= fallbackBytesInTotal) {
fallbackBytesInTotal -= remaining;
return 0L;
}
remaining -= fallbackBytesInTotal;
fallbackBytesInTotal = 0;

reservedBytesInTotal -= remaining;
checkState(reservedBytesInTotal >= 0, "Released bytes has been larger than reserved!");
return remaining;
}

@Override
public void releaseAllReservedMemory() {
if (reservedBytesInTotal != 0) {
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytesInTotal);
reservedBytesInTotal = 0;
bytesToBeReserved = 0;
bytesToBeReleased = 0;
}
fallbackBytesInTotal = 0;
bytesToBeReserved = 0;
bytesToBeReleased = 0;
}

@Override
public Pair<Long, Long> releaseMemoryVirtually(final long size) {
if (bytesToBeReserved >= size) {
bytesToBeReserved -= size;
return new Pair<>(size, 0L);
} else {
long releasedBytesInReserved = bytesToBeReserved;
long releasedBytesInTotal = size - bytesToBeReserved;
bytesToBeReserved = 0;
reservedBytesInTotal -= releasedBytesInTotal;
return new Pair<>(releasedBytesInReserved, releasedBytesInTotal);
}
long poolBytes = deductReleaseAccounting(size);
return new Pair<>(size - poolBytes, poolBytes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,9 @@ public synchronized void reserveMemoryVirtually(
final long bytesToBeReserved, final long bytesAlreadyReserved) {
super.reserveMemoryVirtually(bytesToBeReserved, bytesAlreadyReserved);
}

@Override
public synchronized void setHighestPriority(boolean isHighestPriority) {
super.setHighestPriority(isHighestPriority);
}
}
Loading
Loading