From 2cb66d365d3aa435c425c5fcd34939ab3b7d6285 Mon Sep 17 00:00:00 2001 From: Gregory Freilikhman Date: Wed, 17 Jun 2026 00:28:28 +0300 Subject: [PATCH] test: bump CI broker to 4.2.2 and fix delayed-delivery tests Upgrade the integration-test Pulsar image from 4.0.0 to 4.2.2. Increase deliver_at/deliver_after delay to 2500ms because broker 4.0.1+ buckets deliverAt timestamps (~512ms with the default 1s tick), so the old 1100ms margin no longer guarantees receive(1000) times out. Expect TopicNotFound when creating a producer on a non-existent tenant/ns on broker 4.2.x. Co-authored-by: Cursor --- build-support/pulsar-test-service-start.sh | 2 +- tests/asyncio_test.py | 2 +- tests/pulsar_test.py | 12 ++++++++---- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/build-support/pulsar-test-service-start.sh b/build-support/pulsar-test-service-start.sh index f8e26f99..4dd31631 100755 --- a/build-support/pulsar-test-service-start.sh +++ b/build-support/pulsar-test-service-start.sh @@ -25,7 +25,7 @@ cd $SRC_DIR ./build-support/pulsar-test-service-stop.sh -CONTAINER_ID=$(docker run -i --user $(id -u) -p 8080:8080 -p 6650:6650 -p 8443:8443 -p 6651:6651 --rm --detach apachepulsar/pulsar:4.0.0 sleep 3600) +CONTAINER_ID=$(docker run -i --user $(id -u) -p 8080:8080 -p 6650:6650 -p 8443:8443 -p 6651:6651 --rm --detach apachepulsar/pulsar:4.2.2 sleep 3600) echo $CONTAINER_ID > .tests-container-id.txt docker cp tests/test-conf $CONTAINER_ID:/pulsar/test-conf diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index 3cc1078c..5394fba6 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -201,7 +201,7 @@ async def test_create_producer_failure(self): await self._client.create_producer('tenant/ns/asyncio-test-send-failure') self.fail() except PulsarException as e: - self.assertEqual(e.error(), pulsar.Result.Timeout) + self.assertEqual(e.error(), pulsar.Result.TopicNotFound) async def test_send_failure(self): producer = await self._client.create_producer('asyncio-test-send-failure') diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index be817c5d..93a6415a 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -330,8 +330,11 @@ def test_deliver_at(self): client = Client(self.serviceUrl) consumer = client.subscribe("my-python-topic-deliver-at", "my-sub", consumer_type=ConsumerType.Shared) producer = client.create_producer("my-python-topic-deliver-at") - # Delay message in 1.1s - producer.send(b"hello", deliver_at=int(round(time.time() * 1000)) + 1100) + # Delay must exceed receive(1000) timeout plus broker early-delivery slack. + # Pulsar 4.2.x buckets deliverAt timestamps (trimLowerBit, ~512ms with 1s tick), + # so short delays (e.g. 1100ms) can be delivered immediately on broker >= 4.0.1. + delay_ms = 2500 + producer.send(b"hello", deliver_at=int(round(time.time() * 1000)) + delay_ms) # Message should not be available in the next second with self.assertRaises(pulsar.Timeout): @@ -349,8 +352,9 @@ def test_deliver_after(self): client = Client(self.serviceUrl) consumer = client.subscribe("my-python-topic-deliver-after", "my-sub", consumer_type=ConsumerType.Shared) producer = client.create_producer("my-python-topic-deliver-after") - # Delay message in 1.1s - producer.send(b"hello", deliver_after=timedelta(milliseconds=1100)) + # Same margin as test_deliver_at; see comment there for broker 4.2.x bucketing. + delay_ms = 2500 + producer.send(b"hello", deliver_after=timedelta(milliseconds=delay_ms)) # Message should not be available in the next second with self.assertRaises(pulsar.Timeout):