Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- `opentelemetry-sdk`: fix multi-processor `force_flush` skipping remaining processors when one returns `None`
([#5179](https://github.com/open-telemetry/opentelemetry-python/pull/5179))
- Apply fixes for `UP` ruff rule
([#5133](https://github.com/open-telemetry/opentelemetry-python/pull/5133))
- Switch to SPDX license headers and add CI enforcement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,15 +413,16 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
False otherwise.
"""
deadline_ns = time_ns() + timeout_millis * 1000000
all_flushed = True
for lp in self._log_record_processors:
current_ts = time_ns()
if current_ts >= deadline_ns:
return False

if not lp.force_flush((deadline_ns - current_ts) // 1000000):
return False
if lp.force_flush((deadline_ns - current_ts) // 1000000) is False:
all_flushed = False

return True
return all_flushed


class ConcurrentMultiLogRecordProcessor(LogRecordProcessor):
Expand Down Expand Up @@ -495,7 +496,7 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
return False

for future in done_futures:
if not future.result():
if future.result() is False:
return False

return True
Expand Down
15 changes: 10 additions & 5 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def on_end(self, span: "ReadableSpan") -> None:
def shutdown(self) -> None:
"""Called when a :class:`opentelemetry.sdk.trace.TracerProvider` is shutdown."""

def force_flush(self, timeout_millis: int = 30000) -> bool: # type: ignore[reportReturnType]
def force_flush(self, timeout_millis: int = 30000) -> bool: # pylint: disable=no-self-use
"""Export all ended spans to the configured Exporter that have not yet
been exported.

Expand All @@ -142,6 +142,7 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: # type: ignore[repo
Returns:
False if the timeout is exceeded, True otherwise.
"""
return True


# Temporary fix until https://github.com/PyCQA/pylint/issues/4098 is resolved
Expand Down Expand Up @@ -204,15 +205,19 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
given timeout, False otherwise.
"""
deadline_ns = time_ns() + timeout_millis * 1000000
all_flushed = True
for sp in self._span_processors:
current_time_ns = time_ns()
if current_time_ns >= deadline_ns:
return False

if not sp.force_flush((deadline_ns - current_time_ns) // 1000000):
return False
if (
sp.force_flush((deadline_ns - current_time_ns) // 1000000)
is False
):
all_flushed = False

return True
return all_flushed


class ConcurrentMultiSpanProcessor(SpanProcessor):
Expand Down Expand Up @@ -315,7 +320,7 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
return False

for future in done_futures:
if not future.result():
if future.result() is False:
return False

return True
Expand Down
66 changes: 66 additions & 0 deletions opentelemetry-sdk/tests/logs/test_multi_log_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,36 @@ def delay(_):
self.assertEqual(mock_processor1.force_flush.call_count, 1)
self.assertEqual(mock_processor2.force_flush.call_count, 0)

def test_force_flush_processor_returns_false(self):
multi_log_record_processor = SynchronousMultiLogRecordProcessor()

mock_processor1 = Mock(spec=LogRecordProcessor)
mock_processor1.force_flush = Mock(return_value=False)
multi_log_record_processor.add_log_record_processor(mock_processor1)
mock_processor2 = Mock(spec=LogRecordProcessor)
mock_processor2.force_flush = Mock(return_value=True)
multi_log_record_processor.add_log_record_processor(mock_processor2)

ret_value = multi_log_record_processor.force_flush(50)
self.assertFalse(ret_value)
self.assertEqual(mock_processor1.force_flush.call_count, 1)
self.assertEqual(mock_processor2.force_flush.call_count, 1)

def test_force_flush_processor_returns_none(self):
multi_log_record_processor = SynchronousMultiLogRecordProcessor()

mock_processor1 = Mock(spec=LogRecordProcessor)
mock_processor1.force_flush = Mock(return_value=None)
multi_log_record_processor.add_log_record_processor(mock_processor1)
mock_processor2 = Mock(spec=LogRecordProcessor)
mock_processor2.force_flush = Mock(return_value=True)
multi_log_record_processor.add_log_record_processor(mock_processor2)

ret_value = multi_log_record_processor.force_flush(50)
self.assertTrue(ret_value)
self.assertEqual(mock_processor1.force_flush.call_count, 1)
self.assertEqual(mock_processor2.force_flush.call_count, 1)


class TestConcurrentMultiLogRecordProcessor(
MultiLogRecordProcessorTestBase, unittest.TestCase
Expand Down Expand Up @@ -189,3 +219,39 @@ def delay(_):
for mock in mocks:
self.assertEqual(1, mock.force_flush.call_count)
multi_log_record_processor.shutdown()

def test_force_flush_processor_returns_false(self):
multi_log_record_processor = ConcurrentMultiLogRecordProcessor()

false_mock = Mock(spec=LogRecordProcessor)
false_mock.force_flush = Mock(return_value=False)
mocks = [Mock(spec=LogRecordProcessor) for _ in range(4)]
mocks.insert(0, false_mock)

for mock_processor in mocks:
multi_log_record_processor.add_log_record_processor(mock_processor)

ret_value = multi_log_record_processor.force_flush()

self.assertFalse(ret_value)
for mock in mocks:
self.assertEqual(1, mock.force_flush.call_count)
multi_log_record_processor.shutdown()

def test_force_flush_processor_returns_none(self):
multi_log_record_processor = ConcurrentMultiLogRecordProcessor()

none_mock = Mock(spec=LogRecordProcessor)
none_mock.force_flush = Mock(return_value=None)
mocks = [Mock(spec=LogRecordProcessor) for _ in range(4)]
mocks.insert(0, none_mock)

for mock_processor in mocks:
multi_log_record_processor.add_log_record_processor(mock_processor)

ret_value = multi_log_record_processor.force_flush()

self.assertTrue(ret_value)
for mock in mocks:
self.assertEqual(1, mock.force_flush.call_count)
multi_log_record_processor.shutdown()
48 changes: 47 additions & 1 deletion opentelemetry-sdk/tests/trace/test_span_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,35 @@ def test_force_flush_late_by_span_processor(self):
flushed = multi_processor.force_flush(50)
self.assertFalse(flushed)
self.assertEqual(1, mock_processor1.force_flush.call_count)
self.assertEqual(0, mock_processor2.force_flush.call_count)
self.assertEqual(1, mock_processor2.force_flush.call_count)

def test_force_flush_processor_returns_none(self):
multi_processor = trace.SynchronousMultiSpanProcessor()

mock_processor1 = mock.Mock(spec=trace.SpanProcessor)
mock_processor1.force_flush = mock.Mock(return_value=None)
multi_processor.add_span_processor(mock_processor1)
mock_processor2 = mock.Mock(spec=trace.SpanProcessor)
mock_processor2.force_flush = mock.Mock(return_value=True)
multi_processor.add_span_processor(mock_processor2)

flushed = multi_processor.force_flush(50)
self.assertTrue(flushed)
self.assertEqual(1, mock_processor1.force_flush.call_count)
self.assertEqual(1, mock_processor2.force_flush.call_count)

def test_force_flush_default_processor(self):
multi_processor = trace.SynchronousMultiSpanProcessor()

default_processor = trace.SpanProcessor()
multi_processor.add_span_processor(default_processor)
mock_processor = mock.Mock(spec=trace.SpanProcessor)
mock_processor.force_flush = mock.Mock(return_value=True)
multi_processor.add_span_processor(mock_processor)

flushed = multi_processor.force_flush(50)
self.assertTrue(flushed)
self.assertEqual(1, mock_processor.force_flush.call_count)


class TestConcurrentMultiSpanProcessor(
Expand Down Expand Up @@ -476,6 +504,24 @@ def test_force_flush_late_by_span_processor(self):
self.assertEqual(1, mock_processor.force_flush.call_count)
multi_processor.shutdown()

def test_force_flush_processor_returns_none(self):
multi_processor = trace.ConcurrentMultiSpanProcessor(5)

none_mock = mock.Mock(spec=trace.SpanProcessor)
none_mock.force_flush = mock.Mock(return_value=None)
mocks = [mock.Mock(spec=trace.SpanProcessor) for _ in range(0, 4)]
mocks.insert(0, none_mock)

for mock_processor in mocks:
multi_processor.add_span_processor(mock_processor)

flushed = multi_processor.force_flush()

self.assertTrue(flushed)
for mock_processor in mocks:
self.assertEqual(1, mock_processor.force_flush.call_count)
multi_processor.shutdown()

def test_processor_gc(self):
multi_processor = trace.ConcurrentMultiSpanProcessor(5)
weak_ref = weakref.ref(multi_processor)
Expand Down
Loading