Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,12 @@ public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) {
}
}

public void setSchemaBatchSizeHistogram(Histogram schemaBatchSizeHistogram) {
if (outputPipeConnector instanceof IoTDBSink) {
((IoTDBSink) outputPipeConnector).setSchemaBatchSizeHistogram(schemaBatchSizeHistogram);
}
}

public void setTsFileBatchSizeHistogram(Histogram tsFileBatchSizeHistogram) {
if (outputPipeConnector instanceof IoTDBSink) {
((IoTDBSink) outputPipeConnector).setTsFileBatchSizeHistogram(tsFileBatchSizeHistogram);
Expand All @@ -341,6 +347,13 @@ public void setTabletBatchTimeIntervalHistogram(Histogram tabletBatchTimeInterva
}
}

public void setSchemaBatchTimeIntervalHistogram(Histogram schemaBatchTimeIntervalHistogram) {
if (outputPipeConnector instanceof IoTDBSink) {
((IoTDBSink) outputPipeConnector)
.setSchemaBatchTimeIntervalHistogram(schemaBatchTimeIntervalHistogram);
}
}

public void setTsFileBatchTimeIntervalHistogram(Histogram tsFileBatchTimeIntervalHistogram) {
if (outputPipeConnector instanceof IoTDBSink) {
((IoTDBSink) outputPipeConnector)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtask;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.metrics.type.Histogram;
import org.apache.iotdb.metrics.type.Rate;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.MetricType;
Expand Down Expand Up @@ -57,6 +58,7 @@ public void bindTo(final AbstractMetricService metricService) {

private void createMetrics(final String taskID) {
createRate(taskID);
createHistogram(taskID);
}

private void createRate(final String taskID) {
Expand All @@ -73,6 +75,38 @@ private void createRate(final String taskID) {
String.valueOf(connector.getCreationTime())));
}

private void createHistogram(final String taskID) {
final PipeSinkSubtask connector = connectorMap.get(taskID);

final Histogram schemaBatchSizeHistogram =
metricService.getOrCreateHistogram(
Metric.PIPE_SCHEMA_BATCH_SIZE.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
connector.getAttributeSortedString(),
Tag.CREATION_TIME.toString(),
String.valueOf(connector.getCreationTime()));
connector.setSchemaBatchSizeHistogram(schemaBatchSizeHistogram);

final Histogram schemaBatchTimeIntervalHistogram =
metricService.getOrCreateHistogram(
Metric.PIPE_SCHEMA_BATCH_TIME_COST.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
connector.getAttributeSortedString(),
Tag.CREATION_TIME.toString(),
String.valueOf(connector.getCreationTime()));
connector.setSchemaBatchTimeIntervalHistogram(schemaBatchTimeIntervalHistogram);

final Histogram schemaBatchEventSizeHistogram =
metricService.getOrCreateHistogram(
Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
connector.getAttributeSortedString());
connector.setEventSizeHistogram(schemaBatchEventSizeHistogram);
}

@Override
public void unbindFrom(final AbstractMetricService metricService) {
ImmutableSet.copyOf(connectorMap.keySet()).forEach(this::deregister);
Expand All @@ -84,6 +118,7 @@ public void unbindFrom(final AbstractMetricService metricService) {

private void removeMetrics(final String taskID) {
removeRate(taskID);
removeHistogram(taskID);
}

private void removeRate(final String taskID) {
Expand All @@ -99,6 +134,29 @@ private void removeRate(final String taskID) {
schemaRateMap.remove(taskID);
}

private void removeHistogram(final String taskID) {
final PipeSinkSubtask connector = connectorMap.get(taskID);
metricService.remove(
MetricType.HISTOGRAM,
Metric.PIPE_SCHEMA_BATCH_SIZE.toString(),
Tag.NAME.toString(),
connector.getAttributeSortedString(),
Tag.CREATION_TIME.toString(),
String.valueOf(connector.getCreationTime()));
metricService.remove(
MetricType.HISTOGRAM,
Metric.PIPE_SCHEMA_BATCH_TIME_COST.toString(),
Tag.NAME.toString(),
connector.getAttributeSortedString(),
Tag.CREATION_TIME.toString(),
String.valueOf(connector.getCreationTime()));
metricService.remove(
MetricType.HISTOGRAM,
Metric.PIPE_CONNECTOR_BATCH_SIZE.toString(),
Tag.NAME.toString(),
connector.getAttributeSortedString());
}

//////////////////////////// Register & deregister (pipe integration) ////////////////////////////

public void register(@NonNull final PipeSinkSubtask pipeSinkSubtask) {
Expand Down
Loading
Loading