diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/memory/MemoryReservationManager.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/memory/MemoryReservationManager.java index b52534b4ec2ed..f0420330652e8 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/memory/MemoryReservationManager.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/memory/MemoryReservationManager.java @@ -80,4 +80,10 @@ public interface MemoryReservationManager { * @param bytesAlreadyReserved the amount of memory that has already been reserved */ void reserveMemoryVirtually(final long bytesToBeReserved, final long bytesAlreadyReserved); + + /** + * Mark this manager as highest-priority (e.g. SHOW QUERIES). When operators memory is + * insufficient, allocation will fall back to zero bytes instead of failing. + */ + void setHighestPriority(boolean isHighestPriority); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 81dd89387bf64..98a385d3a389f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -1365,6 +1365,9 @@ public boolean isHighestPriority() { public void setHighestPriority(boolean highestPriority) { this.highestPriority = highestPriority; + if (memoryReservationManager != null) { + memoryReservationManager.setHighestPriority(highestPriority); + } } public boolean isSingleSourcePath() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java index 584cb4fe74c00..0ff922f889436 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java @@ -71,6 +71,11 @@ public SessionInfo getSessionInfo() { return getInstanceContext().getSessionInfo(); } + public boolean isHighestPriority() { + FragmentInstanceContext instanceContext = getInstanceContext(); + return instanceContext != null && instanceContext.isHighestPriority(); + } + @Override public void recordScanAggregationFromRawDataCost(long costTimeInNanos) { if (driverContext != null && driverContext.getFragmentInstanceContext() != null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/CteScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/CteScanOperator.java index c25fe15d93e81..c995ab62bca5e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/CteScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/CteScanOperator.java @@ -59,7 +59,7 @@ public CteScanOperator( requireNonNull(dataStore, "dataStore is null"); this.operatorContext = operatorContext; this.sourceId = sourceId; - this.dataReader = new MemoryReader(dataStore, queryId); + this.dataReader = new MemoryReader(dataStore, queryId, operatorContext.isHighestPriority()); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java index 95f28052c1fd5..cfdd39c15b3ec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.path.IFullPath; import org.apache.iotdb.commons.queryengine.common.SqlDialect; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.DataNodeMemoryConfig; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -118,7 +119,7 @@ public List plan( context.invalidateParentPlanNodeIdToMemoryEstimator(); // check whether current free memory is enough to execute current query - long estimatedMemorySize = checkMemory(memoryEstimator, instanceContext.getStateMachine()); + long estimatedMemorySize = checkMemory(memoryEstimator, instanceContext); context.addPipelineDriverFactory(root, context.getDriverContext(), estimatedMemorySize); @@ -157,7 +158,7 @@ public List plan( context.invalidateParentPlanNodeIdToMemoryEstimator(); // check whether current free memory is enough to execute current query - checkMemory(memoryEstimator, instanceContext.getStateMachine()); + checkMemory(memoryEstimator, instanceContext); context.addPipelineDriverFactory(root, context.getDriverContext(), 0); @@ -193,7 +194,7 @@ private Operator generateOperator( } private long checkMemory( - final PipelineMemoryEstimator memoryEstimator, FragmentInstanceStateMachine stateMachine) + final PipelineMemoryEstimator memoryEstimator, FragmentInstanceContext instanceContext) throws MemoryNotEnoughException { // if it is disabled, just return @@ -206,14 +207,9 @@ private long checkMemory( QueryRelatedResourceMetricSet.getInstance().updateEstimatedMemory(estimatedMemorySize); - if (OPERATORS_MEMORY_BLOCK.allocate(estimatedMemorySize)) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "[ConsumeMemory] consume: {}, current remaining memory: {}", - estimatedMemorySize, - OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes()); - } - } else { + long reservedBytes = + allocateOperatorsMemory(estimatedMemorySize, instanceContext.isHighestPriority()); + if (reservedBytes < 0) { throw new MemoryNotEnoughException( String.format( "There is not enough memory to execute current fragment instance, " @@ -221,22 +217,55 @@ private long checkMemory( + "estimated memory usage for current fragment instance is %dB", OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes(), estimatedMemorySize)); } - stateMachine.addStateChangeListener( - newState -> { - if (newState.isDone()) { - try (SetThreadName fragmentInstanceName = - new SetThreadName(stateMachine.getFragmentInstanceId().getFullId())) { - OPERATORS_MEMORY_BLOCK.release(estimatedMemorySize); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "[ReleaseMemory] release: {}, current remaining memory: {}", - estimatedMemorySize, - OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes()); + FragmentInstanceStateMachine stateMachine = instanceContext.getStateMachine(); + if (reservedBytes > 0) { + stateMachine.addStateChangeListener( + newState -> { + if (newState.isDone()) { + try (SetThreadName fragmentInstanceName = + new SetThreadName(stateMachine.getFragmentInstanceId().getFullId())) { + OPERATORS_MEMORY_BLOCK.release(reservedBytes); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "[ReleaseMemory] release: {}, current remaining memory: {}", + reservedBytes, + OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes()); + } } } - } - }); - return estimatedMemorySize; + }); + } + return reservedBytes; + } + + /** + * Try to reserve bytes from the operators memory block. + * + * @return allocated bytes on success ({@code > 0}), {@code 0} if nothing to allocate or + * highest-priority fallback applies, {@code -1} if allocation failed + */ + private long allocateOperatorsMemory(final long memoryInBytes, final boolean isHighestPriority) { + if (memoryInBytes <= 0) { + return 0L; + } + if (OPERATORS_MEMORY_BLOCK.allocate(memoryInBytes)) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "[ConsumeMemory] consume: {}, current remaining memory: {}", + memoryInBytes, + OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes()); + } + return memoryInBytes; + } + if (isHighestPriority) { + return 0L; + } + return -1L; + } + + @TestOnly + long allocateOperatorsMemoryForTest(final long memoryInBytes, final boolean isHighestPriority) { + return allocateOperatorsMemory(memoryInBytes, isHighestPriority); } private QueryDataSourceType getQueryDataSourceType(DataDriverContext dataDriverContext) { @@ -291,23 +320,19 @@ public synchronized long tryAllocateFreeMemory4Load(final long memoryInBytes) { } } - public void reserveFromFreeMemoryForOperators( + public long reserveFromFreeMemoryForOperators( final long memoryInBytes, final long reservedBytes, final String queryId, - final String contextHolder) { + final String contextHolder, + final boolean isHighestPriority) + throws MemoryNotEnoughException { if (memoryInBytes <= 0) { throw new IllegalArgumentException( "Bytes to reserve from free memory for operators should be larger than 0"); } - if (OPERATORS_MEMORY_BLOCK.allocate(memoryInBytes)) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "[ConsumeMemory] consume: {}, current remaining memory: {}", - memoryInBytes, - OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes()); - } - } else { + long allocated = allocateOperatorsMemory(memoryInBytes, isHighestPriority); + if (allocated < 0) { throw new MemoryNotEnoughException( String.format( "There is not enough memory for Query %s, the contextHolder is %s," @@ -320,6 +345,7 @@ public void reserveFromFreeMemoryForOperators( reservedBytes, memoryInBytes)); } + return allocated; } public void releaseToFreeMemoryForOperators(final long memoryInBytes) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java index 7cee8034a053d..d1c34e365efbc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java @@ -48,4 +48,7 @@ public Pair releaseMemoryVirtually(final long size) { @Override public void reserveMemoryVirtually( final long bytesToBeReserved, final long bytesAlreadyReserved) {} + + @Override + public void setHighestPriority(boolean isHighestPriority) {} } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java index d156628532c34..71924894c7c96 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.planner.memory; import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; @@ -27,6 +28,8 @@ import javax.annotation.concurrent.NotThreadSafe; +import static com.google.common.base.Preconditions.checkState; + @NotThreadSafe public class NotThreadSafeMemoryReservationManager implements MemoryReservationManager { // To avoid reserving memory too frequently, we choose to do it in batches. This is the lower @@ -39,8 +42,16 @@ public class NotThreadSafeMemoryReservationManager implements MemoryReservationM private final String contextHolder; + private boolean isHighestPriority; + private long reservedBytesInTotal = 0; + /** + * Bytes logically reserved but not taken from the operators pool due to highest-priority + * fallback. + */ + private long fallbackBytesInTotal = 0; + private long bytesToBeReserved = 0; private long bytesToBeReleased = 0; @@ -50,6 +61,21 @@ public NotThreadSafeMemoryReservationManager(final QueryId queryId, final String this.contextHolder = contextHolder; } + @Override + public void setHighestPriority(boolean isHighestPriority) { + this.isHighestPriority = isHighestPriority; + } + + @TestOnly + public long getReservedBytesInTotalForTest() { + return reservedBytesInTotal; + } + + @TestOnly + public long getFallbackBytesInTotalForTest() { + return fallbackBytesInTotal; + } + @Override public void reserveMemoryCumulatively(final long size) { bytesToBeReserved += size; @@ -61,9 +87,18 @@ public void reserveMemoryCumulatively(final long size) { @Override public void reserveMemoryImmediately() { if (bytesToBeReserved != 0) { - LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators( - bytesToBeReserved, reservedBytesInTotal, queryId.getId(), contextHolder); - reservedBytesInTotal += bytesToBeReserved; + long actualReserved = + LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators( + bytesToBeReserved, + reservedBytesInTotal, + queryId.getId(), + contextHolder, + isHighestPriority); + if (actualReserved == 0) { + fallbackBytesInTotal += bytesToBeReserved; + } else { + reservedBytesInTotal += actualReserved; + } bytesToBeReserved = 0; } } @@ -71,51 +106,73 @@ public void reserveMemoryImmediately() { @Override public void reserveMemoryImmediately(final long size) { if (size != 0) { - LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators( - size, reservedBytesInTotal, queryId.getId(), contextHolder); - reservedBytesInTotal += size; + long actualReserved = + LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators( + size, reservedBytesInTotal, queryId.getId(), contextHolder, isHighestPriority); + if (actualReserved == 0) { + fallbackBytesInTotal += size; + } else { + reservedBytesInTotal += actualReserved; + } } } @Override public void releaseMemoryCumulatively(final long size) { + if (size <= 0) { + return; + } bytesToBeReleased += size; if (bytesToBeReleased >= MEMORY_BATCH_THRESHOLD) { - long bytesToRelease; - if (bytesToBeReleased <= bytesToBeReserved) { - bytesToBeReserved -= bytesToBeReleased; - } else { - bytesToRelease = bytesToBeReleased - bytesToBeReserved; - bytesToBeReserved = 0; - LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(bytesToRelease); - reservedBytesInTotal -= bytesToRelease; - } + releaseBytesImmediately(bytesToBeReleased); bytesToBeReleased = 0; } } + private void releaseBytesImmediately(final long size) { + long poolBytes = deductReleaseAccounting(size); + if (poolBytes > 0) { + LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(poolBytes); + } + } + + /** Deduct release size from pending reserve, fallback quota, then pool reservation in order. */ + private long deductReleaseAccounting(final long size) { + long remaining = size; + if (remaining <= bytesToBeReserved) { + bytesToBeReserved -= remaining; + return 0L; + } + remaining -= bytesToBeReserved; + bytesToBeReserved = 0; + + if (remaining <= fallbackBytesInTotal) { + fallbackBytesInTotal -= remaining; + return 0L; + } + remaining -= fallbackBytesInTotal; + fallbackBytesInTotal = 0; + + reservedBytesInTotal -= remaining; + checkState(reservedBytesInTotal >= 0, "Released bytes has been larger than reserved!"); + return remaining; + } + @Override public void releaseAllReservedMemory() { if (reservedBytesInTotal != 0) { LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytesInTotal); reservedBytesInTotal = 0; - bytesToBeReserved = 0; - bytesToBeReleased = 0; } + fallbackBytesInTotal = 0; + bytesToBeReserved = 0; + bytesToBeReleased = 0; } @Override public Pair releaseMemoryVirtually(final long size) { - if (bytesToBeReserved >= size) { - bytesToBeReserved -= size; - return new Pair<>(size, 0L); - } else { - long releasedBytesInReserved = bytesToBeReserved; - long releasedBytesInTotal = size - bytesToBeReserved; - bytesToBeReserved = 0; - reservedBytesInTotal -= releasedBytesInTotal; - return new Pair<>(releasedBytesInReserved, releasedBytesInTotal); - } + long poolBytes = deductReleaseAccounting(size); + return new Pair<>(size - poolBytes, poolBytes); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java index 2a544421f3ffd..0a1c6eee4181e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java @@ -66,4 +66,9 @@ public synchronized void reserveMemoryVirtually( final long bytesToBeReserved, final long bytesAlreadyReserved) { super.reserveMemoryVirtually(bytesToBeReserved, bytesAlreadyReserved); } + + @Override + public synchronized void setHighestPriority(boolean isHighestPriority) { + super.setHighestPriority(isHighestPriority); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/cte/MemoryReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/cte/MemoryReader.java index 7daad8d46e416..b4e8d97352919 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/cte/MemoryReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/cte/MemoryReader.java @@ -38,12 +38,18 @@ public class MemoryReader implements CteDataReader { private final CteDataStore dataStore; private int tsBlockIndex; - public MemoryReader(CteDataStore dataStore, QueryId queryId) { + public MemoryReader(CteDataStore dataStore, QueryId queryId, boolean isHighestPriority) { this.dataStore = dataStore; this.tsBlockIndex = 0; if (dataStore.incrementAndGetCount() == 1) { - LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators( - dataStore.ramBytesUsed(), 0L, queryId.getId(), MemoryReader.class.getName()); + long actualReserved = + LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators( + dataStore.ramBytesUsed(), + 0L, + queryId.getId(), + MemoryReader.class.getName(), + isHighestPriority); + dataStore.setActualReservedBytes(actualReserved); } } @@ -63,7 +69,11 @@ public TsBlock next() throws IoTDBException { @Override public void close() throws IoTDBException { if (dataStore.decrementAndGetCount() == 0) { - LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(dataStore.ramBytesUsed()); + long reservedBytes = dataStore.getActualReservedBytes(); + if (reservedBytes > 0) { + LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytes); + } + dataStore.setActualReservedBytes(0L); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java index 39b9c41447cdf..65cbc0f5b9f2b 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java @@ -339,6 +339,9 @@ public void reserveMemoryVirtually(long bytesToBeReserved, long bytesAlreadyRese reservedBytes += bytesToBeReserved + bytesAlreadyReserved; } + @Override + public void setHighestPriority(boolean isHighestPriority) {} + private long getReservedBytes() { return reservedBytes; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/CteScanOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/CteScanOperatorTest.java index 7309e2e06adcb..ab476f61ee9ea 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/CteScanOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/CteScanOperatorTest.java @@ -177,7 +177,7 @@ public void testMultipleCteScanOperators() throws Exception { new CteScanOperator(operatorContext, planNodeId, cteDataStore, queryId); assertEquals(2, cteDataStore.getCount()); - assertEquals(896, cteDataStore.ramBytesUsed()); + assertEquals(904, cteDataStore.ramBytesUsed()); // Both operators should be able to read data assertTrue(operator1.hasNext()); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlannerOperatorsMemoryTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlannerOperatorsMemoryTest.java new file mode 100644 index 0000000000000..6d0cabb044313 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlannerOperatorsMemoryTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.planner; + +import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.plan.planner.memory.NotThreadSafeMemoryReservationManager; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +public class LocalExecutionPlannerOperatorsMemoryTest { + + private static final LocalExecutionPlanner PLANNER = LocalExecutionPlanner.getInstance(); + + private long bytesHeldByTest = 0L; + + @After + public void tearDown() { + if (bytesHeldByTest > 0) { + PLANNER.releaseToFreeMemoryForOperators(bytesHeldByTest); + bytesHeldByTest = 0L; + } + } + + @Test + public void testAllocateOperatorsMemoryFailsWhenInsufficient() { + long free = PLANNER.getFreeMemoryForOperators(); + Assert.assertEquals(-1L, PLANNER.allocateOperatorsMemoryForTest(free + 1024L, false)); + } + + @Test + public void testAllocateOperatorsMemorySucceedsWhenAvailable() { + long request = Math.min(1024L, PLANNER.getFreeMemoryForOperators()); + long reserved = PLANNER.allocateOperatorsMemoryForTest(request, false); + Assert.assertEquals(request, reserved); + bytesHeldByTest = reserved; + } + + @Test + public void testHighestPriorityAllocatesWhenPoolHasRoom() { + long request = Math.min(1024L, PLANNER.getFreeMemoryForOperators()); + if (request <= 0) { + return; + } + long freeBefore = PLANNER.getFreeMemoryForOperators(); + + long reserved = PLANNER.allocateOperatorsMemoryForTest(request, true); + Assert.assertEquals(request, reserved); + Assert.assertEquals(freeBefore - request, PLANNER.getFreeMemoryForOperators()); + bytesHeldByTest = reserved; + } + + @Test + public void testHighestPriorityFallbackWhenPoolInsufficient() { + long freeBefore = PLANNER.getFreeMemoryForOperators(); + long request = freeBefore + 1024L; + + Assert.assertEquals(0L, PLANNER.allocateOperatorsMemoryForTest(request, true)); + Assert.assertEquals(freeBefore, PLANNER.getFreeMemoryForOperators()); + } + + @Test + public void testMemoryReservationManagerHighestPriorityAllocatesWhenPoolHasRoom() { + long request = Math.min(1024L, PLANNER.getFreeMemoryForOperators()); + if (request <= 0) { + return; + } + + NotThreadSafeMemoryReservationManager manager = + new NotThreadSafeMemoryReservationManager(new QueryId("show_queries"), "test"); + manager.setHighestPriority(true); + long freeBefore = PLANNER.getFreeMemoryForOperators(); + + manager.reserveMemoryImmediately(request); + Assert.assertEquals(request, manager.getReservedBytesInTotalForTest()); + Assert.assertEquals(freeBefore - request, PLANNER.getFreeMemoryForOperators()); + + manager.releaseAllReservedMemory(); + Assert.assertEquals(0L, manager.getReservedBytesInTotalForTest()); + Assert.assertEquals(freeBefore, PLANNER.getFreeMemoryForOperators()); + } + + @Test + public void testMemoryReservationManagerHighestPriorityFallbackWhenPoolInsufficient() { + long freeBefore = PLANNER.getFreeMemoryForOperators(); + long request = freeBefore + 1024L; + + NotThreadSafeMemoryReservationManager manager = + new NotThreadSafeMemoryReservationManager(new QueryId("show_queries"), "test"); + manager.setHighestPriority(true); + + manager.reserveMemoryImmediately(request); + Assert.assertEquals(0L, manager.getReservedBytesInTotalForTest()); + Assert.assertEquals(request, manager.getFallbackBytesInTotalForTest()); + Assert.assertEquals(freeBefore, PLANNER.getFreeMemoryForOperators()); + + manager.releaseMemoryCumulatively(request); + Assert.assertEquals(0L, manager.getFallbackBytesInTotalForTest()); + Assert.assertEquals(freeBefore, PLANNER.getFreeMemoryForOperators()); + + manager.releaseAllReservedMemory(); + Assert.assertEquals(0L, manager.getFallbackBytesInTotalForTest()); + Assert.assertEquals(freeBefore, PLANNER.getFreeMemoryForOperators()); + } + + @Test + public void testMemoryReservationManagerHighestPriorityFallbackReleaseViaBatchThreshold() { + long freeBefore = PLANNER.getFreeMemoryForOperators(); + long request = freeBefore + MEMORY_BATCH_THRESHOLD; + + NotThreadSafeMemoryReservationManager manager = + new NotThreadSafeMemoryReservationManager(new QueryId("show_queries"), "test"); + manager.setHighestPriority(true); + + manager.reserveMemoryImmediately(request); + Assert.assertEquals(0L, manager.getReservedBytesInTotalForTest()); + Assert.assertEquals(request, manager.getFallbackBytesInTotalForTest()); + + manager.releaseMemoryCumulatively(request); + Assert.assertEquals(0L, manager.getFallbackBytesInTotalForTest()); + Assert.assertEquals(0L, manager.getReservedBytesInTotalForTest()); + Assert.assertEquals(freeBefore, PLANNER.getFreeMemoryForOperators()); + } + + private static final long MEMORY_BATCH_THRESHOLD = 1024L * 1024L; + + @Test + public void testMemoryReservationManagerNormalPriorityReserveAndRelease() { + long request = Math.min(1024L, PLANNER.getFreeMemoryForOperators()); + if (request <= 0) { + return; + } + + NotThreadSafeMemoryReservationManager manager = + new NotThreadSafeMemoryReservationManager(new QueryId("normal_query"), "test"); + long freeBefore = PLANNER.getFreeMemoryForOperators(); + + manager.reserveMemoryImmediately(request); + Assert.assertEquals(request, manager.getReservedBytesInTotalForTest()); + Assert.assertEquals(freeBefore - request, PLANNER.getFreeMemoryForOperators()); + + manager.releaseAllReservedMemory(); + Assert.assertEquals(0L, manager.getReservedBytesInTotalForTest()); + Assert.assertEquals(freeBefore, PLANNER.getFreeMemoryForOperators()); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/utils/cte/CteDataStore.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/utils/cte/CteDataStore.java index d2cbcc17fa55b..81484abfe0b3f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/utils/cte/CteDataStore.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/utils/cte/CteDataStore.java @@ -48,6 +48,12 @@ public class CteDataStore implements Accountable { // reference count by CteScanReader private final AtomicInteger count; + /** + * Bytes actually reserved from the operators free-memory pool for this CTE data store. May be + * {@code 0} if reservation fell back (e.g. highest-priority query with insufficient pool). + */ + private long actualReservedBytes; + public CteDataStore(TableSchema tableSchema, List columnIndex2TsBlockColumnIndexList) { this.tableSchema = tableSchema; this.columnIndex2TsBlockColumnIndexList = columnIndex2TsBlockColumnIndexList; @@ -106,4 +112,12 @@ public long ramBytesUsed() { public int getCount() { return count.get(); } + + public long getActualReservedBytes() { + return actualReservedBytes; + } + + public void setActualReservedBytes(long actualReservedBytes) { + this.actualReservedBytes = actualReservedBytes; + } }