Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changelog/5280.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
`opentelemetry-sdk`: Add ability to refresh process sensitive Resource attributes
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import concurrent.futures
import json
import logging
import os
import threading
import traceback
import warnings
Expand All @@ -24,7 +25,7 @@
cast,
overload,
)
from weakref import WeakSet
from weakref import WeakMethod, WeakSet

from typing_extensions import deprecated

Expand Down Expand Up @@ -59,7 +60,10 @@
from opentelemetry.sdk.environment_variables._internal import (
parse_boolean_environment_variable,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.resources import (
Resource,
_get_process_sensitive_resource,
)
from opentelemetry.sdk.util import ns_to_iso_str
from opentelemetry.sdk.util._configurator import RuleBasedConfigurator
from opentelemetry.sdk.util.instrumentation import (
Expand Down Expand Up @@ -693,6 +697,9 @@ def _is_enabled(self) -> bool:
def _set_logger_config(self, logger_config: _LoggerConfig) -> None:
self._logger_config = logger_config

def _set_resource(self, resource: Resource) -> None:
self._resource = resource

@property
def instrumentation_scope(self):
return self._instrumentation_scope
Expand Down Expand Up @@ -812,11 +819,41 @@ def __init__(
self._logger_cache_lock = Lock()
self._active_loggers: WeakSet[Logger] = WeakSet()
self._active_loggers_lock = Lock()
if hasattr(os, "register_at_fork"):
weak_at_fork = WeakMethod(self._at_fork_reinit)

def _after_in_child() -> None:
if at_fork := weak_at_fork():
at_fork()

os.register_at_fork(after_in_child=_after_in_child)

def _at_fork_reinit(self) -> None:
self._logger_cache_lock = Lock()
self._active_loggers_lock = Lock()
self.update_resource(_get_process_sensitive_resource())

@property
def resource(self):
return self._resource

def update_resource(self, resource: Resource) -> None:
Comment thread
herin049 marked this conversation as resolved.
Outdated
"""Merge a :class:`opentelemetry.sdk.resources.Resource` into this
`LoggerProvider`'s resource.

The resource of all existing :class:`Logger` instances created by this
provider is also updated to the merged :class:`opentelemetry.sdk.resources.Resource`.

Args:
resource: The resource to merge into this `LoggerProvider`'s
current resource.
"""
with self._active_loggers_lock:
self._resource = self._resource.merge(resource)
for logger in list(self._active_loggers):
# pylint: disable-next=protected-access
logger._set_resource(self._resource)
Comment thread
herin049 marked this conversation as resolved.

def _get_logger_no_cache(
self,
name: str,
Expand Down Expand Up @@ -897,7 +934,7 @@ def _set_logger_configurator(
"""
self._logger_configurator = logger_configurator
with self._active_loggers_lock:
for logger in self._active_loggers:
for logger in list(self._active_loggers):
# pylint: disable-next=protected-access
logger._set_logger_config(
self._apply_logger_configurator(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0

import os
import weakref
from atexit import register, unregister
from collections.abc import Callable, Sequence
Expand Down Expand Up @@ -51,7 +52,10 @@
from opentelemetry.sdk.metrics._internal.sdk_configuration import (
SdkConfiguration,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.resources import (
Resource,
_get_process_sensitive_resource,
)
from opentelemetry.sdk.util._configurator import RuleBasedConfigurator
from opentelemetry.sdk.util.instrumentation import (
InstrumentationScope,
Expand Down Expand Up @@ -526,6 +530,21 @@ def __init__(
)
metric_reader._set_meter_provider(self)

if hasattr(os, "register_at_fork"):
weak_at_fork = weakref.WeakMethod(self._at_fork_reinit)

def _after_in_child() -> None:
if at_fork := weak_at_fork():
at_fork()

os.register_at_fork(after_in_child=_after_in_child)

def _at_fork_reinit(self) -> None:
self._lock = Lock()
self._meter_lock = Lock()
type(self)._all_metric_readers_lock = Lock()
self.update_resource(_get_process_sensitive_resource())

def _set_meter_configurator(
self, *, meter_configurator: _MeterConfiguratorT
):
Expand All @@ -543,6 +562,19 @@ def _set_meter_configurator(
self._apply_meter_configurator(instrumentation_scope)
)

def update_resource(self, resource: Resource) -> None:
"""Merge a :class:`opentelemetry.sdk.resources.Resource` into this
`MeterProvider`'s resource.

Args:
resource: The resource to merge into this `MeterProvider`'s
current resource.
"""
with self._meter_lock:
self._sdk_config.resource = self._sdk_config.resource.merge(
resource
)

def _apply_meter_configurator(
self, instrumentation_scope: InstrumentationScope
) -> _MeterConfig:
Expand Down
29 changes: 29 additions & 0 deletions opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,21 @@ def detect(self) -> "Resource":
"""Don't call `Resource.create` here to avoid an infinite loop, instead instantiate `Resource` directly"""
raise NotImplementedError()

# pylint: disable-next=no-self-use
def is_process_sensitive(self) -> bool:

@emdneto emdneto Jun 12, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we have an agreement on the naming here? I remember there was a discussion around it from the SIG

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if there was anything explicit. If people have better alternatives for names here, I am open to suggestions.

"""Return whether this detector depends on the current process identity.

Process sensitive detectors may return resource attributes that become
stale after a process identity change, such as :func:`os.fork`.
Detectors returning ``True`` should be re-run after such changes so the
resulting :class:`Resource` describes the current process.

Returns:
``True`` if this detector should be re-run after process identity
changes otherwise ``False``.
"""
return False


class OTELResourceDetector(ResourceDetector):
# pylint: disable=no-self-use
Expand Down Expand Up @@ -316,6 +331,9 @@ def detect(self) -> "Resource":

class ProcessResourceDetector(ResourceDetector):
# pylint: disable=no-self-use
def is_process_sensitive(self) -> bool:
return True

def detect(self) -> "Resource":
_runtime_version = ".".join(
map(
Expand Down Expand Up @@ -529,6 +547,17 @@ def _build_resource_detectors() -> list["ResourceDetector"]:
return detectors


def _get_process_sensitive_resource() -> Resource: # pyright: ignore[reportUnusedFunction]
return get_aggregated_resources(
[
detector
for detector in _build_resource_detectors()
if detector.is_process_sensitive()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a dumb idea but I figured I'd share.

What if we instantiate the resource values with a subclass of string (or add a new Protocol interface with repr support) that reads the live value from the process or somehow indicates the value is process sensitive?

I think for service.instance.id, you could return a UUID consistently unless the process start time and pid are detected as having changed (this might not work across al unixes, haven't checked)

One downside, if people are reading such a resource attribute frequently it could be slow (makes a syscall).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's a good idea (since it's avoids having to register a post-fork handler), but as you mention there are certainly edge cases that need to be handled and the concern of additional overhead. This hypothetical lazy string object would also need to be thread safe since it would cross thread boundaries quite frequently via batch processors.

My original reasoning with leaning towards this approach is that the logic added in this PR is only executed if the process is forked, and given that >99% of Python applications never fork, I'd prefer to accept a slightly more operationally complex solution that has no impact on the vast majority of users that never fork over a solution that has a potential impacts on all users.

The other "solution" is to just accept this limitation and document that forking an already initialized OTel Python process is not supported. Python has already started raising a deprecation warning if you attempt to fork with multiple threads, and fork is no longer the default start method on any platform.

Changed in version 3.14: This is no longer the default start method on any platform. Code that requires fork must explicitly specify that via get_context() or set_start_method().

Changed in version 3.12: If Python is able to detect that your process has multiple threads, the os.fork() function that this start method calls internally will raise a DeprecationWarning. Use a different start method. See the os.fork() documentation for further explanation.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and given that >99% of Python applications never fork

The other "solution" is to just accept this limitation and document that forking an already initialized OTel Python process is not supported. Python has already started raising a deprecation warning if you attempt to fork with multiple threads, and fork is no longer the default start method on any platform.

Is that viable though? I think auto instrumenting gunicorn is still a hassle, you have to set up a post fork hook. That's not possible in some cases like the k8s Operator and I imagine the new OTel Injector. I also see regular issues/PRs being filed asking for components to made fork safe. I do agree with you though, it's kind of a losing battle to get complete coverage, and having tons of post fork hooks is also problematic.

With all that in mind, it would be good to keep the implementation hidden/private for now so we can reevaluate later. I would even be OK with something less generalized than this PR, like special casing known process sensitive attributes in Resource class so we don't need to have hooks in each SDK to recreate.

],
Resource.get_empty(),
)


def get_aggregated_resources(
detectors: Sequence["ResourceDetector"],
initial_resource: Resource | None = None,
Expand Down
36 changes: 35 additions & 1 deletion opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@
from opentelemetry.sdk.environment_variables._internal import (
parse_boolean_environment_variable,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.resources import (
Resource,
_get_process_sensitive_resource,
)
from opentelemetry.sdk.trace import sampling
from opentelemetry.sdk.trace._tracer_metrics import create_tracer_metrics
from opentelemetry.sdk.trace.id_generator import IdGenerator, RandomIdGenerator
Expand Down Expand Up @@ -1141,6 +1144,9 @@ def __init__(
def _set_tracer_config(self, tracer_config: _TracerConfig):
self._tracer_config = tracer_config

def _set_resource(self, resource: Resource) -> None:
self.resource = resource

def _is_enabled(self) -> bool:
"""If the tracer is not enabled, start_span will create a NonRecordingSpan"""
return self._tracer_config.is_enabled
Expand Down Expand Up @@ -1345,6 +1351,18 @@ def __init__(
)
self._tracers_lock = threading.Lock()
self._tracers: dict[InstrumentationScope, Tracer] = {}
if hasattr(os, "register_at_fork"):
weak_at_fork = weakref.WeakMethod(self._at_fork_reinit)

def _after_in_child() -> None:
if at_fork := weak_at_fork():
at_fork()

os.register_at_fork(after_in_child=_after_in_child)

def _at_fork_reinit(self) -> None:
self._tracers_lock = threading.Lock()
self.update_resource(_get_process_sensitive_resource())

def _set_tracer_configurator(
self, *, tracer_configurator: _TracerConfiguratorT
Expand All @@ -1367,6 +1385,22 @@ def _set_tracer_configurator(
def resource(self) -> Resource:
return self._resource

def update_resource(self, resource: Resource) -> None:
"""Merge a :class:`opentelemetry.sdk.resources.Resource` into this
`TracerProvider`'s resource.

The resource of all existing :class:`Tracer` instances created by this
provider is also updated to the merged :class:`opentelemetry.sdk.resources.Resource`.

Args:
resource: The resource to merge into this `TracerProvider`'s
current resource.
"""
with self._tracers_lock:
self._resource = self._resource.merge(resource)
for tracer in self._tracers.values():
tracer._set_resource(self._resource) # pylint: disable=protected-access

def _apply_tracer_configurator(
self, instrumentation_scope: InstrumentationScope
):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0

import json
import os

from opentelemetry._logs import LogRecord
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import (
InMemoryLogRecordExporter,
SimpleLogRecordProcessor,
)
from opentelemetry.sdk.resources import PROCESS_PID


# pylint: disable-next=too-many-locals
def main() -> None:
exporter = InMemoryLogRecordExporter()
logger_provider = LoggerProvider(shutdown_on_exit=False)
logger_provider.add_log_record_processor(
SimpleLogRecordProcessor(exporter)
)
logger = logger_provider.get_logger("cached")
parent_pid = os.getpid()
parent_resource_pid = logger_provider.resource.attributes[PROCESS_PID]
parent_logger_pid = logger.resource.attributes[PROCESS_PID]

read_fd, write_fd = os.pipe()
pid = os.fork()
if not pid:
os.close(read_fd)
child_pid = os.getpid()
new_logger = logger_provider.get_logger("new")
logger.emit(LogRecord(observed_timestamp=0, body="cached"))
new_logger.emit(LogRecord(observed_timestamp=0, body="new"))
finished_logs = exporter.get_finished_logs()
payload = {
"child_pid": child_pid,
"provider_pid": logger_provider.resource.attributes[PROCESS_PID],
"cached_logger_pid": logger.resource.attributes[PROCESS_PID],
"new_logger_pid": new_logger.resource.attributes[PROCESS_PID],
"exported_resource_pids": [
log.resource.attributes[PROCESS_PID] for log in finished_logs
],
"log_bodies": sorted(log.log_record.body for log in finished_logs),
}
os.write(write_fd, json.dumps(payload).encode())
os.close(write_fd)
# pylint: disable-next=protected-access
os._exit(0)

os.close(write_fd)
child_payload = os.read(read_fd, 4096)
os.close(read_fd)
_, status = os.waitpid(pid, 0)
exit_code = os.waitstatus_to_exitcode(status)
if exit_code != 0:
raise SystemExit(exit_code)

print(
json.dumps(
{
"parent_pid": parent_pid,
"parent_resource_pid": parent_resource_pid,
"parent_logger_pid": parent_logger_pid,
"parent_resource_pid_after_fork": logger_provider.resource.attributes[
PROCESS_PID
],
"parent_logger_pid_after_fork": logger.resource.attributes[
PROCESS_PID
],
"child": json.loads(child_payload.decode()),
}
),
flush=True,
)


if __name__ == "__main__":
main()
Loading
Loading