ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From anovi...@apache.org
Subject ignite git commit: IGNITE-1161 Close rest sql cursor after delay. - Fixes #197.
Date Mon, 09 Nov 2015 03:32:12 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1.5 7dfaa3b06 -> 621ecac31


IGNITE-1161 Close rest sql cursor after delay. - Fixes #197.

Signed-off-by: Andrey <anovikov@gridgain.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/621ecac3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/621ecac3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/621ecac3

Branch: refs/heads/ignite-1.5
Commit: 621ecac317f55ba467bdb16321bfe550f5d3319b
Parents: 7dfaa3b
Author: Andrey <anovikov@gridgain.com>
Authored: Mon Nov 9 10:30:56 2015 +0700
Committer: Andrey <anovikov@gridgain.com>
Committed: Mon Nov 9 10:30:56 2015 +0700

----------------------------------------------------------------------
 .../rest/AbstractRestProcessorSelfTest.java     |   5 +-
 .../JettyRestProcessorAbstractSelfTest.java     |  37 +++
 .../configuration/ConnectorConfiguration.java   |  61 ++++-
 .../handlers/query/QueryCommandHandler.java     | 258 +++++++++++++++----
 4 files changed, 302 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/621ecac3/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java
b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java
index bde9180..9a030a7 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java
@@ -75,6 +75,9 @@ abstract class AbstractRestProcessorSelfTest extends GridCommonAbstractTest
{
 
         clientCfg.setJettyPath("modules/clients/src/test/resources/jetty/rest-jetty.xml");
 
+        clientCfg.setIdleQueryCursorTimeout(5000);
+        clientCfg.setIdleQueryCursorCheckFrequency(5000);
+
         cfg.setConnectorConfiguration(clientCfg);
 
         TcpDiscoverySpi disco = new TcpDiscoverySpi();
@@ -99,4 +102,4 @@ abstract class AbstractRestProcessorSelfTest extends GridCommonAbstractTest
{
     @Override protected <K, V> IgniteCache<K, V> jcache() {
         return grid(0).cache(null);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/621ecac3/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
index c413bbd..c9c4ced 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheSqlMetadata;
 import org.apache.ignite.internal.processors.rest.handlers.GridRestCommandHandler;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -1406,6 +1407,42 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
         assertFalse(queryCursorFound());
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueryDelay() throws Exception {
+        String qry = "salary > ? and salary <= ?";
+
+        Map<String, String> params = new HashMap<>();
+        params.put("cmd", GridRestCommand.EXECUTE_SQL_QUERY.key());
+        params.put("type", "Person");
+        params.put("pageSize", "1");
+        params.put("cacheName", "person");
+        params.put("qry", URLEncoder.encode(qry));
+        params.put("arg1", "1000");
+        params.put("arg2", "2000");
+
+        String ret = null;
+
+        for (int i = 0; i < 10; ++i)
+            ret = content(params);
+
+        assertNotNull(ret);
+        assertTrue(!ret.isEmpty());
+
+        JSONObject json = JSONObject.fromObject(ret);
+
+        List items = (List)((Map)json.get("response")).get("items");
+
+        assertEquals(1, items.size());
+
+        assertTrue(queryCursorFound());
+
+        U.sleep(10000);
+
+        assertFalse(queryCursorFound());
+    }
+
     protected abstract String signature() throws Exception;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/621ecac3/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java
b/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java
index 88d015c..1bfcbe4 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java
@@ -59,6 +59,12 @@ public class ConnectorConfiguration {
     /** Default socket send and receive buffer size. */
     public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024;
 
+    /** Default REST idle timeout for query cursor. */
+    private static final long DFLT_IDLE_QRY_CUR_TIMEOUT = 10 * 60 * 1000;
+
+    /** Default REST check frequency for idle query cursor. */
+    private static final long DFLT_IDLE_QRY_CUR_CHECK_FRQ = 60 * 1000;
+
     /** Jetty XML configuration path. */
     private String jettyPath;
 
@@ -83,6 +89,12 @@ public class ConnectorConfiguration {
     /** REST TCP receive buffer size. */
     private int rcvBufSize = DFLT_SOCK_BUF_SIZE;
 
+    /** REST idle timeout for query cursor. */
+    private long idleQryCurTimeout = DFLT_IDLE_QRY_CUR_TIMEOUT;
+
+    /** REST idle check frequency for query cursor. */
+    private long idleQryCurCheckFreq = DFLT_IDLE_QRY_CUR_CHECK_FRQ;
+
     /** REST TCP send queue limit. */
     private int sndQueueLimit;
 
@@ -146,6 +158,8 @@ public class ConnectorConfiguration {
         sslClientAuth = cfg.isSslClientAuth();
         sslCtxFactory = cfg.getSslContextFactory();
         sslEnabled = cfg.isSslEnabled();
+        idleQryCurTimeout = cfg.getIdleQueryCursorTimeout();
+        idleQryCurCheckFreq = cfg.getIdleQueryCursorCheckFrequency();
     }
 
     /**
@@ -545,4 +559,49 @@ public class ConnectorConfiguration {
     public void setMessageInterceptor(ConnectorMessageInterceptor interceptor) {
         msgInterceptor = interceptor;
     }
-}
\ No newline at end of file
+
+    /**
+     * Sets idle query cursors timeout.
+     *
+     * @param idleQryCurTimeout Idle query cursors timeout in milliseconds.
+     * @see #getIdleQueryCursorTimeout()
+     */
+    public void setIdleQueryCursorTimeout(long idleQryCurTimeout) {
+        this.idleQryCurTimeout = idleQryCurTimeout;
+    }
+
+    /**
+     * Gets idle query cursors timeout in milliseconds.
+     * <p>
+     * This setting is used to reject open query cursors that is not used. If no fetch query
request
+     * come within idle timeout, it will be removed on next check for old query cursors
+     * (see {@link #getIdleQueryCursorCheckFrequency()}).
+     *
+     * @return Idle query cursors timeout in milliseconds
+     */
+    public long getIdleQueryCursorTimeout() {
+        return idleQryCurTimeout;
+    }
+
+    /**
+     * Sets idle query cursor check frequency.
+     *
+     * @param idleQryCurCheckFreq Idle query check frequency in milliseconds.
+     * @see #getIdleQueryCursorCheckFrequency()
+     */
+    public void setIdleQueryCursorCheckFrequency(long idleQryCurCheckFreq) {
+        this.idleQryCurCheckFreq = idleQryCurCheckFreq;
+    }
+
+    /**
+     * Gets idle query cursors check frequency.
+     * This setting is used to reject open query cursors that is not used.
+     * <p>
+     * Scheduler tries with specified period to close queries' cursors that are overtime.
+     *
+     * @return Idle query cursor check frequency in milliseconds.
+     */
+    public long getIdleQueryCursorCheckFrequency() {
+        return idleQryCurCheckFreq;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/621ecac3/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
index f4ddd59..54cdd29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java
@@ -23,9 +23,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.query.Query;
@@ -45,7 +47,6 @@ import org.apache.ignite.internal.processors.rest.request.RestQueryRequest;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
-import org.apache.ignite.lang.IgniteBiTuple;
 
 import static org.apache.ignite.internal.processors.rest.GridRestCommand.CLOSE_SQL_QUERY;
 import static org.apache.ignite.internal.processors.rest.GridRestCommand.EXECUTE_SCAN_QUERY;
@@ -68,25 +69,53 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter
{
     private static final AtomicLong qryIdGen = new AtomicLong();
 
     /** Current queries cursors. */
-    private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>>
qryCurs = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs = new ConcurrentHashMap<>();
 
     /**
      * @param ctx Context.
      */
     public QueryCommandHandler(GridKernalContext ctx) {
         super(ctx);
+
+        final long idleQryCurTimeout = ctx.config().getConnectorConfiguration().getIdleQueryCursorTimeout();
+
+        long idleQryCurCheckFreq = ctx.config().getConnectorConfiguration().getIdleQueryCursorCheckFrequency();
+
+        ctx.timeout().schedule(new Runnable() {
+            @Override public void run() {
+                long time = U.currentTimeMillis();
+
+                for (Map.Entry<Long, QueryCursorIterator> e : qryCurs.entrySet()) {
+                    QueryCursorIterator qryCurIt = e.getValue();
+
+                    long createTime = qryCurIt.timestamp();
+
+                    if (createTime + idleQryCurTimeout > time && qryCurIt.tryLock())
{
+                        try {
+                            qryCurIt.timestamp(-1);
+
+                            qryCurs.remove(e.getKey(), qryCurIt);
+
+                            qryCurIt.close();
+                        }
+                        finally {
+                            qryCurIt.unlock();
+                        }
+                    }
+                }
+            }
+        }, idleQryCurCheckFreq, idleQryCurCheckFreq);
     }
 
     /**
-     * @param qryCurs Query cursors.
      * @param cur Current cursor.
      * @param req Sql request.
      * @param qryId Query id.
+     * @param qryCurs Query cursors.
      * @return Query result with items.
      */
     private static CacheQueryResult createQueryResult(
-        ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs,
-        Iterator cur, RestQueryRequest req, Long qryId) {
+        Iterator cur, RestQueryRequest req, Long qryId, ConcurrentHashMap<Long, QueryCursorIterator>
qryCurs) {
         CacheQueryResult res = new CacheQueryResult();
 
         List<Object> items = new ArrayList<>();
@@ -101,12 +130,39 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter
{
         res.setQueryId(qryId);
 
         if (!cur.hasNext())
-            qryCurs.remove(qryId);
+            removeQueryCursor(qryId, qryCurs);
 
         return res;
     }
 
     /**
+     * Removes query cursor.
+     *
+     * @param qryId Query id.
+     * @param qryCurs Query cursors.
+     */
+    private static void removeQueryCursor(Long qryId, ConcurrentHashMap<Long, QueryCursorIterator>
qryCurs) {
+        QueryCursorIterator qryCurIt = qryCurs.get(qryId);
+
+        if (qryCurIt == null)
+            return;
+
+        qryCurIt.lock();
+
+        try {
+            if (qryCurIt.timestamp() == -1)
+                return;
+
+            qryCurIt.close();
+
+            qryCurs.remove(qryId);
+        }
+        finally {
+            qryCurIt.unlock();
+        }
+    }
+
+    /**
      * Creates class instance.
      *
      * @param cls Target class.
@@ -169,7 +225,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter
{
 
             case FETCH_SQL_QUERY: {
                 return ctx.closure().callLocalSafe(
-                    new FetchQueryCallable(ctx, (RestQueryRequest)req, qryCurs), false);
+                    new FetchQueryCallable((RestQueryRequest)req, qryCurs), false);
             }
 
             case CLOSE_SQL_QUERY: {
@@ -191,16 +247,16 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter
{
         /** Execute query request. */
         private RestQueryRequest req;
 
-        /** Queries cursors. */
-        private ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>>
qryCurs;
+        /** Current queries cursors. */
+        private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs;
 
         /**
          * @param ctx Kernal context.
          * @param req Execute query request.
-         * @param qryCurs Queries cursors.
+         * @param qryCurs Query cursors.
          */
         public ExecuteQueryCallable(GridKernalContext ctx, RestQueryRequest req,
-            ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs)
{
+            ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) {
             this.ctx = ctx;
             this.req = req;
             this.qryCurs = qryCurs;
@@ -208,7 +264,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter
{
 
         /** {@inheritDoc} */
         @Override public GridRestResponse call() throws Exception {
-            long qryId = qryIdGen.getAndIncrement();
+            final long qryId = qryIdGen.getAndIncrement();
 
             try {
                 Query qry;
@@ -248,38 +304,51 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter
{
                     return new GridRestResponse(GridRestResponse.STATUS_FAILED,
                         "Failed to find cache with name: " + req.cacheName());
 
-                QueryCursor qryCur = cache.query(qry);
+                final QueryCursor qryCur = cache.query(qry);
 
                 Iterator cur = qryCur.iterator();
 
-                qryCurs.put(qryId, new IgniteBiTuple<>(qryCur, cur));
+                QueryCursorIterator qryCurIt = new QueryCursorIterator(qryCur, cur);
 
-                CacheQueryResult res = createQueryResult(qryCurs, cur, req, qryId);
+                qryCurIt.lock();
 
-                switch (req.queryType()) {
-                    case SQL:
-                    case SQL_FIELDS:
-                        List<GridQueryFieldMetadata> fieldsMeta = ((QueryCursorImpl)
qryCur).fieldsMeta();
+                try {
+                    qryCurs.put(qryId, qryCurIt);
 
-                        res.setFieldsMetadata(convertMetadata(fieldsMeta));
+                    CacheQueryResult res = createQueryResult(cur, req, qryId, qryCurs);
 
-                        break;
-                    case SCAN:
-                        CacheQueryFieldsMetaResult keyField = new CacheQueryFieldsMetaResult();
-                        keyField.setFieldName("key");
+                    switch (req.queryType()) {
+                        case SQL:
+                        case SQL_FIELDS:
+                            List<GridQueryFieldMetadata> fieldsMeta = ((QueryCursorImpl)qryCur).fieldsMeta();
 
-                        CacheQueryFieldsMetaResult valField = new CacheQueryFieldsMetaResult();
-                        valField.setFieldName("value");
+                            res.setFieldsMetadata(convertMetadata(fieldsMeta));
 
-                        res.setFieldsMetadata(U.sealList(keyField, valField));
+                            break;
+                        case SCAN:
+                            CacheQueryFieldsMetaResult keyField = new CacheQueryFieldsMetaResult();
+                            keyField.setFieldName("key");
 
-                        break;
-                }
+                            CacheQueryFieldsMetaResult valField = new CacheQueryFieldsMetaResult();
+                            valField.setFieldName("value");
+
+                            res.setFieldsMetadata(U.sealList(keyField, valField));
 
-                return new GridRestResponse(res);
+                            break;
+                    }
+
+                    List<GridQueryFieldMetadata> fieldsMeta = ((QueryCursorImpl<?>)qryCur).fieldsMeta();
+
+                    res.setFieldsMetadata(convertMetadata(fieldsMeta));
+
+                    return new GridRestResponse(res);
+                }
+                finally {
+                    qryCurIt.unlock();
+                }
             }
             catch (Exception e) {
-                qryCurs.remove(qryId);
+                removeQueryCursor(qryId, qryCurs);
 
                 return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage());
             }
@@ -305,17 +374,17 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter
{
      * Close query callable.
      */
     private static class CloseQueryCallable implements Callable<GridRestResponse> {
-        /** Queries cursors. */
-        private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>>
qryCurs;
+        /** Current queries cursors. */
+        private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs;
+
         /** Execute query request. */
         private RestQueryRequest req;
 
         /**
          * @param req Execute query request.
-         * @param qryCurs Queries cursors.
+         * @param qryCurs Query cursors.
          */
-        public CloseQueryCallable(RestQueryRequest req,
-            ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs)
{
+        public CloseQueryCallable(RestQueryRequest req, ConcurrentHashMap<Long, QueryCursorIterator>
qryCurs) {
             this.req = req;
             this.qryCurs = qryCurs;
         }
@@ -323,20 +392,29 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter
{
         /** {@inheritDoc} */
         @Override public GridRestResponse call() throws Exception {
             try {
-                QueryCursor cur = qryCurs.get(req.queryId()).get1();
+                QueryCursorIterator qryCurIt = qryCurs.get(req.queryId());
 
-                if (cur == null)
-                    return new GridRestResponse(GridRestResponse.STATUS_FAILED,
-                        "Failed to find query with ID: " + req.queryId());
+                if (qryCurIt == null)
+                    return new GridRestResponse(true);
 
-                cur.close();
+                qryCurIt.lock();
 
-                qryCurs.remove(req.queryId());
+                try {
+                    if (qryCurIt.timestamp() == -1)
+                        return new GridRestResponse(true);
+
+                    qryCurIt.close();
+
+                    qryCurs.remove(req.queryId());
+                }
+                finally {
+                    qryCurIt.unlock();
+                }
 
                 return new GridRestResponse(true);
             }
             catch (Exception e) {
-                qryCurs.remove(req.queryId());
+                removeQueryCursor(req.queryId(), qryCurs);
 
                 return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage());
             }
@@ -347,21 +425,17 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter
{
      * Fetch query callable.
      */
     private static class FetchQueryCallable implements Callable<GridRestResponse> {
-        /** Queries cursors. */
-        private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>>
qryCurs;
-        /** Grid kernal context. */
-        private final GridKernalContext ctx;
+        /** Current queries cursors. */
+        private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs;
+
         /** Execute query request. */
         private RestQueryRequest req;
 
         /**
-         * @param ctx Grid kernal context.
          * @param req Execute query request.
-         * @param qryCurs Queries cursors.
+         * @param qryCurs Query cursors.
          */
-        public FetchQueryCallable(GridKernalContext ctx, RestQueryRequest req,
-            ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs)
{
-            this.ctx = ctx;
+        public FetchQueryCallable(RestQueryRequest req, ConcurrentHashMap<Long, QueryCursorIterator>
qryCurs) {
             this.req = req;
             this.qryCurs = qryCurs;
         }
@@ -369,21 +443,91 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter
{
         /** {@inheritDoc} */
         @Override public GridRestResponse call() throws Exception {
             try {
-                Iterator cur = qryCurs.get(req.queryId()).get2();
+                QueryCursorIterator qryCurIt = qryCurs.get(req.queryId());
 
-                if (cur == null)
+                if (qryCurIt == null)
                     return new GridRestResponse(GridRestResponse.STATUS_FAILED,
                         "Failed to find query with ID: " + req.queryId());
 
-                CacheQueryResult res = createQueryResult(qryCurs, cur, req, req.queryId());
+                qryCurIt.lock();
+
+                try {
+                    if (qryCurIt.timestamp() == -1)
+                        return new GridRestResponse(GridRestResponse.STATUS_FAILED,
+                            "Query is closed by timeout. Restart query with ID: " + req.queryId());
+
+                    qryCurIt.timestamp(U.currentTimeMillis());
 
-                return new GridRestResponse(res);
+                    Iterator cur = qryCurIt.iterator();
+
+                    CacheQueryResult res = createQueryResult(cur, req, req.queryId(), qryCurs);
+
+                    return new GridRestResponse(res);
+                }
+                finally {
+                    qryCurIt.unlock();
+                }
             }
             catch (Exception e) {
-                qryCurs.remove(req.queryId());
+                removeQueryCursor(req.queryId(), qryCurs);
 
                 return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage());
             }
         }
     }
+
+    /**
+     * Query cursor iterator.
+     */
+    private static class QueryCursorIterator extends ReentrantLock {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Query cursor. */
+        private QueryCursor cur;
+
+        /** Query iterator. */
+        private Iterator it;
+
+        /** Last timestamp. */
+        private volatile long ts;
+
+        /**
+         * @param cur Query cursor.
+         * @param it Query iterator.
+         */
+        public QueryCursorIterator(QueryCursor cur, Iterator it) {
+            this.cur = cur;
+            this.it = it;
+            ts = U.currentTimeMillis();
+        }
+
+        /**
+         * @return Query iterator.
+         */
+        public Iterator iterator() {
+            return it;
+        }
+
+        /**
+         * @return Timestamp.
+         */
+        public long timestamp() {
+            return ts;
+        }
+
+        /**
+         * @param time Current time or -1 if cursor is closed.
+         */
+        public void timestamp(long time) {
+            ts = time;
+        }
+
+        /**
+         * Close query cursor.
+         */
+        public void close() {
+            cur.close();
+        }
+    }
 }


Mime
View raw message