-
Notifications
You must be signed in to change notification settings - Fork 910
opentelemetry-sdk: Add ability to refresh process sensitive Resource attributes #5280
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 5 commits
2e6b498
59a92dc
5e081a3
308a459
c693d1c
3fd61b2
d44e094
108b919
536027e
8be65d6
9d3b2f2
acc083b
5039afe
fce36ef
29d48a2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| `opentelemetry-sdk`: Add ability to refresh process sensitive Resource attributes |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -287,6 +287,21 @@ def detect(self) -> "Resource": | |
| """Don't call `Resource.create` here to avoid an infinite loop, instead instantiate `Resource` directly""" | ||
| raise NotImplementedError() | ||
|
|
||
| # pylint: disable-next=no-self-use | ||
| def is_process_sensitive(self) -> bool: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did we have an agreement on the naming here? I remember there was a discussion around it from the SIG
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure if there was anything explicit. If people have better alternatives for names here, I am open to suggestions. |
||
| """Return whether this detector depends on the current process identity. | ||
|
|
||
| Process sensitive detectors may return resource attributes that become | ||
| stale after a process identity change, such as :func:`os.fork`. | ||
| Detectors returning ``True`` should be re-run after such changes so the | ||
| resulting :class:`Resource` describes the current process. | ||
|
|
||
| Returns: | ||
| ``True`` if this detector should be re-run after process identity | ||
| changes otherwise ``False``. | ||
| """ | ||
| return False | ||
|
|
||
|
|
||
| class OTELResourceDetector(ResourceDetector): | ||
| # pylint: disable=no-self-use | ||
|
|
@@ -316,6 +331,9 @@ def detect(self) -> "Resource": | |
|
|
||
| class ProcessResourceDetector(ResourceDetector): | ||
| # pylint: disable=no-self-use | ||
| def is_process_sensitive(self) -> bool: | ||
| return True | ||
|
|
||
| def detect(self) -> "Resource": | ||
| _runtime_version = ".".join( | ||
| map( | ||
|
|
@@ -529,6 +547,17 @@ def _build_resource_detectors() -> list["ResourceDetector"]: | |
| return detectors | ||
|
|
||
|
|
||
| def _get_process_sensitive_resource() -> Resource: | ||
| return get_aggregated_resources( | ||
| [ | ||
| detector | ||
| for detector in _build_resource_detectors() | ||
| if detector.is_process_sensitive() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe a dumb idea but I figured I'd share. What if we instantiate the resource values with a subclass of I think for service.instance.id, you could return a UUID consistently unless the process start time and pid are detected as having changed (this might not work across al unixes, haven't checked) One downside, if people are reading such a resource attribute frequently it could be slow (makes a syscall).
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's a good idea (since it's avoids having to register a post-fork handler), but as you mention there are certainly edge cases that need to be handled and the concern of additional overhead. This hypothetical lazy string object would also need to be thread safe since it would cross thread boundaries quite frequently via batch processors. My original reasoning with leaning towards this approach is that the logic added in this PR is only executed if the process is forked, and given that >99% of Python applications never fork, I'd prefer to accept a slightly more operationally complex solution that has no impact on the vast majority of users that never fork over a solution that has a potential impacts on all users. The other "solution" is to just accept this limitation and document that forking an already initialized OTel Python process is not supported. Python has already started raising a deprecation warning if you attempt to fork with multiple threads, and fork is no longer the default start method on any platform.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Is that viable though? I think auto instrumenting gunicorn is still a hassle, you have to set up a post fork hook. That's not possible in some cases like the k8s Operator and I imagine the new OTel Injector. I also see regular issues/PRs being filed asking for components to made fork safe. I do agree with you though, it's kind of a losing battle to get complete coverage, and having tons of post fork hooks is also problematic. With all that in mind, it would be good to keep the implementation hidden/private for now so we can reevaluate later. I would even be OK with something less generalized than this PR, like special casing known process sensitive attributes in Resource class so we don't need to have hooks in each SDK to recreate. |
||
| ], | ||
| Resource.get_empty(), | ||
| ) | ||
|
|
||
|
|
||
| def get_aggregated_resources( | ||
| detectors: Sequence["ResourceDetector"], | ||
| initial_resource: Resource | None = None, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
| # Copyright The OpenTelemetry Authors | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| import json | ||
| import os | ||
|
|
||
| from opentelemetry._logs import LogRecord | ||
| from opentelemetry.sdk._logs import LoggerProvider | ||
| from opentelemetry.sdk._logs.export import ( | ||
| InMemoryLogRecordExporter, | ||
| SimpleLogRecordProcessor, | ||
| ) | ||
| from opentelemetry.sdk.resources import PROCESS_PID | ||
|
|
||
|
|
||
| # pylint: disable-next=too-many-locals | ||
| def main() -> None: | ||
| exporter = InMemoryLogRecordExporter() | ||
| logger_provider = LoggerProvider(shutdown_on_exit=False) | ||
| logger_provider.add_log_record_processor( | ||
| SimpleLogRecordProcessor(exporter) | ||
| ) | ||
| logger = logger_provider.get_logger("cached") | ||
| parent_pid = os.getpid() | ||
| parent_resource_pid = logger_provider.resource.attributes[PROCESS_PID] | ||
| parent_logger_pid = logger.resource.attributes[PROCESS_PID] | ||
|
|
||
| read_fd, write_fd = os.pipe() | ||
| pid = os.fork() | ||
| if not pid: | ||
| os.close(read_fd) | ||
| child_pid = os.getpid() | ||
| new_logger = logger_provider.get_logger("new") | ||
| logger.emit(LogRecord(observed_timestamp=0, body="cached")) | ||
| new_logger.emit(LogRecord(observed_timestamp=0, body="new")) | ||
| finished_logs = exporter.get_finished_logs() | ||
| payload = { | ||
| "child_pid": child_pid, | ||
| "provider_pid": logger_provider.resource.attributes[PROCESS_PID], | ||
| "cached_logger_pid": logger.resource.attributes[PROCESS_PID], | ||
| "new_logger_pid": new_logger.resource.attributes[PROCESS_PID], | ||
| "exported_resource_pids": [ | ||
| log.resource.attributes[PROCESS_PID] for log in finished_logs | ||
| ], | ||
| "log_bodies": sorted(log.log_record.body for log in finished_logs), | ||
| } | ||
| os.write(write_fd, json.dumps(payload).encode()) | ||
| os.close(write_fd) | ||
| # pylint: disable-next=protected-access | ||
| os._exit(0) | ||
|
|
||
| os.close(write_fd) | ||
| child_payload = os.read(read_fd, 4096) | ||
| os.close(read_fd) | ||
| _, status = os.waitpid(pid, 0) | ||
| exit_code = os.waitstatus_to_exitcode(status) | ||
| if exit_code != 0: | ||
| raise SystemExit(exit_code) | ||
|
|
||
| print( | ||
| json.dumps( | ||
| { | ||
| "parent_pid": parent_pid, | ||
| "parent_resource_pid": parent_resource_pid, | ||
| "parent_logger_pid": parent_logger_pid, | ||
| "parent_resource_pid_after_fork": logger_provider.resource.attributes[ | ||
| PROCESS_PID | ||
| ], | ||
| "parent_logger_pid_after_fork": logger.resource.attributes[ | ||
| PROCESS_PID | ||
| ], | ||
| "child": json.loads(child_payload.decode()), | ||
| } | ||
| ), | ||
| flush=True, | ||
| ) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| main() |
Uh oh!
There was an error while loading. Please reload this page.