Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
Expand Down Expand Up @@ -402,6 +403,10 @@ public Translator(Pipeline pipeline, DataflowRunner runner, SdkComponents sdkCom
* @return a Job definition filled in with the type of job, the environment, and the job steps.
*/
public Job translate(List<DataflowPackage> packages) {
// Ensure the experiments list is mutable before any experiments are added.
if (options.getExperiments() != null) {
options.setExperiments(new ArrayList<>(options.getExperiments()));
}
job.setName(options.getJobName().toLowerCase());
Comment on lines +406 to 410

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This pre-emptive copy of the experiments list to an ArrayList is redundant. ExperimentalOptions.addExperiment already handles null-initialization and copies the existing list into a new mutable ArrayList internally before adding any new experiments. You can safely remove this block.

Suggested change
// Ensure the experiments list is mutable before any experiments are added.
if (options.getExperiments() != null) {
options.setExperiments(new ArrayList<>(options.getExperiments()));
}
job.setName(options.getJobName().toLowerCase());
job.setName(options.getJobName().toLowerCase());


Environment environment = new Environment();
Expand All @@ -414,19 +419,8 @@ public Job translate(List<DataflowPackage> packages) {
// back end as well. If streaming engine is not enabled make sure the experiments are also
// not enabled.
if (options.isEnableStreamingEngine()) {
List<String> experiments = options.getExperiments();
if (experiments == null) {
experiments = new ArrayList<String>();
} else {
experiments = new ArrayList<String>(experiments);
}
if (!experiments.contains(GcpOptions.STREAMING_ENGINE_EXPERIMENT)) {
experiments.add(GcpOptions.STREAMING_ENGINE_EXPERIMENT);
}
if (!experiments.contains(GcpOptions.WINDMILL_SERVICE_EXPERIMENT)) {
experiments.add(GcpOptions.WINDMILL_SERVICE_EXPERIMENT);
}
options.setExperiments(experiments);
ExperimentalOptions.addExperiment(options, GcpOptions.STREAMING_ENGINE_EXPERIMENT);
ExperimentalOptions.addExperiment(options, GcpOptions.WINDMILL_SERVICE_EXPERIMENT);
} else {
List<String> experiments = options.getExperiments();
if (experiments != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1243,16 +1243,21 @@ private static boolean includesTransformUpgrades(Pipeline pipeline) {
@SuppressWarnings("Slf4jFormatShouldBeConst")
@Override
public DataflowPipelineJob run(Pipeline pipeline) {
// Ensure the experiments list is mutable before any experiments are added.
if (options.getExperiments() != null) {
options.setExperiments(new ArrayList<>(options.getExperiments()));
}
// Multi-language pipelines and pipelines that include upgrades should automatically be upgraded
Comment on lines +1246 to 1250

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This pre-emptive copy of the experiments list to an ArrayList is redundant. ExperimentalOptions.addExperiment already handles null-initialization and copies the existing list into a new mutable ArrayList internally before adding any new experiments. You can safely remove this block.

Suggested change
// Ensure the experiments list is mutable before any experiments are added.
if (options.getExperiments() != null) {
options.setExperiments(new ArrayList<>(options.getExperiments()));
}
// Multi-language pipelines and pipelines that include upgrades should automatically be upgraded
// Multi-language pipelines and pipelines that include upgrades should automatically be upgraded

// to Dataflow Portable Runner.
if (DataflowRunner.isMultiLanguagePipeline(pipeline) || includesTransformUpgrades(pipeline)) {
if (!useUnifiedWorker(options)) {
List<String> experiments = firstNonNull(options.getExperiments(), Collections.emptyList());
LOG.info(
"Automatically enabling Dataflow Portable Runner since the pipeline used cross-language"
+ " transforms or pipeline needed a transform upgrade.");
options.setExperiments(
ImmutableList.<String>builder().addAll(experiments).add("use_runner_v2").build());
if (!firstNonNull(options.getExperiments(), Collections.emptyList())
.contains("use_runner_v2")) {
LOG.info(
"Automatically enabling Dataflow Portable Runner since the pipeline used cross-language"
+ " transforms or pipeline needed a transform upgrade.");
ExperimentalOptions.addExperiment(options, "use_runner_v2");
}
}
}
if (useUnifiedWorker(options)) {
Expand All @@ -1264,21 +1269,10 @@ public DataflowPipelineJob run(Pipeline pipeline) {
throw new IllegalArgumentException(
"Dataflow Portable Runner both disabled and enabled: at least one of ['enable_portable_runner', 'beam_fn_api', 'use_unified_worker', 'use_runner_v2', 'use_portable_job_submission'] is set and also one of ['enable_streaming_java_runner', 'disable_portable_runner', 'disable_runner_v2', 'disable_runner_v2_until_2023', 'disable_prime_runner_v2'] is set.");
}
List<String> experiments =
new ArrayList<>(options.getExperiments()); // non-null if useUnifiedWorker is true
if (!experiments.contains("use_runner_v2")) {
experiments.add("use_runner_v2");
}
if (!experiments.contains("use_unified_worker")) {
experiments.add("use_unified_worker");
}
if (!experiments.contains("beam_fn_api")) {
experiments.add("beam_fn_api");
}
if (!experiments.contains("use_portable_job_submission")) {
experiments.add("use_portable_job_submission");
}
options.setExperiments(ImmutableList.copyOf(experiments));
ExperimentalOptions.addExperiment(options, "use_runner_v2");
ExperimentalOptions.addExperiment(options, "use_unified_worker");
ExperimentalOptions.addExperiment(options, "beam_fn_api");
ExperimentalOptions.addExperiment(options, "use_portable_job_submission");
// Ensure that logging via the FnApi is enabled
options.as(SdkHarnessOptions.class).setEnableLogViaFnApi(true);
}
Expand All @@ -1305,19 +1299,16 @@ public DataflowPipelineJob run(Pipeline pipeline) {
options.setStreaming(true);

{
List<String> experiments =
options.getExperiments() == null
? new ArrayList<>()
: new ArrayList<>(options.getExperiments());
// Experiment marking that the harness supports tag encoding v2
// Backend will enable tag encoding v2 only if the harness supports it.
experiments.add("streaming_engine_state_tag_encoding_v2_supported");
ExperimentalOptions.addExperiment(
options, "streaming_engine_state_tag_encoding_v2_supported");
// Experiment requesting tag encoding v2 on new jobs starting with 2.75.0. During job
// updates old job's tag encoding version is carried over by the backend.
if (!StreamingOptions.updateCompatibilityVersionLessThan(options, "2.75.0")) {
experiments.add("enable_streaming_engine_state_tag_encoding_v2");
ExperimentalOptions.addExperiment(
options, "enable_streaming_engine_state_tag_encoding_v2");
}
options.setExperiments(ImmutableList.copyOf(experiments));
}

if (useUnifiedWorker(options)) {
Expand Down Expand Up @@ -1443,15 +1434,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
pipeline, dataflowV1PipelineProto, dataflowV1Components, this, packages);

if (!isNullOrEmpty(dataflowOptions.getDataflowWorkerJar()) && !useUnifiedWorker(options)) {
List<String> experiments =
firstNonNull(dataflowOptions.getExperiments(), Collections.emptyList());
if (!experiments.contains("use_staged_dataflow_worker_jar")) {
dataflowOptions.setExperiments(
ImmutableList.<String>builder()
.addAll(experiments)
.add("use_staged_dataflow_worker_jar")
.build());
}
ExperimentalOptions.addExperiment(options, "use_staged_dataflow_worker_jar");
}

Job newJob = jobSpecification.getJob();
Expand Down Expand Up @@ -1510,11 +1493,8 @@ public DataflowPipelineJob run(Pipeline pipeline) {
.collect(Collectors.toList());

if (minCpuFlags.isEmpty()) {
dataflowOptions.setExperiments(
ImmutableList.<String>builder()
.addAll(experiments)
.add("min_cpu_platform=" + dataflowOptions.getMinCpuPlatform())
.build());
ExperimentalOptions.addExperiment(
dataflowOptions, "min_cpu_platform=" + dataflowOptions.getMinCpuPlatform());
} else {
LOG.warn(
"Flag min_cpu_platform is defined in both top level PipelineOption, "
Expand Down Expand Up @@ -1550,12 +1530,8 @@ public DataflowPipelineJob run(Pipeline pipeline) {
// enable upload_graph when the graph is too large
byte[] jobGraphBytes = DataflowPipelineTranslator.jobToString(newJob).getBytes(UTF_8);
int jobGraphByteSize = jobGraphBytes.length;
if (jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES
&& !hasExperiment(options, "upload_graph")
&& !useUnifiedWorker(options)) {
List<String> experiments = firstNonNull(options.getExperiments(), Collections.emptyList());
options.setExperiments(
ImmutableList.<String>builder().addAll(experiments).add("upload_graph").build());
if (jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES && !useUnifiedWorker(options)) {
ExperimentalOptions.addExperiment(options, "upload_graph");
LOG.info(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: looks like this log line will now be printed regardless even if the experiment was already enabled.
let's add && !hasExperiment(options, "upload_graph") back?

"The job graph size ({} in bytes) is larger than {}. Automatically add "
+ "the upload_graph option to experiments.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory;
import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
Expand Down Expand Up @@ -110,16 +111,20 @@ private static DataflowWorkerHarnessOptions testOptions(
boolean enableStreamingEngine, List<String> additionalExperiments) {
DataflowWorkerHarnessOptions options =
PipelineOptionsFactory.create().as(DataflowWorkerHarnessOptions.class);
// Ensure the experiments list is mutable before any experiments are added.
if (options.getExperiments() != null) {
options.setExperiments(new ArrayList<>(options.getExperiments()));
}
options.setProject("project");
Comment on lines +114 to 118

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This pre-emptive copy of the experiments list to an ArrayList is redundant. ExperimentalOptions.addExperiment already handles null-initialization and copies the existing list into a new mutable ArrayList internally before adding any new experiments. You can safely remove this block.

Suggested change
// Ensure the experiments list is mutable before any experiments are added.
if (options.getExperiments() != null) {
options.setExperiments(new ArrayList<>(options.getExperiments()));
}
options.setProject("project");
options.setProject("project");

@tvalentyn tvalentyn Jun 13, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here and in other places: why was this copy necessary?

options.setJobId("job");
options.setWorkerId("worker");
List<String> experiments =
options.getExperiments() == null ? new ArrayList<>() : options.getExperiments();

if (enableStreamingEngine) {
experiments.add(GcpOptions.STREAMING_ENGINE_EXPERIMENT);
ExperimentalOptions.addExperiment(options, GcpOptions.STREAMING_ENGINE_EXPERIMENT);
}
for (String experiment : additionalExperiments) {
ExperimentalOptions.addExperiment(options, experiment);
}
experiments.addAll(additionalExperiments);
options.setExperiments(experiments);

options.setWindmillServiceStreamingRpcBatchLimit(Integer.MAX_VALUE);
options.setWindmillServiceStreamingRpcHealthCheckPeriodMs(NO_HEALTH_CHECK);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def test_xlang_parquetio_write(self):
address = 'localhost:%s' % port
try:
with TestPipeline() as p:
p.get_pipeline_options().view_as(DebugOptions).experiments.append(
p.get_pipeline_options().view_as(DebugOptions).add_experiment(
'jar_packages=' + expansion_jar)
p.not_use_test_runner_api = True
_ = p \
Expand Down
9 changes: 1 addition & 8 deletions sdks/python/apache_beam/io/iobase_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,7 @@ def test_try_split_with_any_exception(self):
class UseSdfBoundedSourcesTests(unittest.TestCase):
def _run_sdf_wrapper_pipeline(self, source, expected_values):
with beam.Pipeline() as p:
experiments = (p._options.view_as(DebugOptions).experiments or [])

# Setup experiment option to enable using SDFBoundedSourceWrapper
if 'beam_fn_api' not in experiments:
# Required so mocking below doesn't mock Create used in assert_that.
experiments.append('beam_fn_api')

p._options.view_as(DebugOptions).experiments = experiments
p._options.view_as(DebugOptions).add_experiment('beam_fn_api')

actual = p | beam.io.Read(source)
assert_that(actual, equal_to(expected_values))
Expand Down
5 changes: 1 addition & 4 deletions sdks/python/apache_beam/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,7 @@ def __init__(
# set default experiments for portable runners
# (needs to occur prior to pipeline construction)
if runner.is_fnapi_compatible():
experiments = (self._options.view_as(DebugOptions).experiments or [])
if not 'beam_fn_api' in experiments:
experiments.append('beam_fn_api')
self._options.view_as(DebugOptions).experiments = experiments
self._options.view_as(DebugOptions).add_experiment('beam_fn_api')

self.local_tempdir = tempfile.mkdtemp(prefix='beam-pipeline-temp')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,16 +149,13 @@ def __init__(
self.proto.version = version
# TODO: Use enumerated type instead of strings for job types.
if job_type.startswith('FNAPI_'):
self.debug_options.experiments = self.debug_options.experiments or []

debug_options_experiments = self.debug_options.experiments
# Add use_multiple_sdk_containers flag if it's not already present. Do not
# add the flag if 'no_use_multiple_sdk_containers' is present.
# TODO: Cleanup use_multiple_sdk_containers once we deprecate Python SDK
# till version 2.4.
if ('use_multiple_sdk_containers' not in debug_options_experiments and
'no_use_multiple_sdk_containers' not in debug_options_experiments):
debug_options_experiments.append('use_multiple_sdk_containers')
if ('no_use_multiple_sdk_containers'
not in (self.debug_options.experiments or [])):
self.debug_options.add_experiment('use_multiple_sdk_containers')
# FlexRS
if self.google_cloud_options.flexrs_goal == 'COST_OPTIMIZED':
self.proto.flex_resource_scheduling_goal = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,7 @@ def run_pipeline(
RuntimeValueProvider.set_runtime_options({})

# Setup "beam_fn_api" experiment options if lacked.
experiments = (
options.view_as(pipeline_options.DebugOptions).experiments or [])
if not 'beam_fn_api' in experiments:
experiments.append('beam_fn_api')
options.view_as(pipeline_options.DebugOptions).experiments = experiments
options.view_as(pipeline_options.DebugOptions).add_experiment('beam_fn_api')

# This is sometimes needed if type checking is disabled
# to enforce that the inputs (and outputs) of GroupByKey operations
Expand Down
Loading