Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ void HandlerBase::handleDisconnection(Result result, const ClientConnectionPtr&
case Closing:
case Closed:
case Producer_Fenced:
case Terminated:
case Failed:
LOG_DEBUG(getName() << "Ignoring connection closed event since the handler is not used anymore");
break;
Expand Down
2 changes: 2 additions & 0 deletions lib/HandlerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
Failed, // Handler is failed, in Java client: HandlerState.State.Failed
Producer_Fenced, // The producer has been fenced by the broker
// in Java client: HandlerState.State.ProducerFenced
Terminated, // The topic has been terminatedproducer has been fenced by the broker
// in Java client: HandlerState.State.Terminated
};

std::atomic<State> state_;
Expand Down
7 changes: 5 additions & 2 deletions lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,8 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result
}
}

if (result == ResultProducerFenced) {
state_ = Producer_Fenced;
if (result == ResultProducerFenced || result == ResultTopicTerminated) {
state_ = result == ResultProducerFenced ? Producer_Fenced : Terminated;
failPendingMessages(result, false);
auto client = client_.lock();
if (client) {
Expand Down Expand Up @@ -450,6 +450,9 @@ bool ProducerImpl::isValidProducerState(const SendCallback& callback) const {
case HandlerBase::Producer_Fenced:
callback(ResultProducerFenced, {});
return false;
case HandlerBase::Terminated:
callback(ResultTopicTerminated, {});
return false;
case HandlerBase::NotStarted:
case HandlerBase::Failed:
default:
Expand Down
1 change: 1 addition & 0 deletions lib/ResultUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ inline bool isResultRetryable(Result result) {
ResultInvalidConfiguration,
ResultIncompatibleSchema,
ResultTopicNotFound,
ResultTopicTerminated,
ResultOperationNotSupported,
ResultNotAllowedError,
ResultChecksumError,
Expand Down
44 changes: 44 additions & 0 deletions tests/ProducerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,50 @@ TEST(ProducerTest, testBacklogQuotasExceeded) {
client.close();
}

TEST(ProducerTest, testCreateProducerAfterTopicTermination) {
const auto topicName = "testCreateProducerAfterTopicTermination-" + std::to_string(time(nullptr));
const auto topic = "persistent://public/default/" + topicName;

Client client(serviceUrl, ClientConfiguration().setOperationTimeoutSeconds(1));

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("content").build()));
ASSERT_EQ(ResultOk, producer.close());

const auto httpCode =
makePostRequest(adminUrl + "admin/v2/persistent/public/default/" + topicName + "/terminate", "");
ASSERT_EQ(200, httpCode) << "httpCode: " << httpCode;

Producer terminatedProducer;
ASSERT_EQ(ResultTopicTerminated, client.createProducer(topic, terminatedProducer));

client.close();
}

TEST(ProducerTest, testSendAfterTopicTerminationReconnect) {
const auto topicName = "testSendAfterTopicTerminationReconnect-" + std::to_string(time(nullptr));
const auto topic = "persistent://public/default/" + topicName;

Client client(serviceUrl, ClientConfiguration().setOperationTimeoutSeconds(1));

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("before-terminate").build()));

const auto httpCode =
makePostRequest(adminUrl + "admin/v2/persistent/public/default/" + topicName + "/terminate", "");
ASSERT_EQ(200, httpCode) << "httpCode: " << httpCode;

PulsarFriend::getProducerImpl(producer).disconnectProducer();
ASSERT_TRUE(
waitUntil(std::chrono::seconds(3), [&producer] { return PulsarFriend::isTerminated(producer); }));

ASSERT_EQ(ResultTopicTerminated, producer.send(MessageBuilder().setContent("after-terminate").build()));

client.close();
}

class ProducerTest : public ::testing::TestWithParam<bool> {};

TEST_P(ProducerTest, testMaxMessageSize) {
Expand Down
5 changes: 5 additions & 0 deletions tests/PulsarFriend.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,11 @@ class PulsarFriend {
return waitUntil(std::chrono::seconds(3),
[producerImpl] { return !producerImpl->getCnx().expired(); });
}

static bool isTerminated(Producer producer) {
auto producerImpl = std::dynamic_pointer_cast<ProducerImpl>(producer.impl_);
return producerImpl && producerImpl->state_ == HandlerBase::Terminated;
}
};
} // namespace pulsar

Expand Down
Loading