Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- `opentelemetry-exporter-otlp-proto-http`: Fix stale `requests.Session` after `fork()` in all OTLP HTTP exporters (traces, metrics, logs) by resetting the session in the child process, re-using the credential provider when configured
([#4995](https://github.com/open-telemetry/opentelemetry-python/pull/4995))
- `opentelemetry-sdk`: fix multi-processor `force_flush` skipping remaining processors when one returns `None`
([#5179](https://github.com/open-telemetry/opentelemetry-python/pull/5179))
- Apply fixes for `UP` ruff rule
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,12 @@ def __init__(
)
)
self._compression = compression or _compression_from_env()
self._cred_envvar = (
_OTEL_PYTHON_EXPORTER_OTLP_HTTP_LOGS_CREDENTIAL_PROVIDER
)
self._session = (
session
or _load_session_from_envvar(
_OTEL_PYTHON_EXPORTER_OTLP_HTTP_LOGS_CREDENTIAL_PROVIDER
)
or _load_session_from_envvar(self._cred_envvar)
or requests.Session()
)
self._session.headers.update(self._headers)
Expand All @@ -142,6 +143,8 @@ def __init__(
{"Content-Encoding": self._compression.value}
)
self._shutdown = False
if hasattr(os, "register_at_fork"):
os.register_at_fork(after_in_child=self._reset_session_after_fork)

self._metrics = create_exporter_metrics(
OtelComponentTypeValues.OTLP_HTTP_LOG_EXPORTER,
Expand All @@ -154,6 +157,33 @@ def __init__(
== "true",
)

def _reset_session_after_fork(self) -> None:
"""
Reset exporter session in the child process after fork.

We close the existing session to avoid finalizer warnings if file
descriptors were already closed, then recreate the session using the
same creation logic as __init__ to preserve credential provider config.
"""
try:
self._session.close()
self._session = (
_load_session_from_envvar(self._cred_envvar)
or requests.Session()
)
self._session.headers.update(self._headers)
self._session.headers.update(_OTLP_HTTP_HEADERS)
self._session.headers.update(self._headers)
if self._compression is not Compression.NoCompression:
self._session.headers.update(
{"Content-Encoding": self._compression.value}
)
except Exception:
_logger.debug(
"Exception occurred while resetting exporter session",
exc_info=True,
)

def _export(
self, serialized_data: bytes, timeout_sec: float | None = None
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,12 @@ def __init__(
)
)
self._compression = compression or _compression_from_env()
self._cred_envvar = (
_OTEL_PYTHON_EXPORTER_OTLP_HTTP_METRICS_CREDENTIAL_PROVIDER
)
self._session = (
session
or _load_session_from_envvar(
_OTEL_PYTHON_EXPORTER_OTLP_HTTP_METRICS_CREDENTIAL_PROVIDER
)
or _load_session_from_envvar(self._cred_envvar)
or requests.Session()
)
self._session.headers.update(self._headers)
Expand All @@ -205,6 +206,8 @@ def __init__(
)
self._max_export_batch_size: int | None = max_export_batch_size
self._shutdown = False
if hasattr(os, "register_at_fork"):
os.register_at_fork(after_in_child=self._reset_session_after_fork)

self._metrics = create_exporter_metrics(
OtelComponentTypeValues.OTLP_HTTP_METRIC_EXPORTER,
Expand All @@ -217,6 +220,34 @@ def __init__(
== "true",
)

def _reset_session_after_fork(self) -> None:
"""
Reset exporter session in the child process after fork.

We close the existing session to avoid finalizer warnings if file
descriptors were already closed, then recreate the session using the
same creation logic as __init__ to preserve credential provider config.
"""
try:
self._session.close()

self._session = (
_load_session_from_envvar(self._cred_envvar)
or requests.Session()
)
self._session.headers.update(self._headers)
self._session.headers.update(_OTLP_HTTP_HEADERS)
self._session.headers.update(self._headers)
if self._compression is not Compression.NoCompression:
self._session.headers.update(
{"Content-Encoding": self._compression.value}
)
except Exception:
_logger.debug(
"Exception occurred while resetting exporter session",
exc_info=True,
)

def _export(
self, serialized_data: bytes, timeout_sec: float | None = None
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,12 @@ def __init__(
)
)
self._compression = compression or _compression_from_env()
self._cred_envvar = (
_OTEL_PYTHON_EXPORTER_OTLP_HTTP_TRACES_CREDENTIAL_PROVIDER
)
self._session = (
session
or _load_session_from_envvar(
_OTEL_PYTHON_EXPORTER_OTLP_HTTP_TRACES_CREDENTIAL_PROVIDER
)
or _load_session_from_envvar(self._cred_envvar)
or requests.Session()
)
self._session.headers.update(self._headers)
Expand All @@ -137,6 +138,8 @@ def __init__(
{"Content-Encoding": self._compression.value}
)
self._shutdown = False
if hasattr(os, "register_at_fork"):
os.register_at_fork(after_in_child=self._reset_session_after_fork)

self._metrics = create_exporter_metrics(
OtelComponentTypeValues.OTLP_HTTP_SPAN_EXPORTER,
Expand All @@ -149,6 +152,34 @@ def __init__(
== "true",
)

def _reset_session_after_fork(self) -> None:
"""
Reset exporter session in the child process after fork.

We close the existing session to avoid finalizer warnings if file
descriptors were already closed, then recreate the session using the
same creation logic as __init__ to preserve credential provider config.
"""
try:
self._session.close()
self._session = (
_load_session_from_envvar(self._cred_envvar)
or requests.Session()
)
self._session.headers.update(self._headers)
self._session.headers.update(_OTLP_HTTP_HEADERS)
self._session.headers.update(self._headers)
if self._compression is not Compression.NoCompression:
self._session.headers.update(
{"Content-Encoding": self._compression.value}
)
except Exception:
_logger.debug(
"Exception occurred while resetting exporter session",
exc_info=True,
)


def _export(
self, serialized_data: bytes, timeout_sec: float | None = None
):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0

# pylint: disable=too-many-lines
import os
import threading
import time
import unittest
from logging import WARNING
from os import environ
from unittest import TestCase
Expand Down Expand Up @@ -1457,6 +1458,80 @@ def test_shutdown_interrupts_retry_backoff(self, mock_post):

assert after - before < 0.2

@unittest.skipUnless(
hasattr(os, "register_at_fork"), "fork session reset not available"
)
def test_register_at_fork_resets_session(self):
initial_session = MagicMock(spec=requests.Session)
initial_session.headers = {}

new_session = MagicMock(spec=requests.Session)
new_session.headers = {}

with (
patch("os.register_at_fork") as mock_register_at_fork,
patch(
"opentelemetry.exporter.otlp.proto.http.metric_exporter._load_session_from_envvar",
return_value=None,
) as mock_load,
patch(
"opentelemetry.exporter.otlp.proto.http.metric_exporter.requests.Session",
return_value=new_session,
),
):
exporter = OTLPMetricExporter(
session=initial_session, headers={"x-test": "1"}
)
after_in_child = mock_register_at_fork.call_args.kwargs[
"after_in_child"
]
after_in_child()

initial_session.close.assert_called_once()
mock_load.assert_called()
self.assertEqual(exporter._session, new_session)
self.assertEqual(exporter._session.headers.get("x-test"), "1")
self.assertEqual(
exporter._session.headers.get("Content-Type"),
"application/x-protobuf",
)
self.assertEqual(exporter._session.headers.get("preexisting"), "yes")

@unittest.skipUnless(
hasattr(os, "register_at_fork"), "fork session reset not available"
)
def test_register_at_fork_uses_credential_provider(self):
initial_session = MagicMock(spec=requests.Session)
initial_session.headers = {}

cred_session = MagicMock(spec=requests.Session)
cred_session.headers = {}

with (
patch("os.register_at_fork") as mock_register_at_fork,
patch(
"opentelemetry.exporter.otlp.proto.http.metric_exporter._load_session_from_envvar",
return_value=cred_session,
) as mock_load,
):
exporter = OTLPMetricExporter(
session=initial_session, headers={"x-test": "1"}
)
after_in_child = mock_register_at_fork.call_args.kwargs[
"after_in_child"
]
after_in_child()

initial_session.close.assert_called_once()
mock_load.assert_called_with(
"OTEL_PYTHON_EXPORTER_OTLP_HTTP_METRICS_CREDENTIAL_PROVIDER"
)
self.assertEqual(exporter._session, cred_session)
self.assertEqual(exporter._session.headers.get("x-test"), "1")
self.assertEqual(
exporter._session.headers.get("Content-Type"),
"application/x-protobuf",
)
def assert_standard_metric_attrs(self, attributes):
self.assertEqual(
attributes["otel.component.type"], "otlp_http_metric_exporter"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

# pylint: disable=protected-access

import os
import threading
import time
import unittest
Expand Down Expand Up @@ -655,6 +656,79 @@ def test_shutdown_interrupts_retry_backoff(self, mock_post):
)

assert after - before < 0.2
@unittest.skipUnless(
hasattr(os, "register_at_fork"), "fork session reset not available"
)
def test_register_at_fork_resets_session(self):
initial_session = MagicMock(spec=requests.Session)
initial_session.headers = {}

new_session = MagicMock(spec=requests.Session)
new_session.headers = {}

with (
patch("os.register_at_fork") as mock_register_at_fork,
patch(
"opentelemetry.exporter.otlp.proto.http._log_exporter._load_session_from_envvar",
return_value=None,
) as mock_load,
patch(
"opentelemetry.exporter.otlp.proto.http._log_exporter.requests.Session",
return_value=new_session,
),
):
exporter = OTLPLogExporter(
session=initial_session, headers={"x-test": "1"}
)
after_in_child = mock_register_at_fork.call_args.kwargs[
"after_in_child"
]
after_in_child()

initial_session.close.assert_called_once()
mock_load.assert_called()
self.assertEqual(exporter._session, new_session)
self.assertEqual(exporter._session.headers.get("x-test"), "1")
self.assertEqual(
exporter._session.headers.get("Content-Type"),
"application/x-protobuf",
)

@unittest.skipUnless(
hasattr(os, "register_at_fork"), "fork session reset not available"
)
def test_register_at_fork_uses_credential_provider(self):
initial_session = MagicMock(spec=requests.Session)
initial_session.headers = {}

cred_session = MagicMock(spec=requests.Session)
cred_session.headers = {}

with (
patch("os.register_at_fork") as mock_register_at_fork,
patch(
"opentelemetry.exporter.otlp.proto.http._log_exporter._load_session_from_envvar",
return_value=cred_session,
) as mock_load,
):
exporter = OTLPLogExporter(
session=initial_session, headers={"x-test": "1"}
)
after_in_child = mock_register_at_fork.call_args.kwargs[
"after_in_child"
]
after_in_child()

initial_session.close.assert_called_once()
mock_load.assert_called_with(
"OTEL_PYTHON_EXPORTER_OTLP_HTTP_LOGS_CREDENTIAL_PROVIDER"
)
self.assertEqual(exporter._session, cred_session)
self.assertEqual(exporter._session.headers.get("x-test"), "1")
self.assertEqual(
exporter._session.headers.get("Content-Type"),
"application/x-protobuf",
)

def assert_standard_metric_attrs(self, attributes):
self.assertEqual(
Expand Down
Loading