Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import concurrent.futures
import json
import logging
import os
import threading
import traceback
import typing
import weakref
from os import environ
from time import time_ns
from types import MappingProxyType, TracebackType
Expand Down Expand Up @@ -238,6 +240,16 @@ def __init__(self, num_threads: int = 2):
# iterating through it on "on_start" and "on_end".
self._span_processors = () # type: Tuple[SpanProcessor, ...]
self._lock = threading.Lock()
self._init_executor(num_threads)
if hasattr(os, "register_at_fork"):
# Only the main thread is kept in forked processed, the executor
# needs to be re-instantiated to get a fresh pool of threads:
weak_reinit = weakref.WeakMethod(self._init_executor)
os.register_at_fork(
after_in_child=lambda: weak_reinit()(num_threads)
)

def _init_executor(self, num_threads: int) -> None:
self._executor = concurrent.futures.ThreadPoolExecutor(
max_workers=num_threads
)
Expand Down
69 changes: 69 additions & 0 deletions opentelemetry-sdk/tests/trace/test_span_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@
# limitations under the License.

import abc
import gc
import multiprocessing
import os
import time
import typing
import unittest
import weakref
from platform import python_implementation, system
from threading import Event
from typing import Optional
Expand All @@ -26,6 +30,10 @@
from opentelemetry import trace as trace_api
from opentelemetry.context import Context
from opentelemetry.sdk import trace
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
InMemorySpanExporter,
)


def span_event_start_fmt(span_processor_name, span_name):
Expand Down Expand Up @@ -486,3 +494,64 @@ def test_force_flush_late_by_span_processor(self):
for mock_processor in mocks:
self.assertEqual(1, mock_processor.force_flush.call_count)
multi_processor.shutdown()

def test_processor_gc(self):
multi_processor = trace.ConcurrentMultiSpanProcessor(5)
weak_ref = weakref.ref(multi_processor)
multi_processor.shutdown()

# When the processor is garbage collected
del multi_processor
gc.collect()

# Then the reference to the processor should no longer exist
self.assertIsNone(
weak_ref(),
"The ConcurrentMultiSpanProcessor object created by this test wasn't garbage collected",
)

@unittest.skipUnless(hasattr(os, "fork"), "needs *nix")
def test_batch_span_processor_fork(self):
multiprocessing.set_start_method("fork")
tracer_provider = trace.TracerProvider()
tracer = tracer_provider.get_tracer(__name__)
exporter = InMemorySpanExporter()
multi_processor = trace.ConcurrentMultiSpanProcessor(2)
multi_processor.add_span_processor(SimpleSpanProcessor(exporter))
tracer_provider.add_span_processor(multi_processor)

# Use the ConcurrentMultiSpanProcessor in the main process.
# This is necessary in this test to start using the underlying ThreadPoolExecutor and avoid false positive:
with tracer.start_as_current_span("main process before fork span"):
pass
assert (
exporter.get_finished_spans()[-1].name
== "main process before fork span"
)

# The forked ConcurrentMultiSpanProcessor is usable in the child process:
def child(conn):
with tracer.start_as_current_span("child process span"):
pass
conn.send(exporter.get_finished_spans()[-1].name)
conn.close()

parent_conn, child_conn = multiprocessing.Pipe()
process = multiprocessing.Process(target=child, args=(child_conn,))
process.start()
has_response = parent_conn.poll(timeout=5)
if not has_response:
process.kill()
self.fail(
"The child process did not send any message after 5 seconds, it's very probably locked"
)
process.join(timeout=5)
assert parent_conn.recv() == "child process span"

# The ConcurrentMultiSpanProcessor is still usable in the main process after the child process termination:
with tracer.start_as_current_span("main process after fork span"):
pass
assert (
exporter.get_finished_spans()[-1].name
== "main process after fork span"
)