Merged
23 commits
afd0b39
Introduce python memory profiling pipeline options.
tvalentyn May 13, 2026
fc5e516
Add memray and google-pprof dependencies
tvalentyn May 13, 2026
dc5b25a
Parse profiler options in boot.go
tvalentyn May 14, 2026
6457020
Upload profiles to GCS.
tvalentyn Jun 5, 2026
3c28f12
Disable the profiler after a timeout reached.
tvalentyn Jun 5, 2026
2aa2d1f
Support --profiler_stop_after_crash
tvalentyn Jun 5, 2026
ba4e331
Add on-the-worker postprocessing support.
tvalentyn Jun 8, 2026
ed72010
Generate also the --leaks flamegraph.
tvalentyn Jun 8, 2026
19811c2
Profiler options: use seconds, update defaults.
tvalentyn Jun 8, 2026
569db3b
Set a default profile location from temp location.
tvalentyn Jun 8, 2026
d1f3b01
Refactor: move profiling pieces into a separate file.
tvalentyn Jun 9, 2026
b8168d6
Post-process profiles sequentially.
tvalentyn Jun 9, 2026
7c33d74
Simplify tcmalloc preloading to be compatible with ARM versions.
tvalentyn Jun 10, 2026
a662b08
Use consistent mechanisms for periodic invocations of background tasks.
tvalentyn Jun 10, 2026
b29ee47
Formatting
tvalentyn Jun 10, 2026
e5aa7ee
Regenerate dependencies.
tvalentyn Jun 10, 2026
24c4bb3
Update changes.md
tvalentyn Jun 10, 2026
842a683
Address comments.
tvalentyn Jun 10, 2026
46a3d56
Be more resilient to scenarios when postprocessing was interrupted.
tvalentyn Jun 10, 2026
48fafb0
Handle postprocessing decision for each report individually.
tvalentyn Jun 10, 2026
ad83961
Remove none-handling since pipeline option validation overwrites empt…
tvalentyn Jun 11, 2026
9bbdb44
Append Job ID and hostname if available in the local temp directory s…
tvalentyn Jun 11, 2026
8b5c75f
yapf
tvalentyn Jun 11, 2026
8 changes: 3 additions & 5 deletions CHANGES.md
@@ -59,22 +59,21 @@

## Highlights

* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
* Python SDK now supports memory profiling with Memray ([#38853](https://github.com/apache/beam/issues/38853)).
* (Python) Added [Qdrant](https://qdrant.tech/) VectorDatabaseWriteConfig implementation ([#38141](https://github.com/apache/beam/issues/38141)).


## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* (Java) Enabled state tag encoding v2 by default for new Dataflow Streaming Engine jobs. It can be disabled by passing `--experiments=disable_streaming_engine_state_tag_encoding_v2` or `--updateCompatibilityVersion=2.74.0` pipeline option. Note that the tag encoding version cannot change during a job update. Jobs using tag encoding v2 (enabled by default for new jobs on 2.75.0+) cannot be downgraded to Beam versions prior to 2.73.0, as only versions 2.73.0 and later support tag encoding v2. ([#38705](https://github.com/apache/beam/issues/38705)).
* (Python) Added instrumentation to support off-the-shelf profiling agents when launching Python SDK Harness ([#38853](https://github.com/apache/beam/issues/38853)).

## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
* (Python) Typehints of dataclass fields are honored during type inferences. To restore the behavior of fallback-to-any,
use pipeline option `--exclude_infer_dataclass_field_type` ([#38797](https://github.com/apache/beam/issues/38797)).
However fixing forward is recommended.
@@ -85,7 +84,6 @@

## Bugfixes

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Fixed IcebergIO writing manifest column bounds padded with trailing `0x00` bytes, which broke equality predicate pushdown in some query engines (Java) ([#38580](https://github.com/apache/beam/issues/38580)).

## Security Fixes
76 changes: 76 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
@@ -1658,6 +1658,82 @@ def _add_argparse_args(cls, parser):
        default=1.0,
        help='A number between 0 and 1 indicating the ratio '
        'of bundles that should be profiled.')
    parser.add_argument(
        '--profiler_agent',
        default=None,
        help=(
            'Specifies the profiling agent to launch the SDK worker harness '
            'with (e.g., "memray", "tcmalloc", or a custom wrapper script/binary).'
        ))
    parser.add_argument(
        '--profiler_extra_arg',
        '--profiler_extra_args',
        dest='profiler_extra_args',
        action=_CommaSeparatedListAction,
        default=None,
        help=
        'Comma-separated list of extra arguments to pass to the profiler agent.'
    )
    parser.add_argument(
        '--profiler_extra_env_var',
        '--profiler_extra_env_vars',
        dest='profiler_extra_env_vars',
        action=_CommaSeparatedListAction,
        default=None,
        help=(
            'Comma-separated list of environment variables required by the profiler agent '
            'in format "KEY1=VAL1,KEY2=VAL2".'))
    parser.add_argument(
        '--profile_temp_location',
        default=None,
        help=(
            'Directory path on the worker where local profiles are saved. '
            'Defaults to ${semi_persist_dir}/profiles if not specified.'))
    parser.add_argument(
        '--profile_upload_interval_sec',
        type=int,
        default=300,
        help=(
            'Frequency (in seconds) at which the local profiles are uploaded to GCS. '
            'Defaults to 300 (5 min).'))
    parser.add_argument(
        '--profiler_stop_after_sec',
        type=int,
        default=0,
        help=(
            'Time limit (in seconds) for profiling a single process. When exceeded, '
            'the worker process is restarted without the profiler.'))
    parser.add_argument(
        '--profiler_stop_after_crash',
        action='store_true',
        default=False,
        help=(
            'If True, the profiling agent won\'t be re-enabled after a worker '
            'process crash.'))
    parser.add_argument(
        '--profile_postprocess_interval_sec',
        type=int,
        default=600,
        help=(
            'Frequency (in seconds) at which the local profiles are post-processed '
            'on-the-fly. Defaults to 600 (10 minutes). Set to 0 to disable.'))

  def validate(self, validator):
    errors = []
    if self.profiler_agent:
      if self.profile_cpu or self.profile_memory:
        errors.append(
            '--profiler_agent is mutually exclusive with --profile_cpu '
            'and --profile_memory.')

      if not self.profile_location:
        temp_location = self.view_as(GoogleCloudOptions).temp_location
        if temp_location:
          self.profile_location = temp_location.rstrip('/') + '/profiles'
          _LOGGER.info(
              'Setting --profile_location to %s since profiling is enabled.',
              self.profile_location)
    return errors


class SetupOptions(PipelineOptions):
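The list-valued flags above (`--profiler_extra_args`, `--profiler_extra_env_vars`) rely on Beam's `_CommaSeparatedListAction`, whose implementation is not part of this diff. The following is a minimal standard-library sketch of how such flags might be parsed and how `KEY=VAL` entries might then be turned into a mapping. `CommaSeparatedListAction`, `build_parser`, and `env_vars_to_dict` are hypothetical names for illustration only, not Beam APIs, and the accumulation behavior across repeated flags is an assumption.

```python
import argparse


class CommaSeparatedListAction(argparse.Action):
    """Hypothetical stand-in for Beam's _CommaSeparatedListAction (assumption).

    Splits each occurrence of the flag on commas and accumulates the items.
    """
    def __call__(self, parser, namespace, values, option_string=None):
        items = list(getattr(namespace, self.dest, None) or [])
        items.extend(v.strip() for v in values.split(',') if v.strip())
        setattr(namespace, self.dest, items)


def build_parser():
    # Only a subset of the options from the diff, with the same defaults.
    parser = argparse.ArgumentParser()
    parser.add_argument('--profiler_agent', default=None)
    parser.add_argument(
        '--profiler_extra_env_var',
        '--profiler_extra_env_vars',
        dest='profiler_extra_env_vars',
        action=CommaSeparatedListAction,
        default=None)
    parser.add_argument('--profile_upload_interval_sec', type=int, default=300)
    parser.add_argument(
        '--profiler_stop_after_crash', action='store_true', default=False)
    return parser


def env_vars_to_dict(pairs):
    """Turns ["KEY1=VAL1", "KEY2=VAL2"] into a dict, splitting on the first '='."""
    result = {}
    for pair in pairs or []:
        key, sep, value = pair.partition('=')
        if not sep or not key:
            raise ValueError('Expected KEY=VALUE, got %r' % pair)
        result[key] = value
    return result


args = build_parser().parse_args([
    '--profiler_agent=memray',
    '--profiler_extra_env_vars=A=1,B=two',
    '--profiler_extra_env_var=C=x=y',
])
env = env_vars_to_dict(args.profiler_extra_env_vars)
```

One consequence of comma splitting in this sketch is that a value containing a comma cannot be passed through a single flag occurrence; whether Beam's action behaves the same way is not shown in the diff.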
37 changes: 37 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options_test.py
@@ -669,6 +669,43 @@ def test_unknown_option_prefix(self):
    options = PipelineOptions(['--type_check_strictness', 'blahblah'])
    options.view_as(TypeOptions)

  def test_profiling_agent_is_exclusive_with_legacy_profiling_options(self):
    options = PipelineOptions(['--profiler_agent=memray'])
    validator = PipelineOptionsValidator(options, None)
    self.assertEqual(validator.validate(), [])

    options = PipelineOptions(['--profiler_agent=memray', '--profile_cpu'])
    validator = PipelineOptionsValidator(options, None)
    errors = validator.validate()
    self.assertTrue(
        any('--profiler_agent is mutually exclusive' in err for err in errors))

    options = PipelineOptions(['--profiler_agent=memray', '--profile_memory'])
    validator = PipelineOptionsValidator(options, None)
    errors = validator.validate()
    self.assertTrue(
        any('--profiler_agent is mutually exclusive' in err for err in errors))

  def test_profile_location_defaulting_and_opt_out(self):
    options = PipelineOptions(
        ['--profiler_agent=memray', '--temp_location=gs://bucket/temp'])
    validator = PipelineOptionsValidator(options, None)
    self.assertEqual(validator.validate(), [])
    self.assertEqual(
        options.view_as(ProfilingOptions).profile_location,
        'gs://bucket/temp/profiles')

    options = PipelineOptions([
        '--profiler_agent=memray',
        '--temp_location=gs://bucket/temp',
        '--profile_location=gs://other-bucket/custom_profiles'
    ])
    validator = PipelineOptionsValidator(options, None)
    self.assertEqual(validator.validate(), [])
    self.assertEqual(
        options.view_as(ProfilingOptions).profile_location,
        'gs://other-bucket/custom_profiles')

  def test_add_experiment(self):
    options = PipelineOptions([])
    options.view_as(DebugOptions).add_experiment('new_experiment')
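The behavior these tests pin down can be sketched without Beam installed. The snippet below is an illustration only: `ProfilingSettings` and `validate_profiling` are hypothetical names, and it assumes that the `profile_location` defaulting happens only when `--profiler_agent` is set, which the scraped diff does not make fully certain because indentation was lost.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ProfilingSettings:
    """Hypothetical plain-data stand-in for the relevant pipeline options."""
    profiler_agent: Optional[str] = None
    profile_cpu: bool = False
    profile_memory: bool = False
    profile_location: Optional[str] = None
    temp_location: Optional[str] = None


def validate_profiling(settings: ProfilingSettings) -> List[str]:
    """Returns validation errors and fills in a default profile_location."""
    errors = []
    if settings.profiler_agent:
        # The agent-based profiler cannot be combined with the legacy flags.
        if settings.profile_cpu or settings.profile_memory:
            errors.append(
                '--profiler_agent is mutually exclusive with --profile_cpu '
                'and --profile_memory.')
        # Assumption: defaulting applies only while an agent is configured.
        if not settings.profile_location and settings.temp_location:
            settings.profile_location = (
                settings.temp_location.rstrip('/') + '/profiles')
    return errors


defaulted = ProfilingSettings(
    profiler_agent='memray', temp_location='gs://bucket/temp/')
defaulted_errors = validate_profiling(defaulted)
```

Stripping the trailing slash before appending `/profiles` keeps the derived location stable whether or not the temp location ends with `/`.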
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options_validator.py
@@ -30,6 +30,7 @@
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PortableOptions
from apache_beam.options.pipeline_options import ProfilingOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import TestOptions
@@ -55,6 +56,7 @@ class PipelineOptionsValidator(object):
DebugOptions,
GoogleCloudOptions,
PortableOptions,
ProfilingOptions,
SetupOptions,
StandardOptions,
TestOptions,
3 changes: 3 additions & 0 deletions sdks/python/container/Dockerfile
@@ -45,7 +45,10 @@ RUN \
ccache \
# Required for using Beam Python SDK on ARM machines.
libgeos-dev \
# Required for memory profiling with tcmalloc.
google-perftools \
&& \

rm -rf /var/lib/apt/lists/* && \

pip install --upgrade pip setuptools wheel && \
2 changes: 2 additions & 0 deletions sdks/python/container/base_image_requirements_manual.txt
@@ -38,6 +38,8 @@ google-cloud-profiler;python_version<="3.12"
# tests
google-api-python-client;python_version<="3.13"
guppy3
# Explicitly pin memray to use a version compatible with postprocessing logic in Beam.
memray==1.19.3
mmh3 # Optimizes execution of some Beam codepaths. TODO: Make it Beam's dependency.
nltk # Commonly used for natural language processing.
google-crc32c
90 changes: 81 additions & 9 deletions sdks/python/container/boot.go
@@ -32,6 +32,7 @@ import (
"slices"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

@@ -133,7 +134,17 @@ type PipelineOptionsData struct {
}

type OptionsData struct {
Experiments []string `json:"experiments"`
Experiments []string `json:"experiments"`
ProfilerAgent string `json:"profiler_agent"`
ProfilerExtraArgs []string `json:"profiler_extra_args"`
ProfilerExtraEnvVars []string `json:"profiler_extra_env_vars"`
ProfileLocation string `json:"profile_location"`
ProfileTempLocation string `json:"profile_temp_location"`
ProfileUploadIntervalSec int `json:"profile_upload_interval_sec"`
ProfilerStopAfterSec int `json:"profiler_stop_after_sec"`
ProfilerStopAfterCrash bool `json:"profiler_stop_after_crash"`
ProfilePostprocessIntervalSec int `json:"profile_postprocess_interval_sec"`
JobId string `json:"jobId,omitempty"`
}

func getExperiments(options string) []string {
@@ -198,6 +209,14 @@ func launchSDKProcess() error {
logger.Printf(ctx, "Build isolation disabled when installing packages with pip")
}

var opts PipelineOptionsData
if err := json.Unmarshal([]byte(options), &opts); err != nil {
logger.Warnf(ctx, "Failed to unmarshal pipeline options for profiling config: %v", err)
}

ctx = setupProfilerConfig(ctx, logger, &opts)
startProfilerBackgroundTasks(ctx, logger)

// (2) Retrieve and install the staged packages.
//
// No log.Fatalf() from here on, otherwise deferred cleanups will not be called!
@@ -314,11 +333,6 @@ func launchSDKProcess() error {
childPids.mu.Unlock()
}()

args := []string{
"-m",
sdkHarnessEntrypoint,
}

var wg sync.WaitGroup
wg.Add(len(workerIds))
for _, workerId := range workerIds {
@@ -333,12 +347,68 @@ func launchSDKProcess() error {
childPids.mu.Unlock()
return
}
logger.Printf(ctx, "Executing Python (worker %v): python %v", workerId, strings.Join(args, " "))
cmd := StartCommandEnv(map[string]string{"WORKER_ID": workerId}, os.Stdin, bufLogger, bufLogger, "python", args...)

currentProg := "python"
currentArgs := []string{"-m", sdkHarnessEntrypoint}
currentEnv := map[string]string{"WORKER_ID": workerId}

profilingActive := false
currentProg, currentArgs, currentEnv, profilingActive = maybeWithProfiler(
ctx, logger, workerId, currentProg, currentArgs, currentEnv,
)

var envStr string
if len(currentEnv) > 0 {
var envStrings []string
for k, v := range currentEnv {
envStrings = append(envStrings, k+"="+v)
}
slices.Sort(envStrings)
envStr = strings.Join(envStrings, ", ")
}

logger.Printf(ctx, "Executing Python (%v): %v %v", envStr, currentProg, strings.Join(currentArgs, " "))
cmd := StartCommandEnv(currentEnv, os.Stdin, bufLogger, bufLogger, currentProg, currentArgs...)
childPids.v = append(childPids.v, cmd.Process.Pid)
childPids.mu.Unlock()

if err := cmd.Wait(); err != nil {
var timer *time.Timer
var profilingTimedOut atomic.Bool

Contributor:

Typically you'd want to define an atomic value outside of the scope of the goroutine so it is shared amongst the goroutines. Defined within the goroutine function I would suspect each one would have its own copy, which makes the atomic unnecessary as best I can tell. I think it comes down to the behavior you want here: do you want each goroutine to individually stop profiling on a timeout, or would you want one timeout to cause all of the goroutines to stop profiling?

tvalentyn (Contributor Author), Jun 10, 2026:

I believe the AI review flagged this out of concern that the timer goroutine modifies the value concurrently.


pcfg := getProfilerConfig(ctx)
if profilingActive && pcfg.StopAfterSec > 0 {
duration := time.Duration(pcfg.StopAfterSec) * time.Second
timer = time.AfterFunc(duration, func() {
childPids.mu.Lock()
defer childPids.mu.Unlock()
if cmd.Process != nil {
logger.Printf(ctx, "Profiling timeout of %d seconds reached. Sending SIGINT to worker %s",
pcfg.StopAfterSec, workerId)
profilingTimedOut.Store(true)
syscall.Kill(-cmd.Process.Pid, syscall.SIGINT)
Comment thread
tvalentyn marked this conversation as resolved.
}
})
}

err := cmd.Wait()
if timer != nil {
timer.Stop()
}

if err != nil {
if profilingTimedOut.Load() {
stopProfiling(ctx)
bufLogger.FlushAtDebug(ctx)
logger.Printf(ctx, "Python worker %v terminated after profiling timeout. Restarting without profiler.", workerId)
// Error is not counted toward error budget.
continue
}

if profilingActive && pcfg.StopAfterCrash {
stopProfiling(ctx)
logger.Printf(ctx, "Python worker %v crashed. Disabling profiler on subsequent restarts because --profiler_stop_after_crash is enabled.", workerId)
}

// Retry on fatal errors, like OOMs and segfaults, not just
// DoFns throwing exceptions.
errorCount += 1
@@ -532,3 +602,5 @@ func logSubmissionEnvDependencies(ctx context.Context, bufLogger *tools.Buffered
bufLogger.Printf(ctx, "%s", string(content))
return nil
}
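The restart handling added to `launchSDKProcess` in this diff distinguishes a profiling timeout from an ordinary crash. The decision table can be summarized in a small pure function. This is a Python paraphrase for illustration, not code from the PR: `after_worker_exit` is a hypothetical name, and the clean-exit case is an assumption because that path is not visible in the diff.

```python
def after_worker_exit(failed, timed_out, profiling_active, stop_after_crash):
    """Returns (keep_profiling, counts_toward_error_budget).

    failed: the worker process exited with an error.
    timed_out: the profiling timeout fired for this worker.
    profiling_active: the worker was launched under the profiler.
    stop_after_crash: --profiler_stop_after_crash was set.
    """
    if not failed:
        # Assumption: a clean exit changes nothing about profiling.
        return profiling_active, False
    if timed_out:
        # Restart without the profiler; the exit is not counted as an error.
        return False, False
    if profiling_active and stop_after_crash:
        # A genuine crash: disable the profiler and count the error.
        return False, True
    # A genuine crash with profiling left as it was.
    return profiling_active, True
```

For example, `after_worker_exit(True, True, True, False)` evaluates to `(False, False)`: the worker is restarted unprofiled and the error budget is untouched.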

