diff --git a/driver/src/main/java/oracle/nosql/driver/http/Client.java b/driver/src/main/java/oracle/nosql/driver/http/Client.java index bbd5776a..5a9a10ba 100644 --- a/driver/src/main/java/oracle/nosql/driver/http/Client.java +++ b/driver/src/main/java/oracle/nosql/driver/http/Client.java @@ -630,7 +630,7 @@ public Result execute(Request kvRequest) { */ kvRequest.setTimeoutInternal(thisIterationTimeoutMs); serialVersionUsed = writeContent(buffer, kvRequest, - queryVersionUsed); + queryVersionUsed); kvRequest.setTimeoutInternal(timeoutMs); /* @@ -1616,11 +1616,11 @@ private synchronized boolean decrementQueryVersion(short versionUsed) { if (queryVersion != versionUsed) { return true; } - if (queryVersion == QueryDriver.QUERY_V4) { - queryVersion = QueryDriver.QUERY_V3; - return true; + if (queryVersion == QueryDriver.QUERY_V3) { + return false; } - return false; + --queryVersion; + return true; } /** diff --git a/driver/src/main/java/oracle/nosql/driver/ops/serde/nson/NsonSerializerFactory.java b/driver/src/main/java/oracle/nosql/driver/ops/serde/nson/NsonSerializerFactory.java index fb791aab..1752edfa 100644 --- a/driver/src/main/java/oracle/nosql/driver/ops/serde/nson/NsonSerializerFactory.java +++ b/driver/src/main/java/oracle/nosql/driver/ops/serde/nson/NsonSerializerFactory.java @@ -1008,7 +1008,7 @@ private static void deserializePrepareOrQuery( } else if (name.equals(DRIVER_QUERY_PLAN)) { dpi = getDriverPlanInfo(Nson.readNsonBinary(in), - serialVersion); + queryVersion); } else if (name.equals(REACHED_LIMIT) && qres != null) { qres.setReachedLimit(Nson.readNsonBoolean(in)); @@ -1179,7 +1179,7 @@ private static void readPhase1Results(byte[] arr, QueryResult result) } private static DriverPlanInfo getDriverPlanInfo(byte[] arr, - short serialVersion) + short queryVersion) throws IOException { if (arr == null || arr.length == 0) { return null; @@ -1187,7 +1187,7 @@ private static DriverPlanInfo getDriverPlanInfo(byte[] arr, ByteBuf buf = Unpooled.wrappedBuffer(arr); ByteInputStream bis = new NettyByteInputStream(buf); DriverPlanInfo dpi = new DriverPlanInfo(); - dpi.driverQueryPlan = PlanIter.deserializeIter(bis, serialVersion); + dpi.driverQueryPlan = PlanIter.deserializeIter(bis, queryVersion); if (dpi.driverQueryPlan == null) { return null; } diff --git a/driver/src/main/java/oracle/nosql/driver/query/PlanIter.java b/driver/src/main/java/oracle/nosql/driver/query/PlanIter.java index 4a3c49b8..7f4e9732 100644 --- a/driver/src/main/java/oracle/nosql/driver/query/PlanIter.java +++ b/driver/src/main/java/oracle/nosql/driver/query/PlanIter.java @@ -306,6 +306,9 @@ protected abstract void displayContent( StringBuilder sb, QueryFormatter formatter); + /* + * Note: the serialVersion param is actually the queryVersion. + */ public static PlanIter deserializeIter( ByteInputStream in, short serialVersion) throws IOException { diff --git a/driver/src/main/java/oracle/nosql/driver/query/QueryDriver.java b/driver/src/main/java/oracle/nosql/driver/query/QueryDriver.java index 809de7f0..cf987846 100644 --- a/driver/src/main/java/oracle/nosql/driver/query/QueryDriver.java +++ b/driver/src/main/java/oracle/nosql/driver/query/QueryDriver.java @@ -35,7 +35,10 @@ public class QueryDriver { /* added query name in QueryRequest */ public static short QUERY_V4 = 4; - public static short QUERY_VERSION = QUERY_V4; + /* added limit iter field in SortIter */ + public static short QUERY_V5 = 5; + + public static short QUERY_VERSION = QUERY_V5; private static final int BATCH_SIZE = 100; diff --git a/driver/src/main/java/oracle/nosql/driver/query/SortIter.java b/driver/src/main/java/oracle/nosql/driver/query/SortIter.java index 91f5d898..d06a56ca 100644 --- a/driver/src/main/java/oracle/nosql/driver/query/SortIter.java +++ b/driver/src/main/java/oracle/nosql/driver/query/SortIter.java @@ -10,6 +10,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; +import java.util.PriorityQueue; import oracle.nosql.driver.query.PlanIterState.StateEnum; import oracle.nosql.driver.util.ByteInputStream; @@ -25,35 +26,120 @@ */ public class SortIter extends PlanIter { + private class CompareFunction implements Comparator { + + RuntimeControlBlock theRCB; + + CompareFunction(RuntimeControlBlock rcb) { + theRCB = rcb; + } + + @Override + public int compare(MapValue v1, MapValue v2) { + + return Compare.sortResults(theRCB, + v1, + v2, + theSortFields, + theSortSpecs); + } + } + + private static class ReverseCompareFunction implements Comparator { + + CompareFunction theComparator; + + ReverseCompareFunction(CompareFunction comparator) { + theComparator = comparator; + } + + @Override + public int compare(MapValue v1, MapValue v2) { + return -theComparator.compare(v1, v2); + } + } + private static class SortIterState extends PlanIterState { - final ArrayList theResults; + CompareFunction theComparator; + + /* + * If theLimit is < 0, the SortIter will cache the full query result + * set (in theResults) and then sort it and return it one result at a + * time. theLimit will be > 0 if the query contains LIMIT clause. In + * this case, theLimit is the sum of the OFFSET and LIMIT values + * specified in the query and the SortIter does not need to cache more + * than theLimit results. Specifically, if theLimit is > 0, the SortIter + * works as follows: + * a. Up to theLimit results are initially cached in theResults. + * b. If there are no more than theLimit results, theResults is sorted + * and then theSortIter returns these sorted result one at a time. + * c. Otherwise, when theLimit results have been cached, they are + * transfered from theResults to theResultsQueue, which keeps + * the results sorted. Any subsequent result R is compared with + * greatest result, R_MAX, in the queue: if it R > R_MAX, it is + * discarded; otherwise, R is inserted in the queue and R_MAX is + * removed from the queue and discarded. + * d. Note that PriorityQueue gives access to the least result in + * the queue, not the greatest. To handle this, we use the + * ReverseCompareFunction with theResultsQueue, to sort the + * results in the reverse of the desired order. Then, when the + * full result set has been computed, the SortIter transfers + * the results in theResultsQueue to theResultsArray in the + * correct order. After this point, the SortIter returns the + * the results in theResultsArray, one at a time. + */ + int theLimit = -1; + + ArrayList theResults; + + PriorityQueue theResultsQueue; + + MapValue[] theResultsArray; + + int theNumResults; int theCurrResult; - public SortIterState() { + public SortIterState(RuntimeControlBlock rcb, SortIter iter, int limit) { super(); - theResults = new ArrayList(128); + theLimit = limit; + if (theLimit > 0) { + theResults = new ArrayList(theLimit); + } else { + theResults = new ArrayList(4096); + } + theComparator = iter.new CompareFunction(rcb); } @Override public void done() { super.done(); - theCurrResult = 0; - theResults.clear(); + theResults = null; + theResultsQueue = null; + theResultsArray = null; } @Override public void reset(PlanIter iter) { super.reset(iter); theCurrResult = 0; - theResults.clear(); + theNumResults = 0; + if (theLimit > 0) { + theResults = new ArrayList(theLimit); + } else { + theResults = new ArrayList(4096); + } + theResultsQueue = null; + theResultsArray = null; } @Override public void close() { super.close(); - theResults.clear(); + theResults = null; + theResultsQueue = null; + theResultsArray = null; } } @@ -63,13 +149,15 @@ public void close() { private final SortSpec[] theSortSpecs; + private final PlanIter theLimit; + private final boolean theCountMemory; - public SortIter(ByteInputStream in, PlanIterKind kind, short serialVersion) + public SortIter(ByteInputStream in, PlanIterKind kind, short queryVersion) throws IOException { - super(in, serialVersion); - theInput = deserializeIter(in, serialVersion); + super(in, queryVersion); + theInput = deserializeIter(in, queryVersion); theSortFields = SerializationUtil.readStringArray(in); theSortSpecs = readSortSpecs(in); @@ -79,6 +167,12 @@ public SortIter(ByteInputStream in, PlanIterKind kind, short serialVersion) } else { theCountMemory = true; } + + if (queryVersion >= QueryDriver.QUERY_V5) { + theLimit = deserializeIter(in, queryVersion); + } else { + theLimit = null; + } } @Override @@ -93,7 +187,17 @@ PlanIter getInputIter() { @Override public void open(RuntimeControlBlock rcb) { - SortIterState state = new SortIterState(); + + int limit = - 1; + + if (theLimit != null) { + theLimit.open(rcb); + theLimit.next(rcb); + FieldValue val = rcb.getRegVal(theLimit.getResultReg()); + limit = val.getInt(); + } + + SortIterState state = new SortIterState(rcb, this, limit); rcb.setState(theStatePos, state); theInput.open(rcb); } @@ -143,11 +247,55 @@ public boolean next(RuntimeControlBlock rcb) { } } - state.theResults.add(v); + if (state.theLimit < 0) { + state.theResults.add(v); + + if (theCountMemory) { + long sz = v.sizeof() + SizeOf.OBJECT_REF_OVERHEAD; + rcb.incMemoryConsumption(sz); + } + } else if (state.theResults != null && + state.theResults.size() < state.theLimit) { + + state.theResults.add(v); + + if (theCountMemory) { + long sz = v.sizeof() + SizeOf.OBJECT_REF_OVERHEAD; + rcb.incMemoryConsumption(sz); + } + + if (state.theResults.size() == state.theLimit) { + + ReverseCompareFunction comparator = + new ReverseCompareFunction(state.theComparator); + + state.theResultsQueue = new PriorityQueue<>(comparator); - if (theCountMemory) { - long sz = v.sizeof() + SizeOf.OBJECT_REF_OVERHEAD; - rcb.incMemoryConsumption(sz); + for (MapValue result : state.theResults) { + state.theResultsQueue.add(result); + } + state.theResults = null; + } + } else { + MapValue lastResult = state.theResultsQueue.peek(); + + if (state.theComparator.compare(lastResult, v) > 0) { + + state.theResultsQueue.remove(); + state.theResultsQueue.add(v); + + if (rcb.getTraceLevel() >= 4) { + rcb.trace("SortIter: added top result: " + v + + "\nin place of " + lastResult); + } + + if (theCountMemory) { + long sz = v.sizeof(); + rcb.incMemoryConsumption(sz); + sz = lastResult.sizeof(); + rcb.decMemoryConsumption(sz); + } + } } more = theInput.next(rcb); @@ -157,22 +305,43 @@ public boolean next(RuntimeControlBlock rcb) { return false; } - state.theResults.sort( - (v1, v2) -> Compare.sortResults(rcb, - v1, - v2, - theSortFields, - theSortSpecs)); + if (state.theResultsQueue != null) { + /* Move the results from the queue to an array, maintaining their + * sort order. This is needed because iterating over the queue + * using an Iterator does not guarantee that the results will be + * retrieved in their sorted order. */ + state.theResultsArray = new MapValue[state.theLimit]; + + int i = state.theLimit - 1; + while (!state.theResultsQueue.isEmpty()) { + MapValue result = state.theResultsQueue.remove(); + state.theResultsArray[i] = result; + --i; + } + + state.theResultsQueue = null; + state.theNumResults = state.theLimit; + } else { + state.theResults.sort(state.theComparator); + state.theNumResults = state.theResults.size(); + } state.setState(StateEnum.RUNNING); } - if (state.theCurrResult < state.theResults.size()) { + if (state.theCurrResult < state.theNumResults) { + + MapValue v; + if (state.theResultsArray == null) { + v = state.theResults.get(state.theCurrResult); + state.theResults.set(state.theCurrResult, null); + } else { + v = state.theResultsArray[state.theCurrResult]; + state.theResultsArray[state.theCurrResult] = null; + } - MapValue v = state.theResults.get(state.theCurrResult); v.convertEmptyToNull(); rcb.setRegVal(theResultReg, v); - state.theResults.set(state.theCurrResult, null); ++state.theCurrResult; return true; }