Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public class ClientConnectionContext extends ClientListenerAbstractConnectionCon
private ClientRequestHandler handler;

/** Handle registry. */
private final ClientResourceRegistry resReg = new ClientResourceRegistry();
private final ClientResourceRegistry resReg;

/** Max cursors. */
private final int maxCursors;
Expand Down Expand Up @@ -151,6 +151,7 @@ public ClientConnectionContext(
this.maxCursors = maxCursors;
maxActiveTxCnt = thinCfg.getMaxActiveTxPerConnection();
maxActiveComputeTasks = thinCfg.getMaxActiveComputeTasksPerConnection();
resReg = new ClientResourceRegistry(ctx.log(ClientResourceRegistry.class));
}

/**
Expand Down Expand Up @@ -267,7 +268,7 @@ public void incrementCursors() {
}

/**
* Increments the cursor count.
* Decrement the cursor count.
*/
public void decrementCursors() {
curCnt.decrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteLogger;

/**
* Per-connection resource registry.
Expand All @@ -31,6 +32,17 @@ public class ClientResourceRegistry {
/** ID generator. */
private final AtomicLong idGen = new AtomicLong();

/** Logger. */
private final IgniteLogger log;

/**
* Logger for cleanup errors logging.
* @param log Logger.
*/
public ClientResourceRegistry(IgniteLogger log) {
this.log = log;
}

/**
* Allocates server handle for an object.
*
Expand Down Expand Up @@ -85,8 +97,23 @@ public void release(long hnd) {
* Cleans all handles and closes all ClientCloseableResources.
*/
public void clean() {
for (Map.Entry e : res.entrySet())
closeIfNeeded(e.getValue());
for (Map.Entry<Long, Object> e : res.entrySet()) {
Long id = e.getKey();
Object obj = e.getValue();

try {
closeIfNeeded(obj);
}
catch (Exception ex) {
if (log != null && log.isDebugEnabled())
log.debug("Failed to close client resource on disconnect [id=" + id +
", res=" + obj +
", err=" + ex.getClass().getSimpleName() + ": " + ex.getMessage() + ']');
}
finally {
res.remove(id, obj);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,12 @@ public void startNotifications(long id) {
@Override public void close() {
if (closeGuard.compareAndSet(false, true)) {
assert cur != null;
cur.close();

ctx.decrementCursors();
try {
cur.close();
}
finally {
ctx.decrementCursors();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,11 @@ namespace ignite
{
disconnected = true;

// Clear the queue as we won't be able to process these notifications after disconnect.
queue.clear();

if (handler.IsValid())
return common::SP_ThreadPoolTask(new DisconnectedTask(handler));
handler.Get()->OnDisconnected();

return common::SP_ThreadPoolTask();
}
Expand All @@ -258,13 +261,18 @@ namespace ignite
"Internal error: handler is already set for the notification");

handler = handler0;

// If we are already disconnected, then there is no point in processing notifications.
if (disconnected) {
queue.clear();
handler.Get()->OnDisconnected();
return;
}

for (MessageQueue::iterator it = queue.begin(); it != queue.end(); ++it)
handler.Get()->OnNotification(*it);

queue.clear();

if (disconnected)
handler.Get()->OnDisconnected();
}

private:
Expand Down
Loading