Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 84AB118D5C for ; Thu, 11 Jun 2015 07:20:33 +0000 (UTC) Received: (qmail 16490 invoked by uid 500); 11 Jun 2015 07:20:33 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 16460 invoked by uid 500); 11 Jun 2015 07:20:33 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 16446 invoked by uid 99); 11 Jun 2015 07:20:33 -0000 Received: from Unknown (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Jun 2015 07:20:33 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id EA50C1A4A3A for ; Thu, 11 Jun 2015 07:20:32 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.771 X-Spam-Level: * X-Spam-Status: No, score=1.771 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id mwXPrXc7BoX9 for ; Thu, 11 Jun 2015 07:20:27 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 9C6494BB8C for ; Thu, 11 Jun 2015 07:20:26 +0000 (UTC) Received: (qmail 15072 invoked by uid 99); 11 Jun 2015 07:20:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Jun 2015 07:20:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1188AE05DD; Thu, 11 Jun 2015 07:20:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Thu, 11 Jun 2015 07:20:30 -0000 Message-Id: <824b363161044f7c9d86cade287efaff@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [06/50] incubator-ignite git commit: IGNITE-389 - Merge branch ignite-sprint-5 into ignite-389 IGNITE-389 - Merge branch ignite-sprint-5 into ignite-389 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2aa1ace0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2aa1ace0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2aa1ace0 Branch: refs/heads/ignite-998 Commit: 2aa1ace0cdbf0fbbbcd5893958bddb7869742ce0 Parents: d0157d4 Author: Alexey Goncharuk Authored: Tue Jun 2 19:34:49 2015 -0700 Committer: Alexey Goncharuk Committed: Tue Jun 2 19:34:49 2015 -0700 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheProxy.java | 8 +- .../processors/cache/QueryCursorImpl.java | 23 ++-- .../processors/cache/query/QueryCursorEx.java | 8 ++ .../processors/query/GridQueryIndexing.java | 2 +- .../processors/query/GridQueryProcessor.java | 13 ++- ...niteDynamicCacheWithConfigStartSelfTest.java | 108 +++++++++++++++++++ .../processors/query/h2/IgniteH2Indexing.java | 43 +++++--- .../h2/twostep/GridReduceQueryExecutor.java | 8 +- .../cache/GridCacheCrossCacheQuerySelfTest.java | 12 ++- .../cache/IgniteCacheAbstractQuerySelfTest.java | 23 ++++ modules/spark/pom.xml | 18 ++-- .../org/apache/ignite/spark/IgniteContext.scala | 3 + .../org/apache/ignite/spark/IgniteRDD.scala | 68 ++++++++++-- .../spark/examples/IgniteProcessExample.scala | 2 +- .../org/apache/ignite/spark/IgniteRddSpec.scala | 38 +++---- 15 files changed, 291 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 176543b..b3914e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -497,10 +497,14 @@ public class IgniteCacheProxy extends AsyncSupportAdapter)queryContinuous((ContinuousQuery)qry, qry.isLocal()); if (qry instanceof SqlQuery) { - SqlQuery p = (SqlQuery)qry; + final SqlQuery p = (SqlQuery)qry; if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal()) - return (QueryCursor)new QueryCursorImpl<>(ctx.kernalContext().query().queryLocal(ctx, p)); + return (QueryCursor)new QueryCursorImpl<>(new Iterable>() { + @Override public Iterator> iterator() { + return ctx.kernalContext().query().queryLocal(ctx, p); + } + }); return (QueryCursor)ctx.kernalContext().query().queryTwoStep(ctx, p); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java index 7cb9efc..d68c377 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java @@ -27,6 +27,9 @@ import java.util.*; * Query cursor implementation. */ public class QueryCursorImpl implements QueryCursorEx { + /** Query executor. */ + private Iterable iterExec; + /** */ private Iterator iter; @@ -34,18 +37,18 @@ public class QueryCursorImpl implements QueryCursorEx { private boolean iterTaken; /** */ - private Collection fieldsMeta; + private List fieldsMeta; /** - * @param iter Iterator. + * @param iterExec Query executor. */ - public QueryCursorImpl(Iterator iter) { - this.iter = iter; + public QueryCursorImpl(Iterable iterExec) { + this.iterExec = iterExec; } /** {@inheritDoc} */ @Override public Iterator iterator() { - if (iter == null) + if (iter == null && iterTaken) throw new IgniteException("Cursor is closed."); if (iterTaken) @@ -53,12 +56,16 @@ public class QueryCursorImpl implements QueryCursorEx { iterTaken = true; + iter = iterExec.iterator(); + + assert iter != null; + return iter; } /** {@inheritDoc} */ @Override public List getAll() { - ArrayList all = new ArrayList<>(); + List all = new ArrayList<>(); try { for (T t : this) // Implicitly calls iterator() to do all checks. @@ -103,14 +110,14 @@ public class QueryCursorImpl implements QueryCursorEx { /** * @param fieldsMeta SQL Fields query result metadata. */ - public void fieldsMeta(Collection fieldsMeta) { + public void fieldsMeta(List fieldsMeta) { this.fieldsMeta = fieldsMeta; } /** * @return SQL Fields query result metadata. */ - public Collection fieldsMeta() { + @Override public List fieldsMeta() { return fieldsMeta; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java index bf1d4ea..5e19b99 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java @@ -19,6 +19,9 @@ package org.apache.ignite.internal.processors.cache.query; import org.apache.ignite.*; import org.apache.ignite.cache.query.*; +import org.apache.ignite.internal.processors.query.*; + +import java.util.*; /** * Extended query cursor interface allowing for "getAll" to output data into destination other than Collection. @@ -32,6 +35,11 @@ public interface QueryCursorEx extends QueryCursor { public void getAll(Consumer c) throws IgniteCheckedException; /** + * @return Query metadata. + */ + public List fieldsMeta(); + + /** * Query value consumer. */ public static interface Consumer { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index 0bb820d..cc0916a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -60,7 +60,7 @@ public interface GridQueryIndexing { * @param qry Query. * @return Cursor. */ - public QueryCursor> queryTwoStep(GridCacheContext cctx, GridCacheTwoStepQuery qry); + public Iterable> queryTwoStep(GridCacheContext cctx, GridCacheTwoStepQuery qry); /** * Parses SQL query into two step query and executes it. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index cd4d543..31337ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -532,7 +532,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param qry Query. * @return Cursor. */ - public QueryCursor> queryTwoStep(String space, GridCacheTwoStepQuery qry) { + public Iterable> queryTwoStep(String space, GridCacheTwoStepQuery qry) { checkxEnabled(); if (!busyLock.enterBusy()) @@ -670,7 +670,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param qry Query. * @return Iterator. */ - public QueryCursor> queryLocalFields(GridCacheContext cctx, SqlFieldsQuery qry) { + public QueryCursor> queryLocalFields(final GridCacheContext cctx, SqlFieldsQuery qry) { if (!busyLock.enterBusy()) throw new IllegalStateException("Failed to execute query (grid is stopping)."); @@ -679,7 +679,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { String sql = qry.getSql(); Object[] args = qry.getArgs(); - GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter()); + final GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter()); if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { ctx.event().record(new CacheQueryExecutedEvent<>( @@ -697,8 +697,11 @@ public class GridQueryProcessor extends GridProcessorAdapter { null)); } - QueryCursorImpl> cursor = new QueryCursorImpl<>( - new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepPortable())); + QueryCursorImpl> cursor = new QueryCursorImpl<>(new Iterable>() { + @Override public Iterator> iterator() { + return new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepPortable()); + } + }); cursor.fieldsMeta(res.metaData()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java new file mode 100644 index 0000000..704cf26 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * + */ +public class IgniteDynamicCacheWithConfigStartSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String CACHE_NAME = "partitioned"; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + if (!client) + cfg.setCacheConfiguration(cacheConfiguration(gridName)); + + cfg.setClientMode(client); + + return cfg; + } + + /** + * @param cacheName Cache name. + * @return Cache configuration. + */ + protected CacheConfiguration cacheConfiguration(String cacheName) { + CacheConfiguration ccfg = new CacheConfiguration<>(CACHE_NAME); + + ccfg.setIndexedTypes(String.class, String.class); + + return ccfg; + } + + /** + * @throws Exception If failed. + */ + public void testStartCacheOnClient() throws Exception { + int srvCnt = 3; + + startGrids(srvCnt); + + try { + client = true; + + int clientCnt = 12; + + IgniteEx[] clients = new IgniteEx[clientCnt]; + + for (int i = 0; i < clients.length; i++) + clients[i] = startGrid(i + srvCnt); + + final AtomicInteger idx = new AtomicInteger(); + + GridTestUtils.runMultiThreaded(new Callable() { + @Override public Object call() throws Exception { + final int idx0 = idx.getAndIncrement(); + + ignite(idx0).cache(CACHE_NAME).get(1); + + return null; + } + }, clients.length, "runner"); + } + finally { + stopAllGrids(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 200da77..6ec329f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -592,7 +592,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @throws SQLException If failed. */ private static List meta(ResultSetMetaData rsMeta) throws SQLException { - ArrayList meta = new ArrayList<>(rsMeta.getColumnCount()); + List meta = new ArrayList<>(rsMeta.getColumnCount()); for (int i = 1; i <= rsMeta.getColumnCount(); i++) { String schemaName = rsMeta.getSchemaName(i); @@ -771,8 +771,12 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ - @Override public QueryCursor> queryTwoStep(GridCacheContext cctx, GridCacheTwoStepQuery qry) { - return rdcQryExec.query(cctx, qry); + @Override public Iterable> queryTwoStep(final GridCacheContext cctx, final GridCacheTwoStepQuery qry) { + return new Iterable>() { + @Override public Iterator> iterator() { + return rdcQryExec.query(cctx, qry); + } + }; } /** {@inheritDoc} */ @@ -802,25 +806,30 @@ public class IgniteH2Indexing implements GridQueryIndexing { final QueryCursor> res = queryTwoStep(cctx, fqry); - final Iterator> iter0 = res.iterator(); + final Iterable> converted = new Iterable>() { + @Override public Iterator> iterator() { + final Iterator> iter0 = res.iterator(); - Iterator> iter = new Iterator>() { - @Override public boolean hasNext() { - return iter0.hasNext(); - } + return new Iterator>() { + @Override public boolean hasNext() { + return iter0.hasNext(); + } - @Override public Cache.Entry next() { - List l = iter0.next(); + @Override public Cache.Entry next() { + List l = iter0.next(); - return new CacheEntryImpl<>((K)l.get(0),(V)l.get(1)); - } + return new CacheEntryImpl<>((K)l.get(0),(V)l.get(1)); + } - @Override public void remove() { - throw new UnsupportedOperationException(); + @Override public void remove() { + throw new UnsupportedOperationException(); + } + }; } }; - return new QueryCursorImpl>(iter) { + // No metadata for SQL queries. + return new QueryCursorImpl>(converted) { @Override public void close() { res.close(); } @@ -844,7 +853,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { } GridCacheTwoStepQuery twoStepQry; - Collection meta; + List meta; try { twoStepQry = GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, qry.getArgs(), qry.isCollocated()); @@ -863,7 +872,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { twoStepQry.pageSize(qry.getPageSize()); - QueryCursorImpl> cursor = (QueryCursorImpl>)queryTwoStep(cctx, twoStepQry); + QueryCursorImpl> cursor = new QueryCursorImpl<>(queryTwoStep(cctx, twoStepQry)); cursor.fieldsMeta(meta); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 50c30a5..cfacfcf 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -269,7 +269,7 @@ public class GridReduceQueryExecutor { * @param qry Query. * @return Cursor. */ - public QueryCursor> query(GridCacheContext cctx, GridCacheTwoStepQuery qry) { + public Iterator> query(GridCacheContext cctx, GridCacheTwoStepQuery qry) { long qryReqId = reqIdGen.incrementAndGet(); QueryRun r = new QueryRun(); @@ -356,7 +356,7 @@ public class GridReduceQueryExecutor { // dropTable(r.conn, tbl.getName()); TODO } - return new QueryCursorImpl<>(new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable())); + return new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable()); } catch (IgniteCheckedException | InterruptedException | RuntimeException e) { U.closeQuiet(r.conn); @@ -381,7 +381,7 @@ public class GridReduceQueryExecutor { * @return Cursor for plans. * @throws IgniteCheckedException if failed. */ - private QueryCursor> explainPlan(JdbcConnection c, String space, GridCacheTwoStepQuery qry) + private Iterator> explainPlan(JdbcConnection c, String space, GridCacheTwoStepQuery qry) throws IgniteCheckedException { List> lists = new ArrayList<>(); @@ -403,7 +403,7 @@ public class GridReduceQueryExecutor { lists.add(F.asList(getPlan(rs))); - return new QueryCursorImpl<>(lists.iterator()); + return lists.iterator(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java index 4e9bf31..dd7c879 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java @@ -127,9 +127,17 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { q.addMapQuery("_cnts_", "select count(*) x from \"partitioned\".FactPurchase where ? = ?", 2, 2); - Object cnt = qryProc.queryTwoStep(cache, q).getAll().iterator().next().get(0); + Iterator> it = qryProc.queryTwoStep(cache, q).iterator(); - assertEquals(10L, cnt); + try { + Object cnt = it.next().get(0); + + assertEquals(10L, cnt); + } + finally { + if (it instanceof AutoCloseable) + ((AutoCloseable)it).close(); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java index fa62361..0d45711 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java @@ -27,6 +27,7 @@ import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.distributed.replicated.*; import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -987,6 +988,28 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac /** * @throws Exception If failed. */ + public void testFieldsQueryMetadata() throws Exception { + IgniteCache cache = ignite.cache(null); + + for (int i = 0; i < 100; i++) + cache.put(UUID.randomUUID(), new Person("name-" + i, (i + 1) * 100)); + + QueryCursor> cur = cache.query(new SqlFieldsQuery("select name, salary from Person where name like ?") + .setArgs("name-")); + + assertTrue(cur instanceof QueryCursorEx); + + QueryCursorEx> curEx = (QueryCursorEx>)cur; + + List meta = curEx.fieldsMeta(); + + assertNotNull(meta); + assertEquals(2, meta.size()); + } + + /** + * @throws Exception If failed. + */ private void checkSqlQueryEvents() throws Exception { final CountDownLatch execLatch = new CountDownLatch(cacheMode() == REPLICATED ? 1 : gridCount()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/spark/pom.xml ---------------------------------------------------------------------- diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml index 84055d6..a4a25f5 100644 --- a/modules/spark/pom.xml +++ b/modules/spark/pom.xml @@ -31,7 +31,7 @@ ignite-spark - 1.0.7-SNAPSHOT + 1.1.1-SNAPSHOT @@ -58,16 +58,12 @@ org.apache.spark spark-core_2.10 1.3.1 - - - com.twitter - chill_2.11 - - - com.twitter - chill-java - - + + + + org.apache.spark + spark-sql_2.10 + 1.3.1 http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala index 6259665..5cdbad0 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala @@ -22,6 +22,7 @@ import org.apache.ignite.internal.IgnitionEx import org.apache.ignite.{Ignition, Ignite} import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration} import org.apache.spark.SparkContext +import org.apache.spark.sql.SQLContext /** * Ignite context. @@ -42,6 +43,8 @@ class IgniteContext[K, V]( this(sc, () ⇒ IgnitionEx.loadConfiguration(springUrl).get1()) } + val sqlContext = new SQLContext(sparkContext) + def fromCache(cacheName: String): IgniteRDD[K, V] = { new IgniteRDD[K, V](this, cacheName, null) } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala index f286b58..0b8e845 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala @@ -18,14 +18,18 @@ package org.apache.ignite.spark import javax.cache.Cache -import org.apache.ignite.cache.query.{SqlFieldsQuery, SqlQuery, ScanQuery} +import org.apache.ignite.cache.query._ import org.apache.ignite.cluster.ClusterNode import org.apache.ignite.configuration.CacheConfiguration +import org.apache.ignite.internal.processors.cache.query.QueryCursorEx +import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata import org.apache.ignite.lang.IgniteUuid -import org.apache.ignite.spark.impl.{IgniteAbstractRDD, IgniteSqlRDD, IgnitePartition, IgniteQueryIterator} +import org.apache.ignite.spark.impl._ import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode import org.apache.spark.rdd.RDD -import org.apache.spark.{TaskContext, Partition} +import org.apache.spark.sql.types._ +import org.apache.spark.sql._ +import org.apache.spark._ import scala.collection.JavaConversions._ @@ -98,12 +102,16 @@ class IgniteRDD[K, V] ( new IgniteSqlRDD[(K, V), Cache.Entry[K, V], K, V](ic, cacheName, cacheCfg, qry, entry ⇒ (entry.getKey, entry.getValue)) } - def sql(sql: String, args: Any*): RDD[Seq[Any]] = { + def sql(sql: String, args: Any*): DataFrame = { val qry = new SqlFieldsQuery(sql) qry.setArgs(args.map(_.asInstanceOf[Object]):_*) - new IgniteSqlRDD[Seq[Any], java.util.List[_], K, V](ic, cacheName, cacheCfg, qry, list ⇒ list) + val schema = buildSchema(ensureCache().query(qry).asInstanceOf[QueryCursorEx[java.util.List[_]]].fieldsMeta()) + + val rowRdd = new IgniteSqlRDD[Row, java.util.List[_], K, V](ic, cacheName, cacheCfg, qry, list ⇒ Row.fromSeq(list)) + + ic.sqlContext.createDataFrame(rowRdd, schema) } def saveValues(rdd: RDD[V]) = { @@ -138,10 +146,6 @@ class IgniteRDD[K, V] ( // Make sure to deploy the cache ensureCache() - val locNode = ig.cluster().localNode() - - val node: Option[ClusterNode] = ig.cluster().forHost(locNode).nodes().find(!_.eq(locNode)) - val streamer = ig.dataStreamer[K, V](cacheName) try { @@ -159,7 +163,49 @@ class IgniteRDD[K, V] ( ensureCache().removeAll() } - private def affinityKeyFunc(value: V, node: ClusterNode): Object = { - IgniteUuid.randomUuid() + /** + * Builds spark schema from query metadata. + * + * @param fieldsMeta Fields metadata. + * @return Spark schema. + */ + private def buildSchema(fieldsMeta: java.util.List[GridQueryFieldMetadata]): StructType = { + new StructType(fieldsMeta.map(i ⇒ new StructField(i.fieldName(), dataType(i.fieldTypeName()), nullable = true)) + .toArray) + } + + /** + * Gets Spark data type based on type name. + * + * @param typeName Type name. + * @return Spark data type. + */ + private def dataType(typeName: String): DataType = typeName match { + case "java.lang.Boolean" ⇒ BooleanType + case "java.lang.Byte" ⇒ ByteType + case "java.lang.Short" ⇒ ShortType + case "java.lang.Integer" ⇒ IntegerType + case "java.lang.Long" ⇒ LongType + case "java.lang.Float" ⇒ FloatType + case "java.lang.Double" ⇒ DoubleType + case "java.lang.String" ⇒ StringType + case "java.util.Date" ⇒ DateType + case "java.sql.Timestamp" ⇒ TimestampType + case "[B" ⇒ BinaryType + + case _ ⇒ StructType(new Array[StructField](0)) // TODO Do we need to fill user types? + } + + /** + * Generates affinity key for given cluster node. + * + * @param value Value to generate key for. + * @param node Node to generate key for. + * @return Affinity key. + */ + private def affinityKeyFunc(value: V, node: ClusterNode): IgniteUuid = { + val aff = ic.ignite().affinity[IgniteUuid](cacheName) + + Stream.continually(IgniteUuid.randomUuid()).find(node == null || aff.mapKeyToNode(_).eq(node)).get } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala index db8b5a3..ab91c62 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala @@ -47,6 +47,6 @@ object IgniteProcessExample { ignite.fromCache("indexed").objectSql("Person", "age > ? and organizationId = ?", 20, 12).collect() // SQL fields query - val sqlRes: RDD[Seq[Any]] = ignite.fromCache("indexed").sql("select name, age from Person where age > ?", 20) + val df = ignite.fromCache("indexed").sql("select name, age from Person where age > ?", 20) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala index 68273da..26ce693 100644 --- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala +++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala @@ -117,39 +117,29 @@ class IgniteRddSpec extends FunSpec with Matchers with BeforeAndAfterAll with Be val cache: IgniteRDD[String, Entity] = ic.fromCache(PARTITIONED_CACHE_NAME) + import ic.sqlContext.implicits._ + cache.savePairs(sc.parallelize(0 to 1000, 2).map(i ⇒ (String.valueOf(i), new Entity(i, "name" + i, i * 100)))) - val res = cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000).collect() + val df = cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000) + + df.printSchema() + + val res = df.collect() assert(res.length == 1, "Invalid result length") - assert(50 == res(0).head, "Invalid result") + assert(50 == res(0)(0), "Invalid result") assert("name50" == res(0)(1), "Invalid result") assert(5000 == res(0)(2), "Invalid result") - assert(500 == cache.sql("select id from Entity where id > 500").count(), "Invalid count") - } - finally { - sc.stop() - } - } - - it("should successfully store values RDD") { - val sc = new SparkContext("local[*]", "test") - - try { - val ic = new IgniteContext[String, Entity](sc, - () ⇒ configuration("client", client = true)) + val df0 = cache.sql("select id, name, salary from Entity").where('NAME === "name50" and 'SALARY === 5000) - val cache: IgniteRDD[String, Entity] = ic.fromCache(PARTITIONED_CACHE_NAME) + val res0 = df0.collect() - cache.savePairs(sc.parallelize(0 to 1000, 2).map(i ⇒ (String.valueOf(i), new Entity(i, "name" + i, i * 100)))) - - val res = cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000).collect() - - assert(res.length == 1, "Invalid result length") - assert(50 == res(0).head, "Invalid result") - assert("name50" == res(0)(1), "Invalid result") - assert(5000 == res(0)(2), "Invalid result") + assert(res0.length == 1, "Invalid result length") + assert(50 == res0(0)(0), "Invalid result") + assert("name50" == res0(0)(1), "Invalid result") + assert(5000 == res0(0)(2), "Invalid result") assert(500 == cache.sql("select id from Entity where id > 500").count(), "Invalid count") }