Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,9 @@ message StandardCoders {
// encoded with it's corresponding coder.
// Components: single coder for the value
NULLABLE = 17 [(beam_urn) = "beam:coder:nullable:v1"];

// Coder for Void type. Uses zero bytes per Void.
VOID = 19 [(beam_urn) = "beam:coder:void:v1"];
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
import org.apache.beam.sdk.util.ShardedKey;
Expand Down Expand Up @@ -74,6 +75,7 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar {
.put(ShardedKey.Coder.class, ModelCoders.SHARDED_KEY_CODER_URN)
.put(TimestampPrefixingWindowCoder.class, ModelCoders.CUSTOM_WINDOW_CODER_URN)
.put(NullableCoder.class, ModelCoders.NULLABLE_CODER_URN)
.put(VoidCoder.class, ModelCoders.VOID_CODER_URN)
.build();

private static final Map<Class<? extends Coder>, CoderTranslator<? extends Coder>>
Expand All @@ -99,6 +101,7 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar {
.put(ShardedKey.Coder.class, CoderTranslators.shardedKey())
.put(TimestampPrefixingWindowCoder.class, CoderTranslators.timestampPrefixingWindow())
.put(NullableCoder.class, CoderTranslators.nullable())
.put(VoidCoder.class, CoderTranslators.atomic(VoidCoder.class))
.build();

static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ private ModelCoders() {}

public static final String BYTES_CODER_URN = getUrn(StandardCoders.Enum.BYTES);
public static final String BOOL_CODER_URN = getUrn(StandardCoders.Enum.BOOL);
// Where is this required explicitly, instead of implicit within WindowedValue and LengthPrefix
// Where is this required explicitly, instead of implicit within WindowedValue
// and LengthPrefix
// coders?
public static final String INT64_CODER_URN = getUrn(StandardCoders.Enum.VARINT);
public static final String STRING_UTF8_CODER_URN = getUrn(StandardCoders.Enum.STRING_UTF8);
Expand All @@ -48,8 +49,10 @@ private ModelCoders() {}
public static final String LENGTH_PREFIX_CODER_URN = getUrn(StandardCoders.Enum.LENGTH_PREFIX);

public static final String GLOBAL_WINDOW_CODER_URN = getUrn(StandardCoders.Enum.GLOBAL_WINDOW);
// This isn't strictly required once there's a way to represent an 'unknown window' (i.e. the
// custom window encoding + the maximum timestamp of the window, this can be used for interval
// This isn't strictly required once there's a way to represent an 'unknown
// window' (i.e. the
// custom window encoding + the maximum timestamp of the window, this can be
// used for interval
// windows.
public static final String INTERVAL_WINDOW_CODER_URN =
getUrn(StandardCoders.Enum.INTERVAL_WINDOW);
Expand All @@ -69,6 +72,8 @@ private ModelCoders() {}

public static final String NULLABLE_CODER_URN = getUrn(StandardCoders.Enum.NULLABLE);

public static final String VOID_CODER_URN = getUrn(StandardCoders.Enum.VOID);

static {
checkState(
STATE_BACKED_ITERABLE_CODER_URN.equals(getUrn(StandardCoders.Enum.STATE_BACKED_ITERABLE)));
Expand All @@ -93,7 +98,8 @@ private ModelCoders() {}
PARAM_WINDOWED_VALUE_CODER_URN,
STATE_BACKED_ITERABLE_CODER_URN,
SHARDED_KEY_CODER_URN,
NULLABLE_CODER_URN);
NULLABLE_CODER_URN,
VOID_CODER_URN);

public static Set<String> urns() {
return MODEL_CODER_URNS;
Expand Down
Loading