diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto index 67df8b9e8003..d2d322a3470c 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto @@ -1106,6 +1106,9 @@ message StandardCoders { // encoded with it's corresponding coder. // Components: single coder for the value NULLABLE = 17 [(beam_urn) = "beam:coder:nullable:v1"]; + + // Coder for Void type. Uses zero bytes per Void. + VOID = 19 [(beam_urn) = "beam:coder:void:v1"]; } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java index 5b0d5aedd619..274df769d173 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder; import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; import org.apache.beam.sdk.util.ShardedKey; @@ -74,6 +75,7 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar { .put(ShardedKey.Coder.class, ModelCoders.SHARDED_KEY_CODER_URN) .put(TimestampPrefixingWindowCoder.class, ModelCoders.CUSTOM_WINDOW_CODER_URN) .put(NullableCoder.class, ModelCoders.NULLABLE_CODER_URN) + .put(VoidCoder.class, ModelCoders.VOID_CODER_URN) .build(); private static final Map, CoderTranslator> @@ -99,6 +101,7 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar { .put(ShardedKey.Coder.class, CoderTranslators.shardedKey()) .put(TimestampPrefixingWindowCoder.class, CoderTranslators.timestampPrefixingWindow()) .put(NullableCoder.class, CoderTranslators.nullable()) + .put(VoidCoder.class, CoderTranslators.atomic(VoidCoder.class)) .build(); static { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoders.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoders.java index 7b7546aceb61..9370428b72dc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoders.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoders.java @@ -35,7 +35,8 @@ private ModelCoders() {} public static final String BYTES_CODER_URN = getUrn(StandardCoders.Enum.BYTES); public static final String BOOL_CODER_URN = getUrn(StandardCoders.Enum.BOOL); - // Where is this required explicitly, instead of implicit within WindowedValue and LengthPrefix + // Where is this required explicitly, instead of implicit within WindowedValue + // and LengthPrefix // coders? public static final String INT64_CODER_URN = getUrn(StandardCoders.Enum.VARINT); public static final String STRING_UTF8_CODER_URN = getUrn(StandardCoders.Enum.STRING_UTF8); @@ -48,8 +49,10 @@ private ModelCoders() {} public static final String LENGTH_PREFIX_CODER_URN = getUrn(StandardCoders.Enum.LENGTH_PREFIX); public static final String GLOBAL_WINDOW_CODER_URN = getUrn(StandardCoders.Enum.GLOBAL_WINDOW); - // This isn't strictly required once there's a way to represent an 'unknown window' (i.e. the - // custom window encoding + the maximum timestamp of the window, this can be used for interval + // This isn't strictly required once there's a way to represent an 'unknown + // window' (i.e. the + // custom window encoding + the maximum timestamp of the window, this can be + // used for interval // windows. public static final String INTERVAL_WINDOW_CODER_URN = getUrn(StandardCoders.Enum.INTERVAL_WINDOW); @@ -69,6 +72,8 @@ private ModelCoders() {} public static final String NULLABLE_CODER_URN = getUrn(StandardCoders.Enum.NULLABLE); + public static final String VOID_CODER_URN = getUrn(StandardCoders.Enum.VOID); + static { checkState( STATE_BACKED_ITERABLE_CODER_URN.equals(getUrn(StandardCoders.Enum.STATE_BACKED_ITERABLE))); @@ -93,7 +98,8 @@ private ModelCoders() {} PARAM_WINDOWED_VALUE_CODER_URN, STATE_BACKED_ITERABLE_CODER_URN, SHARDED_KEY_CODER_URN, - NULLABLE_CODER_URN); + NULLABLE_CODER_URN, + VOID_CODER_URN); public static Set urns() { return MODEL_CODER_URNS;