Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1431,9 +1431,14 @@ private long getNextPage() {

@Override
public synchronized NextResult tryNext() {
// if an unbehaved program called hasNext twice before next, we only cache it once.
if (cachedNext != null) {
return NextResult.hasElements;
PageCursorInfo info = locatePageInfo(cachedNext.getPagedMessage().getPageNumber());
if (info != null && info.isRemoved(cachedNext.getPagedMessage().getMessageNumber())) {
logger.debug("Reference {} has been removed from another iterator, moving it next", cachedNext);
cachedNext = null;
} else {
return NextResult.hasElements;
}
}

if (!pageStore.isStorePaging()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1526,4 +1526,7 @@ void slowConsumerDetected(String sessionID,

@LogMessage(id = 224157, value = "At least one of the components failed to start under the lockCoordinator {}. A retry will be executed", level = LogMessage.Level.INFO)
void retryLockCoordinator(String name);

@LogMessage(id = 224158, value = "The operation {} on queue {} cannot read more data from paging into memory and will be interrupted.", level = LogMessage.Level.INFO)
void preventQueueManagementToFloodMemory(String operation, String queue);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,9 @@ public boolean skipDelivery() {

@Override
public void addTail(final MessageReference ref, final boolean direct) {
if (logger.isTraceEnabled()) {
logger.trace("AddTail on queue {}, reference={}", this.getName(), ref);
}
try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_TAIL)) {
if (scheduleIfPossible(ref)) {
return;
Expand Down Expand Up @@ -2025,7 +2028,7 @@ public int deleteMatchingReferences(Filter filter) throws Exception {

@Override
public int deleteMatchingReferences(final int flushLimit, final Filter filter1, AckReason ackReason) throws Exception {
return iterQueue(flushLimit, filter1, createDeleteMatchingAction(ackReason));
return iterQueue("deleteMatchingReferences", flushLimit, filter1, createDeleteMatchingAction(ackReason));
}

QueueIterateAction createDeleteMatchingAction(AckReason ackReason) {
Expand All @@ -2039,13 +2042,21 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
};
}

private int iterQueue(final String operationName,
final int flushLimit,
final Filter filter1,
QueueIterateAction messageAction) throws Exception {
return iterQueue(operationName, flushLimit, filter1, messageAction, true);
}
/**
* This is a generic method for any method interacting on the Queue to move or delete messages Instead of duplicate
* the feature we created an abstract class where you pass the logic for each message.
*/
private int iterQueue(final int flushLimit,
private int iterQueue(final String operationName,
final int flushLimit,
final Filter filter1,
QueueIterateAction messageAction) throws Exception {
QueueIterateAction messageAction,
boolean separatePageIterator) throws Exception {
int count = 0;
int txCount = 0;

Expand All @@ -2059,6 +2070,10 @@ private int iterQueue(final int flushLimit,

depageLock.lock();

if (logger.isDebugEnabled()) {
logger.debug("Executing iterQueue for operation {} on queue {}", operationName, getName());
}

try {
Transaction tx = new TransactionImpl(storageManager);

Expand All @@ -2075,6 +2090,13 @@ private int iterQueue(final int flushLimit,
if (messageAction.actMessage(tx, ref)) {
iter.remove();
refRemoved(ref);
if (logger.isTraceEnabled()) {
logger.trace("{} matched act=true on reference {}, during queue iteration", count, ref);
}
} else {
if (logger.isTraceEnabled()) {
logger.trace("{} matched act=false on reference {}, during queue iteration", count, ref);
}
}
txCount++;
count++;
Expand All @@ -2095,9 +2117,10 @@ private int iterQueue(final int flushLimit,

List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(messageAction::match);
for (MessageReference messageReference : cancelled) {
messageAction.actMessage(tx, messageReference);
count++;
txCount++;
if (messageAction.actMessage(tx, messageReference)) {
count++;
txCount++;
}
if (messageAction.expectedHitsReached(count)) {
break;
}
Expand All @@ -2112,24 +2135,60 @@ private int iterQueue(final int flushLimit,
}

if (pageIterator != null) {
while (pageIterator.hasNext() && !messageAction.expectedHitsReached(count)) {
PagedReference reference = pageIterator.next();
pageIterator.remove();
PageIterator theIterator;
if (separatePageIterator) {
theIterator = pageSubscription.iterator();
} else {
theIterator = pageIterator;
}

if (messageAction.match(reference)) {
if (!messageAction.actMessage(tx, reference)) {
addTail(reference, false);
try {
while (theIterator.hasNext() && !messageAction.expectedHitsReached(count)) {
PagedReference reference = theIterator.next();
boolean matched = messageAction.match(reference);
boolean acted = false;

if (matched) {
acted = messageAction.actMessage(tx, reference);
}

if (logger.isTraceEnabled()) {
logger.trace("{} matched={} act={} on reference {}, during queue iteration", count, matched, acted, reference);
}

if (separatePageIterator) {
if (acted) {
theIterator.remove();
}
} else {
theIterator.remove();

if (!acted) {
// Put non-matching or non-acted messages back to the queue tail
addTail(reference, false);
if (!needsDepage()) {
ActiveMQServerLogger.LOGGER.preventQueueManagementToFloodMemory(operationName, String.valueOf(QueueImpl.this.getName()));
break;
}
}
}

if (matched) {
txCount++;
count++;
}

if (txCount > 0 && txCount % flushLimit == 0) {
tx.commit();
tx = new TransactionImpl(storageManager);
txCount = 0;
}
txCount++;
count++;
} else {
addTail(reference, false);
}

if (txCount > 0 && txCount % flushLimit == 0) {
tx.commit();
tx = new TransactionImpl(storageManager);
txCount = 0;
} finally {
if (separatePageIterator) {
// close the iterator if not allowed to depage
theIterator.close();
}
}
}
Expand Down Expand Up @@ -2166,33 +2225,14 @@ public void destroyPaging() throws Exception {

@Override
public synchronized boolean deleteReference(final long messageID) throws Exception {
boolean deleted = false;

Transaction tx = new TransactionImpl(storageManager);

try (LinkedListIterator<MessageReference> iter = iterator()) {

while (iter.hasNext()) {
MessageReference ref = iter.next();
if (ref.getMessage().getMessageID() == messageID) {
incDelivering(ref);
acknowledge(tx, ref);
iter.remove();
refRemoved(ref);
deleted = true;
break;
}
}

if (!deleted) {
// Look in scheduled deliveries
deleted = scheduledDeliveryHandler.removeReferenceWithID(messageID, tx) != null ? true : false;
return iterQueue("deleteReference", DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
incDelivering(ref);
acknowledge(tx, ref);
return true;
}

tx.commit();

return deleted;
}
}) == 1;
}

@Override
Expand Down Expand Up @@ -2441,7 +2481,7 @@ public void run() {
@Override
public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception {

return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
return iterQueue("sendMessageToDeadLetterAddress", DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {

@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
Expand All @@ -2457,7 +2497,7 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
@Override
public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception {

return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
return iterQueue("sendMessagesToDeadLetterAddress", DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {

@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
Expand All @@ -2476,7 +2516,7 @@ public boolean moveReference(final long messageID,
final Binding binding,
final boolean rejectDuplicate) throws Exception {

return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
return iterQueue("moveReference", DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {

@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
Expand Down Expand Up @@ -2513,7 +2553,7 @@ public int moveReferences(final int flushLimit,
final Integer expectedHits = messageCount > 0 ? messageCount : null;
final DuplicateIDCache targetDuplicateCache = postOffice.getDuplicateIDCache(toAddress);

return iterQueue(flushLimit, filter, new QueueIterateAction(expectedHits) {
return iterQueue("moveReferences", flushLimit, filter, new QueueIterateAction(expectedHits) {
@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
boolean ignored = false;
Expand Down Expand Up @@ -2541,7 +2581,7 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
}

public int moveReferencesBetweenSnFQueues(final SimpleString queueSuffix) throws Exception {
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() {
return iterQueue("moveReferencesBetweenSnFQueues", DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() {
@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
return moveBetweenSnFQueues(queueSuffix, tx, ref, null);
Expand All @@ -2554,7 +2594,7 @@ public boolean copyReference(final long messageID,
final SimpleString toQueue,
final Binding binding) throws Exception {

return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
return iterQueue("copyReference", DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {

@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
Expand All @@ -2567,7 +2607,7 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
}

public int rerouteMessages(final SimpleString queueName, final Filter filter) throws Exception {
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
return iterQueue("rerouteMessages", DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
RoutingContext routingContext = new RoutingContextImpl(tx);
Expand All @@ -2592,7 +2632,7 @@ public int retryMessages(Filter filter, Integer expectedHits) throws Exception {

final HashMap<String, Long> queues = new HashMap<>();

return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction(expectedHits) {
return iterQueue("retryMessages", DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction(expectedHits) {

@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
Expand Down Expand Up @@ -2637,30 +2677,38 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
@Override
public boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception {

return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
// changeReferences is changing the message directly in the queue.
// For that reason, iterQueue here needs to act as if it's depaging messages,
// and thus it needs to use the same iterator from depaging and keep the message at the tail of the
// queue after being read.
return iterQueue("changeReferencePriority", DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {

@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
ref.getMessage().setPriority(newPriority);
return false;
}

}) == 1;
}, false) == 1;

}

@Override
public int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception {

return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
// changeReferences is changing the message directly in the queue.
// For that reason, iterQueue here needs to act as if it's depaging messages,
// and thus it needs to use the same iterator from depaging and keep the message at the tail of the
// queue after being read.
return iterQueue("changeReferencesPriority", DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {

@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
ref.getMessage().setPriority(newPriority);
return false;
}

});
}, false);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ public void testAlternating(String nameServerA, String nameServerB, File brokerP
}

// Send messages through the shared acceptor
cfX = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616");
sendMessages(cfX, MESSAGES_SENT_PER_ITERATION);

// Consume some messages
Expand Down
Loading