Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E090618EAD for ; Mon, 9 Nov 2015 03:32:12 +0000 (UTC) Received: (qmail 561 invoked by uid 500); 9 Nov 2015 03:32:12 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 525 invoked by uid 500); 9 Nov 2015 03:32:12 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 516 invoked by uid 99); 9 Nov 2015 03:32:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 Nov 2015 03:32:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 91BC0DFDCC; Mon, 9 Nov 2015 03:32:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: anovikov@apache.org To: commits@ignite.apache.org Message-Id: <3936855f94bf4ff790b9004a1b760fbf@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: IGNITE-1161 Close rest sql cursor after delay. - Fixes #197. Date: Mon, 9 Nov 2015 03:32:12 +0000 (UTC) Repository: ignite Updated Branches: refs/heads/ignite-1.5 7dfaa3b06 -> 621ecac31 IGNITE-1161 Close rest sql cursor after delay. - Fixes #197. Signed-off-by: Andrey Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/621ecac3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/621ecac3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/621ecac3 Branch: refs/heads/ignite-1.5 Commit: 621ecac317f55ba467bdb16321bfe550f5d3319b Parents: 7dfaa3b Author: Andrey Authored: Mon Nov 9 10:30:56 2015 +0700 Committer: Andrey Committed: Mon Nov 9 10:30:56 2015 +0700 ---------------------------------------------------------------------- .../rest/AbstractRestProcessorSelfTest.java | 5 +- .../JettyRestProcessorAbstractSelfTest.java | 37 +++ .../configuration/ConnectorConfiguration.java | 61 ++++- .../handlers/query/QueryCommandHandler.java | 258 +++++++++++++++---- 4 files changed, 302 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/621ecac3/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java index bde9180..9a030a7 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java @@ -75,6 +75,9 @@ abstract class AbstractRestProcessorSelfTest extends GridCommonAbstractTest { clientCfg.setJettyPath("modules/clients/src/test/resources/jetty/rest-jetty.xml"); + clientCfg.setIdleQueryCursorTimeout(5000); + clientCfg.setIdleQueryCursorCheckFrequency(5000); + cfg.setConnectorConfiguration(clientCfg); TcpDiscoverySpi disco = new TcpDiscoverySpi(); @@ -99,4 +102,4 @@ abstract class AbstractRestProcessorSelfTest extends GridCommonAbstractTest { @Override protected IgniteCache jcache() { return grid(0).cache(null); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/621ecac3/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java index c413bbd..c9c4ced 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java @@ -48,6 +48,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheSqlMetadata; import org.apache.ignite.internal.processors.rest.handlers.GridRestCommandHandler; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.testframework.GridTestUtils; @@ -1406,6 +1407,42 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro assertFalse(queryCursorFound()); } + /** + * @throws Exception If failed. + */ + public void testQueryDelay() throws Exception { + String qry = "salary > ? and salary <= ?"; + + Map params = new HashMap<>(); + params.put("cmd", GridRestCommand.EXECUTE_SQL_QUERY.key()); + params.put("type", "Person"); + params.put("pageSize", "1"); + params.put("cacheName", "person"); + params.put("qry", URLEncoder.encode(qry)); + params.put("arg1", "1000"); + params.put("arg2", "2000"); + + String ret = null; + + for (int i = 0; i < 10; ++i) + ret = content(params); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + JSONObject json = JSONObject.fromObject(ret); + + List items = (List)((Map)json.get("response")).get("items"); + + assertEquals(1, items.size()); + + assertTrue(queryCursorFound()); + + U.sleep(10000); + + assertFalse(queryCursorFound()); + } + protected abstract String signature() throws Exception; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/621ecac3/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java index 88d015c..1bfcbe4 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java @@ -59,6 +59,12 @@ public class ConnectorConfiguration { /** Default socket send and receive buffer size. */ public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024; + /** Default REST idle timeout for query cursor. */ + private static final long DFLT_IDLE_QRY_CUR_TIMEOUT = 10 * 60 * 1000; + + /** Default REST check frequency for idle query cursor. */ + private static final long DFLT_IDLE_QRY_CUR_CHECK_FRQ = 60 * 1000; + /** Jetty XML configuration path. */ private String jettyPath; @@ -83,6 +89,12 @@ public class ConnectorConfiguration { /** REST TCP receive buffer size. */ private int rcvBufSize = DFLT_SOCK_BUF_SIZE; + /** REST idle timeout for query cursor. */ + private long idleQryCurTimeout = DFLT_IDLE_QRY_CUR_TIMEOUT; + + /** REST idle check frequency for query cursor. */ + private long idleQryCurCheckFreq = DFLT_IDLE_QRY_CUR_CHECK_FRQ; + /** REST TCP send queue limit. */ private int sndQueueLimit; @@ -146,6 +158,8 @@ public class ConnectorConfiguration { sslClientAuth = cfg.isSslClientAuth(); sslCtxFactory = cfg.getSslContextFactory(); sslEnabled = cfg.isSslEnabled(); + idleQryCurTimeout = cfg.getIdleQueryCursorTimeout(); + idleQryCurCheckFreq = cfg.getIdleQueryCursorCheckFrequency(); } /** @@ -545,4 +559,49 @@ public class ConnectorConfiguration { public void setMessageInterceptor(ConnectorMessageInterceptor interceptor) { msgInterceptor = interceptor; } -} \ No newline at end of file + + /** + * Sets idle query cursors timeout. + * + * @param idleQryCurTimeout Idle query cursors timeout in milliseconds. + * @see #getIdleQueryCursorTimeout() + */ + public void setIdleQueryCursorTimeout(long idleQryCurTimeout) { + this.idleQryCurTimeout = idleQryCurTimeout; + } + + /** + * Gets idle query cursors timeout in milliseconds. + *

+ * This setting is used to reject open query cursors that is not used. If no fetch query request + * come within idle timeout, it will be removed on next check for old query cursors + * (see {@link #getIdleQueryCursorCheckFrequency()}). + * + * @return Idle query cursors timeout in milliseconds + */ + public long getIdleQueryCursorTimeout() { + return idleQryCurTimeout; + } + + /** + * Sets idle query cursor check frequency. + * + * @param idleQryCurCheckFreq Idle query check frequency in milliseconds. + * @see #getIdleQueryCursorCheckFrequency() + */ + public void setIdleQueryCursorCheckFrequency(long idleQryCurCheckFreq) { + this.idleQryCurCheckFreq = idleQryCurCheckFreq; + } + + /** + * Gets idle query cursors check frequency. + * This setting is used to reject open query cursors that is not used. + *

+ * Scheduler tries with specified period to close queries' cursors that are overtime. + * + * @return Idle query cursor check frequency in milliseconds. + */ + public long getIdleQueryCursorCheckFrequency() { + return idleQryCurCheckFreq; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/621ecac3/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java index f4ddd59..54cdd29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java @@ -23,9 +23,11 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.query.Query; @@ -45,7 +47,6 @@ import org.apache.ignite.internal.processors.rest.request.RestQueryRequest; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; -import org.apache.ignite.lang.IgniteBiTuple; import static org.apache.ignite.internal.processors.rest.GridRestCommand.CLOSE_SQL_QUERY; import static org.apache.ignite.internal.processors.rest.GridRestCommand.EXECUTE_SCAN_QUERY; @@ -68,25 +69,53 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { private static final AtomicLong qryIdGen = new AtomicLong(); /** Current queries cursors. */ - private final ConcurrentHashMap> qryCurs = new ConcurrentHashMap<>(); + private final ConcurrentHashMap qryCurs = new ConcurrentHashMap<>(); /** * @param ctx Context. */ public QueryCommandHandler(GridKernalContext ctx) { super(ctx); + + final long idleQryCurTimeout = ctx.config().getConnectorConfiguration().getIdleQueryCursorTimeout(); + + long idleQryCurCheckFreq = ctx.config().getConnectorConfiguration().getIdleQueryCursorCheckFrequency(); + + ctx.timeout().schedule(new Runnable() { + @Override public void run() { + long time = U.currentTimeMillis(); + + for (Map.Entry e : qryCurs.entrySet()) { + QueryCursorIterator qryCurIt = e.getValue(); + + long createTime = qryCurIt.timestamp(); + + if (createTime + idleQryCurTimeout > time && qryCurIt.tryLock()) { + try { + qryCurIt.timestamp(-1); + + qryCurs.remove(e.getKey(), qryCurIt); + + qryCurIt.close(); + } + finally { + qryCurIt.unlock(); + } + } + } + } + }, idleQryCurCheckFreq, idleQryCurCheckFreq); } /** - * @param qryCurs Query cursors. * @param cur Current cursor. * @param req Sql request. * @param qryId Query id. + * @param qryCurs Query cursors. * @return Query result with items. */ private static CacheQueryResult createQueryResult( - ConcurrentHashMap> qryCurs, - Iterator cur, RestQueryRequest req, Long qryId) { + Iterator cur, RestQueryRequest req, Long qryId, ConcurrentHashMap qryCurs) { CacheQueryResult res = new CacheQueryResult(); List items = new ArrayList<>(); @@ -101,12 +130,39 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { res.setQueryId(qryId); if (!cur.hasNext()) - qryCurs.remove(qryId); + removeQueryCursor(qryId, qryCurs); return res; } /** + * Removes query cursor. + * + * @param qryId Query id. + * @param qryCurs Query cursors. + */ + private static void removeQueryCursor(Long qryId, ConcurrentHashMap qryCurs) { + QueryCursorIterator qryCurIt = qryCurs.get(qryId); + + if (qryCurIt == null) + return; + + qryCurIt.lock(); + + try { + if (qryCurIt.timestamp() == -1) + return; + + qryCurIt.close(); + + qryCurs.remove(qryId); + } + finally { + qryCurIt.unlock(); + } + } + + /** * Creates class instance. * * @param cls Target class. @@ -169,7 +225,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { case FETCH_SQL_QUERY: { return ctx.closure().callLocalSafe( - new FetchQueryCallable(ctx, (RestQueryRequest)req, qryCurs), false); + new FetchQueryCallable((RestQueryRequest)req, qryCurs), false); } case CLOSE_SQL_QUERY: { @@ -191,16 +247,16 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** Execute query request. */ private RestQueryRequest req; - /** Queries cursors. */ - private ConcurrentHashMap> qryCurs; + /** Current queries cursors. */ + private final ConcurrentHashMap qryCurs; /** * @param ctx Kernal context. * @param req Execute query request. - * @param qryCurs Queries cursors. + * @param qryCurs Query cursors. */ public ExecuteQueryCallable(GridKernalContext ctx, RestQueryRequest req, - ConcurrentHashMap> qryCurs) { + ConcurrentHashMap qryCurs) { this.ctx = ctx; this.req = req; this.qryCurs = qryCurs; @@ -208,7 +264,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** {@inheritDoc} */ @Override public GridRestResponse call() throws Exception { - long qryId = qryIdGen.getAndIncrement(); + final long qryId = qryIdGen.getAndIncrement(); try { Query qry; @@ -248,38 +304,51 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { return new GridRestResponse(GridRestResponse.STATUS_FAILED, "Failed to find cache with name: " + req.cacheName()); - QueryCursor qryCur = cache.query(qry); + final QueryCursor qryCur = cache.query(qry); Iterator cur = qryCur.iterator(); - qryCurs.put(qryId, new IgniteBiTuple<>(qryCur, cur)); + QueryCursorIterator qryCurIt = new QueryCursorIterator(qryCur, cur); - CacheQueryResult res = createQueryResult(qryCurs, cur, req, qryId); + qryCurIt.lock(); - switch (req.queryType()) { - case SQL: - case SQL_FIELDS: - List fieldsMeta = ((QueryCursorImpl) qryCur).fieldsMeta(); + try { + qryCurs.put(qryId, qryCurIt); - res.setFieldsMetadata(convertMetadata(fieldsMeta)); + CacheQueryResult res = createQueryResult(cur, req, qryId, qryCurs); - break; - case SCAN: - CacheQueryFieldsMetaResult keyField = new CacheQueryFieldsMetaResult(); - keyField.setFieldName("key"); + switch (req.queryType()) { + case SQL: + case SQL_FIELDS: + List fieldsMeta = ((QueryCursorImpl)qryCur).fieldsMeta(); - CacheQueryFieldsMetaResult valField = new CacheQueryFieldsMetaResult(); - valField.setFieldName("value"); + res.setFieldsMetadata(convertMetadata(fieldsMeta)); - res.setFieldsMetadata(U.sealList(keyField, valField)); + break; + case SCAN: + CacheQueryFieldsMetaResult keyField = new CacheQueryFieldsMetaResult(); + keyField.setFieldName("key"); - break; - } + CacheQueryFieldsMetaResult valField = new CacheQueryFieldsMetaResult(); + valField.setFieldName("value"); + + res.setFieldsMetadata(U.sealList(keyField, valField)); - return new GridRestResponse(res); + break; + } + + List fieldsMeta = ((QueryCursorImpl)qryCur).fieldsMeta(); + + res.setFieldsMetadata(convertMetadata(fieldsMeta)); + + return new GridRestResponse(res); + } + finally { + qryCurIt.unlock(); + } } catch (Exception e) { - qryCurs.remove(qryId); + removeQueryCursor(qryId, qryCurs); return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage()); } @@ -305,17 +374,17 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { * Close query callable. */ private static class CloseQueryCallable implements Callable { - /** Queries cursors. */ - private final ConcurrentHashMap> qryCurs; + /** Current queries cursors. */ + private final ConcurrentHashMap qryCurs; + /** Execute query request. */ private RestQueryRequest req; /** * @param req Execute query request. - * @param qryCurs Queries cursors. + * @param qryCurs Query cursors. */ - public CloseQueryCallable(RestQueryRequest req, - ConcurrentHashMap> qryCurs) { + public CloseQueryCallable(RestQueryRequest req, ConcurrentHashMap qryCurs) { this.req = req; this.qryCurs = qryCurs; } @@ -323,20 +392,29 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** {@inheritDoc} */ @Override public GridRestResponse call() throws Exception { try { - QueryCursor cur = qryCurs.get(req.queryId()).get1(); + QueryCursorIterator qryCurIt = qryCurs.get(req.queryId()); - if (cur == null) - return new GridRestResponse(GridRestResponse.STATUS_FAILED, - "Failed to find query with ID: " + req.queryId()); + if (qryCurIt == null) + return new GridRestResponse(true); - cur.close(); + qryCurIt.lock(); - qryCurs.remove(req.queryId()); + try { + if (qryCurIt.timestamp() == -1) + return new GridRestResponse(true); + + qryCurIt.close(); + + qryCurs.remove(req.queryId()); + } + finally { + qryCurIt.unlock(); + } return new GridRestResponse(true); } catch (Exception e) { - qryCurs.remove(req.queryId()); + removeQueryCursor(req.queryId(), qryCurs); return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage()); } @@ -347,21 +425,17 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { * Fetch query callable. */ private static class FetchQueryCallable implements Callable { - /** Queries cursors. */ - private final ConcurrentHashMap> qryCurs; - /** Grid kernal context. */ - private final GridKernalContext ctx; + /** Current queries cursors. */ + private final ConcurrentHashMap qryCurs; + /** Execute query request. */ private RestQueryRequest req; /** - * @param ctx Grid kernal context. * @param req Execute query request. - * @param qryCurs Queries cursors. + * @param qryCurs Query cursors. */ - public FetchQueryCallable(GridKernalContext ctx, RestQueryRequest req, - ConcurrentHashMap> qryCurs) { - this.ctx = ctx; + public FetchQueryCallable(RestQueryRequest req, ConcurrentHashMap qryCurs) { this.req = req; this.qryCurs = qryCurs; } @@ -369,21 +443,91 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** {@inheritDoc} */ @Override public GridRestResponse call() throws Exception { try { - Iterator cur = qryCurs.get(req.queryId()).get2(); + QueryCursorIterator qryCurIt = qryCurs.get(req.queryId()); - if (cur == null) + if (qryCurIt == null) return new GridRestResponse(GridRestResponse.STATUS_FAILED, "Failed to find query with ID: " + req.queryId()); - CacheQueryResult res = createQueryResult(qryCurs, cur, req, req.queryId()); + qryCurIt.lock(); + + try { + if (qryCurIt.timestamp() == -1) + return new GridRestResponse(GridRestResponse.STATUS_FAILED, + "Query is closed by timeout. Restart query with ID: " + req.queryId()); + + qryCurIt.timestamp(U.currentTimeMillis()); - return new GridRestResponse(res); + Iterator cur = qryCurIt.iterator(); + + CacheQueryResult res = createQueryResult(cur, req, req.queryId(), qryCurs); + + return new GridRestResponse(res); + } + finally { + qryCurIt.unlock(); + } } catch (Exception e) { - qryCurs.remove(req.queryId()); + removeQueryCursor(req.queryId(), qryCurs); return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage()); } } } + + /** + * Query cursor iterator. + */ + private static class QueryCursorIterator extends ReentrantLock { + /** */ + private static final long serialVersionUID = 0L; + + /** Query cursor. */ + private QueryCursor cur; + + /** Query iterator. */ + private Iterator it; + + /** Last timestamp. */ + private volatile long ts; + + /** + * @param cur Query cursor. + * @param it Query iterator. + */ + public QueryCursorIterator(QueryCursor cur, Iterator it) { + this.cur = cur; + this.it = it; + ts = U.currentTimeMillis(); + } + + /** + * @return Query iterator. + */ + public Iterator iterator() { + return it; + } + + /** + * @return Timestamp. + */ + public long timestamp() { + return ts; + } + + /** + * @param time Current time or -1 if cursor is closed. + */ + public void timestamp(long time) { + ts = time; + } + + /** + * Close query cursor. + */ + public void close() { + cur.close(); + } + } }