diff --git a/CHANGELOG.md b/CHANGELOG.md index bca5493bd23..1e7e1a444a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- `opentelemetry-exporter-otlp-proto-http`: Fix stale `requests.Session` after `fork()` in all OTLP HTTP exporters (traces, metrics, logs) by resetting the session in the child process, re-using the credential provider when configured + ([#4995](https://github.com/open-telemetry/opentelemetry-python/pull/4995)) - `opentelemetry-sdk`: fix multi-processor `force_flush` skipping remaining processors when one returns `None` ([#5179](https://github.com/open-telemetry/opentelemetry-python/pull/5179)) - Apply fixes for `UP` ruff rule diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py index 56906b96501..55784aff400 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py @@ -126,11 +126,12 @@ def __init__( ) ) self._compression = compression or _compression_from_env() + self._cred_envvar = ( + _OTEL_PYTHON_EXPORTER_OTLP_HTTP_LOGS_CREDENTIAL_PROVIDER + ) self._session = ( session - or _load_session_from_envvar( - _OTEL_PYTHON_EXPORTER_OTLP_HTTP_LOGS_CREDENTIAL_PROVIDER - ) + or _load_session_from_envvar(self._cred_envvar) or requests.Session() ) self._session.headers.update(self._headers) @@ -142,6 +143,8 @@ def __init__( {"Content-Encoding": self._compression.value} ) self._shutdown = False + if hasattr(os, "register_at_fork"): + os.register_at_fork(after_in_child=self._reset_session_after_fork) self._metrics = create_exporter_metrics( OtelComponentTypeValues.OTLP_HTTP_LOG_EXPORTER, @@ -154,6 +157,33 @@ def __init__( == "true", ) + def _reset_session_after_fork(self) -> None: + """ + Reset exporter session in the child process after fork. + + We close the existing session to avoid finalizer warnings if file + descriptors were already closed, then recreate the session using the + same creation logic as __init__ to preserve credential provider config. + """ + try: + self._session.close() + self._session = ( + _load_session_from_envvar(self._cred_envvar) + or requests.Session() + ) + self._session.headers.update(self._headers) + self._session.headers.update(_OTLP_HTTP_HEADERS) + self._session.headers.update(self._headers) + if self._compression is not Compression.NoCompression: + self._session.headers.update( + {"Content-Encoding": self._compression.value} + ) + except Exception: + _logger.debug( + "Exception occurred while resetting exporter session", + exc_info=True, + ) + def _export( self, serialized_data: bytes, timeout_sec: float | None = None ): diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index eb1e69cfe4f..a8ba5d93796 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -184,11 +184,12 @@ def __init__( ) ) self._compression = compression or _compression_from_env() + self._cred_envvar = ( + _OTEL_PYTHON_EXPORTER_OTLP_HTTP_METRICS_CREDENTIAL_PROVIDER + ) self._session = ( session - or _load_session_from_envvar( - _OTEL_PYTHON_EXPORTER_OTLP_HTTP_METRICS_CREDENTIAL_PROVIDER - ) + or _load_session_from_envvar(self._cred_envvar) or requests.Session() ) self._session.headers.update(self._headers) @@ -205,6 +206,8 @@ def __init__( ) self._max_export_batch_size: int | None = max_export_batch_size self._shutdown = False + if hasattr(os, "register_at_fork"): + os.register_at_fork(after_in_child=self._reset_session_after_fork) self._metrics = create_exporter_metrics( OtelComponentTypeValues.OTLP_HTTP_METRIC_EXPORTER, @@ -217,6 +220,34 @@ def __init__( == "true", ) + def _reset_session_after_fork(self) -> None: + """ + Reset exporter session in the child process after fork. + + We close the existing session to avoid finalizer warnings if file + descriptors were already closed, then recreate the session using the + same creation logic as __init__ to preserve credential provider config. + """ + try: + self._session.close() + + self._session = ( + _load_session_from_envvar(self._cred_envvar) + or requests.Session() + ) + self._session.headers.update(self._headers) + self._session.headers.update(_OTLP_HTTP_HEADERS) + self._session.headers.update(self._headers) + if self._compression is not Compression.NoCompression: + self._session.headers.update( + {"Content-Encoding": self._compression.value} + ) + except Exception: + _logger.debug( + "Exception occurred while resetting exporter session", + exc_info=True, + ) + def _export( self, serialized_data: bytes, timeout_sec: float | None = None ): diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py index 2be240103c0..e2bb713d0a7 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py @@ -121,11 +121,12 @@ def __init__( ) ) self._compression = compression or _compression_from_env() + self._cred_envvar = ( + _OTEL_PYTHON_EXPORTER_OTLP_HTTP_TRACES_CREDENTIAL_PROVIDER + ) self._session = ( session - or _load_session_from_envvar( - _OTEL_PYTHON_EXPORTER_OTLP_HTTP_TRACES_CREDENTIAL_PROVIDER - ) + or _load_session_from_envvar(self._cred_envvar) or requests.Session() ) self._session.headers.update(self._headers) @@ -137,6 +138,8 @@ def __init__( {"Content-Encoding": self._compression.value} ) self._shutdown = False + if hasattr(os, "register_at_fork"): + os.register_at_fork(after_in_child=self._reset_session_after_fork) self._metrics = create_exporter_metrics( OtelComponentTypeValues.OTLP_HTTP_SPAN_EXPORTER, @@ -149,6 +152,34 @@ def __init__( == "true", ) + def _reset_session_after_fork(self) -> None: + """ + Reset exporter session in the child process after fork. + + We close the existing session to avoid finalizer warnings if file + descriptors were already closed, then recreate the session using the + same creation logic as __init__ to preserve credential provider config. + """ + try: + self._session.close() + self._session = ( + _load_session_from_envvar(self._cred_envvar) + or requests.Session() + ) + self._session.headers.update(self._headers) + self._session.headers.update(_OTLP_HTTP_HEADERS) + self._session.headers.update(self._headers) + if self._compression is not Compression.NoCompression: + self._session.headers.update( + {"Content-Encoding": self._compression.value} + ) + except Exception: + _logger.debug( + "Exception occurred while resetting exporter session", + exc_info=True, + ) + + def _export( self, serialized_data: bytes, timeout_sec: float | None = None ): diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index 84a11e8ae90..73e5971f71c 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -1,9 +1,10 @@ # Copyright The OpenTelemetry Authors # SPDX-License-Identifier: Apache-2.0 -# pylint: disable=too-many-lines +import os import threading import time +import unittest from logging import WARNING from os import environ from unittest import TestCase @@ -1457,6 +1458,80 @@ def test_shutdown_interrupts_retry_backoff(self, mock_post): assert after - before < 0.2 + @unittest.skipUnless( + hasattr(os, "register_at_fork"), "fork session reset not available" + ) + def test_register_at_fork_resets_session(self): + initial_session = MagicMock(spec=requests.Session) + initial_session.headers = {} + + new_session = MagicMock(spec=requests.Session) + new_session.headers = {} + + with ( + patch("os.register_at_fork") as mock_register_at_fork, + patch( + "opentelemetry.exporter.otlp.proto.http.metric_exporter._load_session_from_envvar", + return_value=None, + ) as mock_load, + patch( + "opentelemetry.exporter.otlp.proto.http.metric_exporter.requests.Session", + return_value=new_session, + ), + ): + exporter = OTLPMetricExporter( + session=initial_session, headers={"x-test": "1"} + ) + after_in_child = mock_register_at_fork.call_args.kwargs[ + "after_in_child" + ] + after_in_child() + + initial_session.close.assert_called_once() + mock_load.assert_called() + self.assertEqual(exporter._session, new_session) + self.assertEqual(exporter._session.headers.get("x-test"), "1") + self.assertEqual( + exporter._session.headers.get("Content-Type"), + "application/x-protobuf", + ) + self.assertEqual(exporter._session.headers.get("preexisting"), "yes") + + @unittest.skipUnless( + hasattr(os, "register_at_fork"), "fork session reset not available" + ) + def test_register_at_fork_uses_credential_provider(self): + initial_session = MagicMock(spec=requests.Session) + initial_session.headers = {} + + cred_session = MagicMock(spec=requests.Session) + cred_session.headers = {} + + with ( + patch("os.register_at_fork") as mock_register_at_fork, + patch( + "opentelemetry.exporter.otlp.proto.http.metric_exporter._load_session_from_envvar", + return_value=cred_session, + ) as mock_load, + ): + exporter = OTLPMetricExporter( + session=initial_session, headers={"x-test": "1"} + ) + after_in_child = mock_register_at_fork.call_args.kwargs[ + "after_in_child" + ] + after_in_child() + + initial_session.close.assert_called_once() + mock_load.assert_called_with( + "OTEL_PYTHON_EXPORTER_OTLP_HTTP_METRICS_CREDENTIAL_PROVIDER" + ) + self.assertEqual(exporter._session, cred_session) + self.assertEqual(exporter._session.headers.get("x-test"), "1") + self.assertEqual( + exporter._session.headers.get("Content-Type"), + "application/x-protobuf", + ) def assert_standard_metric_attrs(self, attributes): self.assertEqual( attributes["otel.component.type"], "otlp_http_metric_exporter" diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py index d7f3592e288..4250f1634e6 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py @@ -3,6 +3,7 @@ # pylint: disable=protected-access +import os import threading import time import unittest @@ -655,6 +656,79 @@ def test_shutdown_interrupts_retry_backoff(self, mock_post): ) assert after - before < 0.2 + @unittest.skipUnless( + hasattr(os, "register_at_fork"), "fork session reset not available" + ) + def test_register_at_fork_resets_session(self): + initial_session = MagicMock(spec=requests.Session) + initial_session.headers = {} + + new_session = MagicMock(spec=requests.Session) + new_session.headers = {} + + with ( + patch("os.register_at_fork") as mock_register_at_fork, + patch( + "opentelemetry.exporter.otlp.proto.http._log_exporter._load_session_from_envvar", + return_value=None, + ) as mock_load, + patch( + "opentelemetry.exporter.otlp.proto.http._log_exporter.requests.Session", + return_value=new_session, + ), + ): + exporter = OTLPLogExporter( + session=initial_session, headers={"x-test": "1"} + ) + after_in_child = mock_register_at_fork.call_args.kwargs[ + "after_in_child" + ] + after_in_child() + + initial_session.close.assert_called_once() + mock_load.assert_called() + self.assertEqual(exporter._session, new_session) + self.assertEqual(exporter._session.headers.get("x-test"), "1") + self.assertEqual( + exporter._session.headers.get("Content-Type"), + "application/x-protobuf", + ) + + @unittest.skipUnless( + hasattr(os, "register_at_fork"), "fork session reset not available" + ) + def test_register_at_fork_uses_credential_provider(self): + initial_session = MagicMock(spec=requests.Session) + initial_session.headers = {} + + cred_session = MagicMock(spec=requests.Session) + cred_session.headers = {} + + with ( + patch("os.register_at_fork") as mock_register_at_fork, + patch( + "opentelemetry.exporter.otlp.proto.http._log_exporter._load_session_from_envvar", + return_value=cred_session, + ) as mock_load, + ): + exporter = OTLPLogExporter( + session=initial_session, headers={"x-test": "1"} + ) + after_in_child = mock_register_at_fork.call_args.kwargs[ + "after_in_child" + ] + after_in_child() + + initial_session.close.assert_called_once() + mock_load.assert_called_with( + "OTEL_PYTHON_EXPORTER_OTLP_HTTP_LOGS_CREDENTIAL_PROVIDER" + ) + self.assertEqual(exporter._session, cred_session) + self.assertEqual(exporter._session.headers.get("x-test"), "1") + self.assertEqual( + exporter._session.headers.get("Content-Type"), + "application/x-protobuf", + ) def assert_standard_metric_attrs(self, attributes): self.assertEqual( diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py index 1580e5a1802..a42907713c1 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py @@ -1,6 +1,7 @@ # Copyright The OpenTelemetry Authors # SPDX-License-Identifier: Apache-2.0 +import os import threading import time import unittest @@ -486,6 +487,79 @@ def test_shutdown_interrupts_retry_backoff(self, mock_post): assert after - before < 0.2 + @unittest.skipUnless( + hasattr(os, "register_at_fork"), "fork session reset not available" + ) + def test_register_at_fork_resets_session(self): + initial_session = MagicMock(spec=requests.Session) + initial_session.headers = {} + + new_session = MagicMock(spec=requests.Session) + new_session.headers = {} + + with ( + patch("os.register_at_fork") as mock_register_at_fork, + patch( + "opentelemetry.exporter.otlp.proto.http.trace_exporter._load_session_from_envvar", + return_value=None, + ) as mock_load, + patch( + "opentelemetry.exporter.otlp.proto.http.trace_exporter.requests.Session", + return_value=new_session, + ), + ): + exporter = OTLPSpanExporter( + session=initial_session, headers={"x-test": "1"} + ) + after_in_child = mock_register_at_fork.call_args.kwargs[ + "after_in_child" + ] + after_in_child() + + initial_session.close.assert_called_once() + mock_load.assert_called() + self.assertEqual(exporter._session, new_session) + self.assertEqual(exporter._session.headers.get("x-test"), "1") + self.assertEqual( + exporter._session.headers.get("Content-Type"), + "application/x-protobuf", + ) + + @unittest.skipUnless( + hasattr(os, "register_at_fork"), "fork session reset not available" + ) + def test_register_at_fork_uses_credential_provider(self): + initial_session = MagicMock(spec=requests.Session) + initial_session.headers = {} + + cred_session = MagicMock(spec=requests.Session) + cred_session.headers = {} + + with ( + patch("os.register_at_fork") as mock_register_at_fork, + patch( + "opentelemetry.exporter.otlp.proto.http.trace_exporter._load_session_from_envvar", + return_value=cred_session, + ) as mock_load, + ): + exporter = OTLPSpanExporter( + session=initial_session, headers={"x-test": "1"} + ) + after_in_child = mock_register_at_fork.call_args.kwargs[ + "after_in_child" + ] + after_in_child() + + initial_session.close.assert_called_once() + mock_load.assert_called_with( + "OTEL_PYTHON_EXPORTER_OTLP_HTTP_TRACES_CREDENTIAL_PROVIDER" + ) + self.assertEqual(exporter._session, cred_session) + self.assertEqual(exporter._session.headers.get("x-test"), "1") + self.assertEqual( + exporter._session.headers.get("Content-Type"), + "application/x-protobuf", + ) def assert_standard_metric_attrs(self, attributes): self.assertEqual( attributes["otel.component.type"], "otlp_http_span_exporter"