diff --git a/iotdb-client/subscription/src/main/i18n/en/org/apache/iotdb/rpc/subscription/i18n/SubscriptionMessages.java b/iotdb-client/subscription/src/main/i18n/en/org/apache/iotdb/rpc/subscription/i18n/SubscriptionMessages.java index f19f35b705e6d..e053a12140c8f 100644 --- a/iotdb-client/subscription/src/main/i18n/en/org/apache/iotdb/rpc/subscription/i18n/SubscriptionMessages.java +++ b/iotdb-client/subscription/src/main/i18n/en/org/apache/iotdb/rpc/subscription/i18n/SubscriptionMessages.java @@ -44,6 +44,9 @@ public final class SubscriptionMessages { "SubscriptionPushConsumer {} cancel auto poll worker"; public static final String PUSH_CONSUMER_SUBMIT_AUTO_POLL = "SubscriptionPushConsumer {} submit auto poll worker"; + public static final String PUSH_CONSUMER_POLL_EMPTY_MESSAGE = + "SubscriptionPushConsumer {} poll empty message from topics {} after {} millisecond(s), " + + "consecutive empty polls: {}"; public static final String CONSUMER_LISTENER_FAILURE = "Consumer listener result failure when consuming message: {}"; public static final String AUTO_POLL_UNEXPECTED = "something unexpected happened when auto poll messages..."; @@ -75,6 +78,9 @@ public final class SubscriptionMessages { "SubscriptionPullConsumer {} cancel auto commit worker"; public static final String PULL_CONSUMER_SUBMIT_AUTO_COMMIT = "SubscriptionPullConsumer {} submit auto commit worker"; + public static final String PULL_CONSUMER_POLL_EMPTY_MESSAGE = + "SubscriptionPullConsumer {} poll empty message from topics {} after {} millisecond(s), " + + "consecutive empty polls: {}"; public static final String AUTO_COMMIT_UNEXPECTED = "something unexpected happened when auto commit messages..."; public static final String COMMIT_DURING_CLOSE_UNEXPECTED = diff --git a/iotdb-client/subscription/src/main/i18n/zh/org/apache/iotdb/rpc/subscription/i18n/SubscriptionMessages.java b/iotdb-client/subscription/src/main/i18n/zh/org/apache/iotdb/rpc/subscription/i18n/SubscriptionMessages.java index 05d0639d15d5d..e623dd68a8b01 100644 --- a/iotdb-client/subscription/src/main/i18n/zh/org/apache/iotdb/rpc/subscription/i18n/SubscriptionMessages.java +++ b/iotdb-client/subscription/src/main/i18n/zh/org/apache/iotdb/rpc/subscription/i18n/SubscriptionMessages.java @@ -44,6 +44,8 @@ public final class SubscriptionMessages { "SubscriptionPushConsumer {} 取消自动拉取工作线程"; public static final String PUSH_CONSUMER_SUBMIT_AUTO_POLL = "SubscriptionPushConsumer {} 提交自动拉取工作线程"; + public static final String PUSH_CONSUMER_POLL_EMPTY_MESSAGE = + "SubscriptionPushConsumer {} 从主题 {} 拉取到空消息,耗时 {} 毫秒,连续空拉取次数:{}"; public static final String CONSUMER_LISTENER_FAILURE = "消费消息时消费者监听器结果失败:{}"; public static final String AUTO_POLL_UNEXPECTED = "自动拉取消息时发生意外情况..."; @@ -75,6 +77,8 @@ public final class SubscriptionMessages { "SubscriptionPullConsumer {} 取消自动提交工作线程"; public static final String PULL_CONSUMER_SUBMIT_AUTO_COMMIT = "SubscriptionPullConsumer {} 提交自动提交工作线程"; + public static final String PULL_CONSUMER_POLL_EMPTY_MESSAGE = + "SubscriptionPullConsumer {} 从主题 {} 拉取到空消息,耗时 {} 毫秒,连续空拉取次数:{}"; public static final String AUTO_COMMIT_UNEXPECTED = "自动提交消息时发生意外情况..."; public static final String COMMIT_DURING_CLOSE_UNEXPECTED = diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumer.java index 240ab1745527b..550e6826ca522 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumer.java @@ -41,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.OptionalLong; import java.util.Properties; import java.util.Queue; import java.util.Set; @@ -83,6 +84,8 @@ public abstract class AbstractSubscriptionPullConsumer extends AbstractSubscript private SortedMap> uncommittedCommitContexts; + private final EmptyPollLogThrottler emptyPollLogThrottler = new EmptyPollLogThrottler(); + private final AtomicBoolean isClosed = new AtomicBoolean(true); @Override @@ -137,6 +140,7 @@ protected synchronized void open() throws SubscriptionException { // set isClosed to false before submitting workers isClosed.set(false); + emptyPollLogThrottler.reset(); // submit auto poll worker if enabling auto commit if (autoCommit) { @@ -237,11 +241,16 @@ protected List poll(final Set topicNames, final lon final List messages = multiplePoll(parsedTopicNames, timeoutMs); if (messages.isEmpty() && processors.isEmpty()) { - LOGGER.info( - "SubscriptionPullConsumer {} poll empty message from topics {} after {} millisecond(s)", - this, - CollectionUtils.getLimitedString(parsedTopicNames, 32), - timeoutMs); + final OptionalLong consecutiveEmptyPollCount = + emptyPollLogThrottler.markEmptyPollAndMaybeGetCount(); + if (consecutiveEmptyPollCount.isPresent()) { + LOGGER.info( + SubscriptionMessages.PULL_CONSUMER_POLL_EMPTY_MESSAGE, + this, + CollectionUtils.getLimitedString(parsedTopicNames, 32), + timeoutMs, + consecutiveEmptyPollCount.getAsLong()); + } return messages; } @@ -260,6 +269,7 @@ protected List poll(final Set topicNames, final lon return processed; } + emptyPollLogThrottler.reset(); trackAutoCommitMessages(processed); return processed; diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumer.java index 8c7842b714c88..17b96e4228441 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumer.java @@ -37,6 +37,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.OptionalLong; import java.util.Properties; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; @@ -62,6 +63,8 @@ public abstract class AbstractSubscriptionPushConsumer extends AbstractSubscript private final long autoPollIntervalMs; private final long autoPollTimeoutMs; + private final EmptyPollLogThrottler emptyPollLogThrottler = new EmptyPollLogThrottler(); + private final AtomicBoolean isClosed = new AtomicBoolean(true); protected AbstractSubscriptionPushConsumer( @@ -128,6 +131,7 @@ protected synchronized void open() throws SubscriptionException { // set isClosed to false before submitting workers isClosed.set(false); + emptyPollLogThrottler.reset(); // submit auto poll worker submitAutoPollWorker(); @@ -198,14 +202,20 @@ public void run() { return type == SubscriptionMessageType.WATERMARK.getType(); }); if (messages.isEmpty()) { - LOGGER.info( - "SubscriptionPushConsumer {} poll empty message from topics {} after {} millisecond(s)", - this, - CollectionUtils.getLimitedString(subscribedTopics.keySet(), 32), - autoPollTimeoutMs); + final OptionalLong consecutiveEmptyPollCount = + emptyPollLogThrottler.markEmptyPollAndMaybeGetCount(); + if (consecutiveEmptyPollCount.isPresent()) { + LOGGER.info( + SubscriptionMessages.PUSH_CONSUMER_POLL_EMPTY_MESSAGE, + AbstractSubscriptionPushConsumer.this, + CollectionUtils.getLimitedString(subscribedTopics.keySet(), 32), + autoPollTimeoutMs, + consecutiveEmptyPollCount.getAsLong()); + } return; } + emptyPollLogThrottler.reset(); if (ackStrategy.equals(AckStrategy.BEFORE_CONSUME)) { ack(messages); } diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/EmptyPollLogThrottler.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/EmptyPollLogThrottler.java new file mode 100644 index 0000000000000..6097add99ea54 --- /dev/null +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/EmptyPollLogThrottler.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.session.subscription.consumer.base; + +import java.util.OptionalLong; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; + +final class EmptyPollLogThrottler { + + private static final long DEFAULT_LOG_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(1); + + private final long logIntervalNanos; + private final LongSupplier ticker; + + private long consecutiveEmptyPollCount; + private long lastLogTimeNanos; + private boolean hasLoggedEmptyPoll; + + EmptyPollLogThrottler() { + this(DEFAULT_LOG_INTERVAL_NANOS, System::nanoTime); + } + + EmptyPollLogThrottler(final long logIntervalNanos, final LongSupplier ticker) { + this.logIntervalNanos = Math.max(logIntervalNanos, 1); + this.ticker = ticker; + } + + synchronized OptionalLong markEmptyPollAndMaybeGetCount() { + consecutiveEmptyPollCount++; + final long currentTimeNanos = ticker.getAsLong(); + if (!hasLoggedEmptyPoll || currentTimeNanos - lastLogTimeNanos >= logIntervalNanos) { + hasLoggedEmptyPoll = true; + lastLogTimeNanos = currentTimeNanos; + return OptionalLong.of(consecutiveEmptyPollCount); + } + return OptionalLong.empty(); + } + + synchronized void reset() { + consecutiveEmptyPollCount = 0; + lastLogTimeNanos = 0; + hasLoggedEmptyPoll = false; + } +} diff --git a/iotdb-client/subscription/src/test/java/org/apache/iotdb/session/subscription/consumer/base/EmptyPollLogThrottlerTest.java b/iotdb-client/subscription/src/test/java/org/apache/iotdb/session/subscription/consumer/base/EmptyPollLogThrottlerTest.java new file mode 100644 index 0000000000000..10cd2a5678d49 --- /dev/null +++ b/iotdb-client/subscription/src/test/java/org/apache/iotdb/session/subscription/consumer/base/EmptyPollLogThrottlerTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.session.subscription.consumer.base; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.OptionalLong; +import java.util.concurrent.atomic.AtomicLong; + +public class EmptyPollLogThrottlerTest { + + @Test + public void testThrottleConsecutiveEmptyPollLogs() { + final AtomicLong ticker = new AtomicLong(); + final EmptyPollLogThrottler throttler = new EmptyPollLogThrottler(100, ticker::get); + + OptionalLong logCount = throttler.markEmptyPollAndMaybeGetCount(); + Assert.assertTrue(logCount.isPresent()); + Assert.assertEquals(1, logCount.getAsLong()); + + ticker.addAndGet(99); + logCount = throttler.markEmptyPollAndMaybeGetCount(); + Assert.assertFalse(logCount.isPresent()); + + ticker.incrementAndGet(); + logCount = throttler.markEmptyPollAndMaybeGetCount(); + Assert.assertTrue(logCount.isPresent()); + Assert.assertEquals(3, logCount.getAsLong()); + } + + @Test + public void testResetMakesNextEmptyPollLoggable() { + final AtomicLong ticker = new AtomicLong(); + final EmptyPollLogThrottler throttler = new EmptyPollLogThrottler(100, ticker::get); + + Assert.assertTrue(throttler.markEmptyPollAndMaybeGetCount().isPresent()); + Assert.assertFalse(throttler.markEmptyPollAndMaybeGetCount().isPresent()); + + throttler.reset(); + + final OptionalLong logCount = throttler.markEmptyPollAndMaybeGetCount(); + Assert.assertTrue(logCount.isPresent()); + Assert.assertEquals(1, logCount.getAsLong()); + } +}