Skip to content

Commit a88be32

Browse files
authored
Adding forward DLQ to management option (#76)
* adding forward DLQ management option * caching suite.T()
1 parent 37c859c commit a88be32

File tree

2 files changed

+44
-3
lines changed

2 files changed

+44
-3
lines changed

integration/publisher_queue_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,25 @@ func (suite *serviceBusQueueSuite) TestCreatePublisherUsingExistingQueue() {
4444
}
4545
}
4646

47+
// TestCreatePublisherWithDeadLetterForwardUsingNewQueue tests the creation of a publisher for a new queue
48+
func (suite *serviceBusQueueSuite) TestCreatePublisherWithDeadLetterForwardUsingNewQueue() {
49+
testContext := suite.T()
50+
suite.Parallel()
51+
queueName := "newQueue" + suite.TagID
52+
_, err := queue.NewPublisher(context.Background(), queueName, suite.publisherAuthOption, publisher.WithForwardDeadLetteredMessagesTo(queueName+"DLQ", 1000))
53+
if suite.NoError(err) {
54+
// make sure that queue exists
55+
ns := suite.GetNewNamespace()
56+
tm := ns.NewQueueManager()
57+
_, err := tm.Get(context.Background(), queueName)
58+
require.NoError(testContext, err)
59+
60+
// delete new queue
61+
err = tm.Delete(context.Background(), queueName)
62+
require.NoError(testContext, err)
63+
}
64+
}
65+
4766
// TestPublishAfterIdle tests the creation of a publisher for an existing queue and a connection string
4867
func (suite *serviceBusQueueSuite) TestPublishAfterIdle() {
4968
suite.T().Parallel()

queue/publisher/options.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
package publisher
22

33
import (
4-
"github.com/Azure/go-shuttle/common"
5-
"github.com/Azure/go-shuttle/common/options/publisheropts"
4+
"context"
65
"time"
76

8-
"github.com/Azure/azure-service-bus-go"
7+
servicebus "github.com/Azure/azure-service-bus-go"
98
"github.com/Azure/go-autorest/autorest/adal"
9+
"github.com/Azure/go-shuttle/common"
10+
"github.com/Azure/go-shuttle/common/options/publisheropts"
1011
)
1112

13+
type DeadLetterTarget struct {
14+
}
15+
1216
// ManagementOption provides structure for configuring a new Publisher
1317
type ManagementOption = publisheropts.ManagementOption
1418

@@ -55,6 +59,24 @@ func WithDuplicateDetection(window *time.Duration) ManagementOption {
5559
}
5660
}
5761

62+
// WithForwardDeadLetteredMessagesTo forwards deadlettered messages to a targetable queue, the identity must have management permissions on said queue
63+
func WithForwardDeadLetteredMessagesTo(deadLetterTargetName string, deliveryCount int) ManagementOption {
64+
return func(p common.Publisher) error {
65+
qm := p.Namespace().NewQueueManager()
66+
67+
if _, err := qm.Put(context.Background(), deadLetterTargetName, servicebus.QueueEntityWithMaxDeliveryCount(int32(deliveryCount))); err != nil {
68+
return err
69+
}
70+
71+
deadLetterTarget, err := p.Namespace().NewQueueManager().Get(context.Background(), deadLetterTargetName)
72+
if err != nil {
73+
return err
74+
}
75+
p.(QueuePublisher).AppendQueueManagementOption(servicebus.QueueEntityWithForwardDeadLetteredMessagesTo(deadLetterTarget))
76+
return nil
77+
}
78+
}
79+
5880
// SetMessageDelay schedules a message in the future
5981
func SetMessageDelay(delay time.Duration) Option {
6082
return publisheropts.SetMessageDelay(delay)

0 commit comments

Comments
 (0)