Skip to content

Commit c2ae180

Browse files
authored
[fix][broker] Fix incomplete futures in topic property update/delete methods (#25228)
1 parent 6c79d3a commit c2ae180

2 files changed

Lines changed: 43 additions & 4 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -660,8 +660,10 @@ private CompletableFuture<Void> internalUpdateNonPartitionedTopicProperties(Map<
660660
pulsar().getBrokerService().getTopicIfExists(topicName.toString())
661661
.thenAccept(opt -> {
662662
if (!opt.isPresent()) {
663-
throw new RestException(Status.NOT_FOUND,
664-
getTopicNotFoundErrorMessage(topicName.toString()));
663+
future.completeExceptionally(
664+
new WebApplicationException(getTopicNotFoundErrorMessage(topicName.toString()),
665+
Status.NOT_FOUND));
666+
return;
665667
}
666668
ManagedLedger managedLedger = ((PersistentTopic) opt.get()).getManagedLedger();
667669
managedLedger.asyncSetProperties(properties, new AsyncCallbacks.UpdatePropertiesCallback() {
@@ -681,6 +683,9 @@ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx)
681683
future.completeExceptionally(exception);
682684
}
683685
}, null);
686+
}).exceptionally(ex -> {
687+
future.completeExceptionally(ex);
688+
return null;
684689
});
685690
return future;
686691
}
@@ -717,8 +722,10 @@ private CompletableFuture<Void> internalRemoveNonPartitionedTopicProperties(Stri
717722
pulsar().getBrokerService().getTopicIfExists(topicName.toString())
718723
.thenAccept(opt -> {
719724
if (!opt.isPresent()) {
720-
throw new RestException(Status.NOT_FOUND,
721-
getTopicNotFoundErrorMessage(topicName.toString()));
725+
future.completeExceptionally(
726+
new WebApplicationException(getTopicNotFoundErrorMessage(topicName.toString()),
727+
Status.NOT_FOUND));
728+
return;
722729
}
723730
ManagedLedger managedLedger = ((PersistentTopic) opt.get()).getManagedLedger();
724731
managedLedger.asyncDeleteProperty(key, new AsyncCallbacks.UpdatePropertiesCallback() {
@@ -733,6 +740,9 @@ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx)
733740
future.completeExceptionally(exception);
734741
}
735742
}, null);
743+
}).exceptionally(ex -> {
744+
future.completeExceptionally(ex);
745+
return null;
736746
});
737747
return future;
738748
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1412,6 +1412,35 @@ public void testUpdateNonPartitionedTopicProperties() throws Exception {
14121412
Assert.assertEquals(properties.get("key2"), "value2");
14131413
}
14141414

1415+
@Test
1416+
public void testUpdatePropertiesOnNonExistentTopic() throws Exception {
1417+
final String namespace = newUniqueName(defaultTenant + "/ns2");
1418+
final String topicName = "persistent://" + namespace + "/testUpdatePropertiesOnNonExistentTopic";
1419+
admin.namespaces().createNamespace(namespace, 20);
1420+
1421+
// Test updateProperties on non-existent topic should return 404 Not Found
1422+
Map<String, String> topicProperties = new HashMap<>();
1423+
topicProperties.put("key1", "value1");
1424+
try {
1425+
admin.topics().updateProperties(topicName, topicProperties);
1426+
fail("Should have thrown an exception for non-existent topic");
1427+
} catch (Exception e) {
1428+
Assert.expectThrows(PulsarAdminException.NotFoundException.class, () -> {
1429+
throw e;
1430+
});
1431+
}
1432+
1433+
// Test removeProperties on non-existent topic should return 404 Not Found
1434+
try {
1435+
admin.topics().removeProperties(topicName, "key1");
1436+
fail("Should have thrown an exception for non-existent topic");
1437+
} catch (PulsarAdminException.NotFoundException e) {
1438+
Assert.expectThrows(PulsarAdminException.NotFoundException.class, () -> {
1439+
throw e;
1440+
});
1441+
}
1442+
}
1443+
14151444
@Test
14161445
public void testNonPersistentTopics() throws Exception {
14171446
final String namespace = newUniqueName(defaultTenant + "/ns2");

0 commit comments

Comments
 (0)