ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [19/50] incubator-ignite git commit: IGNITE-389 - Merge branch ignite-sprint-5 into ignite-389
Date Thu, 11 Jun 2015 07:18:34 GMT
IGNITE-389 - Merge branch ignite-sprint-5 into ignite-389


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2aa1ace0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2aa1ace0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2aa1ace0

Branch: refs/heads/ignite-484-1
Commit: 2aa1ace0cdbf0fbbbcd5893958bddb7869742ce0
Parents: d0157d4
Author: Alexey Goncharuk <agoncharuk@gridgain.com>
Authored: Tue Jun 2 19:34:49 2015 -0700
Committer: Alexey Goncharuk <agoncharuk@gridgain.com>
Committed: Tue Jun 2 19:34:49 2015 -0700

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheProxy.java      |   8 +-
 .../processors/cache/QueryCursorImpl.java       |  23 ++--
 .../processors/cache/query/QueryCursorEx.java   |   8 ++
 .../processors/query/GridQueryIndexing.java     |   2 +-
 .../processors/query/GridQueryProcessor.java    |  13 ++-
 ...niteDynamicCacheWithConfigStartSelfTest.java | 108 +++++++++++++++++++
 .../processors/query/h2/IgniteH2Indexing.java   |  43 +++++---
 .../h2/twostep/GridReduceQueryExecutor.java     |   8 +-
 .../cache/GridCacheCrossCacheQuerySelfTest.java |  12 ++-
 .../cache/IgniteCacheAbstractQuerySelfTest.java |  23 ++++
 modules/spark/pom.xml                           |  18 ++--
 .../org/apache/ignite/spark/IgniteContext.scala |   3 +
 .../org/apache/ignite/spark/IgniteRDD.scala     |  68 ++++++++++--
 .../spark/examples/IgniteProcessExample.scala   |   2 +-
 .../org/apache/ignite/spark/IgniteRddSpec.scala |  38 +++----
 15 files changed, 291 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 176543b..b3914e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -497,10 +497,14 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
                 return (QueryCursor<R>)queryContinuous((ContinuousQuery<K, V>)qry, qry.isLocal());
 
             if (qry instanceof SqlQuery) {
-                SqlQuery p = (SqlQuery)qry;
+                final SqlQuery p = (SqlQuery)qry;
 
                 if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal())
-                    return (QueryCursor<R>)new QueryCursorImpl<>(ctx.kernalContext().query().<K, V>queryLocal(ctx, p));
+                    return (QueryCursor<R>)new QueryCursorImpl<>(new Iterable<Cache.Entry<K, V>>() {
+                        @Override public Iterator<Cache.Entry<K, V>> iterator() {
+                            return ctx.kernalContext().query().<K, V>queryLocal(ctx, p);
+                        }
+                    });
 
                 return (QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p);
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
index 7cb9efc..d68c377 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
@@ -27,6 +27,9 @@ import java.util.*;
  * Query cursor implementation.
  */
 public class QueryCursorImpl<T> implements QueryCursorEx<T> {
+    /** Query executor. */
+    private Iterable<T> iterExec;
+
     /** */
     private Iterator<T> iter;
 
@@ -34,18 +37,18 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
     private boolean iterTaken;
 
     /** */
-    private Collection<GridQueryFieldMetadata> fieldsMeta;
+    private List<GridQueryFieldMetadata> fieldsMeta;
 
     /**
-     * @param iter Iterator.
+     * @param iterExec Query executor.
      */
-    public QueryCursorImpl(Iterator<T> iter) {
-        this.iter = iter;
+    public QueryCursorImpl(Iterable<T> iterExec) {
+        this.iterExec = iterExec;
     }
 
     /** {@inheritDoc} */
     @Override public Iterator<T> iterator() {
-        if (iter == null)
+        if (iter == null && iterTaken)
             throw new IgniteException("Cursor is closed.");
 
         if (iterTaken)
@@ -53,12 +56,16 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
 
         iterTaken = true;
 
+        iter = iterExec.iterator();
+
+        assert iter != null;
+
         return iter;
     }
 
     /** {@inheritDoc} */
     @Override public List<T> getAll() {
-        ArrayList<T> all = new ArrayList<>();
+        List<T> all = new ArrayList<>();
 
         try {
             for (T t : this) // Implicitly calls iterator() to do all checks.
@@ -103,14 +110,14 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
     /**
      * @param fieldsMeta SQL Fields query result metadata.
      */
-    public void fieldsMeta(Collection<GridQueryFieldMetadata> fieldsMeta) {
+    public void fieldsMeta(List<GridQueryFieldMetadata> fieldsMeta) {
         this.fieldsMeta = fieldsMeta;
     }
 
     /**
      * @return SQL Fields query result metadata.
      */
-    public Collection<GridQueryFieldMetadata> fieldsMeta() {
+    @Override public List<GridQueryFieldMetadata> fieldsMeta() {
         return fieldsMeta;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java
index bf1d4ea..5e19b99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java
@@ -19,6 +19,9 @@ package org.apache.ignite.internal.processors.cache.query;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.query.*;
+import org.apache.ignite.internal.processors.query.*;
+
+import java.util.*;
 
 /**
  * Extended query cursor interface allowing for "getAll" to output data into destination other than Collection.
@@ -32,6 +35,11 @@ public interface QueryCursorEx<T> extends QueryCursor<T> {
     public void getAll(Consumer<T> c) throws IgniteCheckedException;
 
     /**
+     * @return Query metadata.
+     */
+    public List<GridQueryFieldMetadata> fieldsMeta();
+
+    /**
      * Query value consumer.
      */
     public static interface Consumer<T> {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 0bb820d..cc0916a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -60,7 +60,7 @@ public interface GridQueryIndexing {
      * @param qry Query.
      * @return Cursor.
      */
-    public QueryCursor<List<?>> queryTwoStep(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry);
+    public Iterable<List<?>> queryTwoStep(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry);
 
     /**
      * Parses SQL query into two step query and executes it.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index cd4d543..31337ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -532,7 +532,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param qry Query.
      * @return Cursor.
      */
-    public QueryCursor<List<?>> queryTwoStep(String space, GridCacheTwoStepQuery qry) {
+    public Iterable<List<?>> queryTwoStep(String space, GridCacheTwoStepQuery qry) {
         checkxEnabled();
 
         if (!busyLock.enterBusy())
@@ -670,7 +670,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param qry Query.
      * @return Iterator.
      */
-    public QueryCursor<List<?>> queryLocalFields(GridCacheContext<?,?> cctx, SqlFieldsQuery qry) {
+    public QueryCursor<List<?>> queryLocalFields(final GridCacheContext<?,?> cctx, SqlFieldsQuery qry) {
         if (!busyLock.enterBusy())
             throw new IllegalStateException("Failed to execute query (grid is stopping).");
 
@@ -679,7 +679,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             String sql = qry.getSql();
             Object[] args = qry.getArgs();
 
-            GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter());
+            final GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter());
 
             if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
                 ctx.event().record(new CacheQueryExecutedEvent<>(
@@ -697,8 +697,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                         null));
             }
 
-            QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(
-                new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepPortable()));
+            QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() {
+                @Override public Iterator<List<?>> iterator() {
+                    return new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepPortable());
+                }
+            });
 
             cursor.fieldsMeta(res.metaData());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
new file mode 100644
index 0000000..704cf26
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheWithConfigStartSelfTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ *
+ */
+public class IgniteDynamicCacheWithConfigStartSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final String CACHE_NAME = "partitioned";
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        if (!client)
+            cfg.setCacheConfiguration(cacheConfiguration(gridName));
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return Cache configuration.
+     */
+    protected CacheConfiguration cacheConfiguration(String cacheName) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(CACHE_NAME);
+
+        ccfg.setIndexedTypes(String.class, String.class);
+
+        return ccfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartCacheOnClient() throws Exception {
+        int srvCnt = 3;
+
+        startGrids(srvCnt);
+
+        try {
+            client = true;
+
+            int clientCnt = 12;
+
+            IgniteEx[] clients = new IgniteEx[clientCnt];
+
+            for (int i = 0; i < clients.length; i++)
+                clients[i] = startGrid(i + srvCnt);
+
+            final AtomicInteger idx = new AtomicInteger();
+
+            GridTestUtils.runMultiThreaded(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    final int idx0 = idx.getAndIncrement();
+
+                    ignite(idx0).cache(CACHE_NAME).get(1);
+
+                    return null;
+                }
+            }, clients.length, "runner");
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 200da77..6ec329f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -592,7 +592,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @throws SQLException If failed.
      */
     private static List<GridQueryFieldMetadata> meta(ResultSetMetaData rsMeta) throws SQLException {
-        ArrayList<GridQueryFieldMetadata> meta = new ArrayList<>(rsMeta.getColumnCount());
+        List<GridQueryFieldMetadata> meta = new ArrayList<>(rsMeta.getColumnCount());
 
         for (int i = 1; i <= rsMeta.getColumnCount(); i++) {
             String schemaName = rsMeta.getSchemaName(i);
@@ -771,8 +771,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @Override public QueryCursor<List<?>> queryTwoStep(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry) {
-        return rdcQryExec.query(cctx, qry);
+    @Override public Iterable<List<?>> queryTwoStep(final GridCacheContext<?,?> cctx, final GridCacheTwoStepQuery qry) {
+        return new Iterable<List<?>>() {
+            @Override public Iterator<List<?>> iterator() {
+                return rdcQryExec.query(cctx, qry);
+            }
+        };
     }
 
     /** {@inheritDoc} */
@@ -802,25 +806,30 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         final QueryCursor<List<?>> res = queryTwoStep(cctx, fqry);
 
-        final Iterator<List<?>> iter0 = res.iterator();
+        final Iterable<Cache.Entry<K, V>> converted = new Iterable<Cache.Entry<K, V>>() {
+            @Override public Iterator<Cache.Entry<K, V>> iterator() {
+                final Iterator<List<?>> iter0 = res.iterator();
 
-        Iterator<Cache.Entry<K,V>> iter = new Iterator<Cache.Entry<K,V>>() {
-            @Override public boolean hasNext() {
-                return iter0.hasNext();
-            }
+                return new Iterator<Cache.Entry<K,V>>() {
+                    @Override public boolean hasNext() {
+                        return iter0.hasNext();
+                    }
 
-            @Override public Cache.Entry<K,V> next() {
-                List<?> l = iter0.next();
+                    @Override public Cache.Entry<K,V> next() {
+                        List<?> l = iter0.next();
 
-                return new CacheEntryImpl<>((K)l.get(0),(V)l.get(1));
-            }
+                        return new CacheEntryImpl<>((K)l.get(0),(V)l.get(1));
+                    }
 
-            @Override public void remove() {
-                throw new UnsupportedOperationException();
+                    @Override public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
             }
         };
 
-        return new QueryCursorImpl<Cache.Entry<K,V>>(iter) {
+        // No metadata for SQL queries.
+        return new QueryCursorImpl<Cache.Entry<K,V>>(converted) {
             @Override public void close() {
                 res.close();
             }
@@ -844,7 +853,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
 
         GridCacheTwoStepQuery twoStepQry;
-        Collection<GridQueryFieldMetadata> meta;
+        List<GridQueryFieldMetadata> meta;
 
         try {
             twoStepQry = GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, qry.getArgs(), qry.isCollocated());
@@ -863,7 +872,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         twoStepQry.pageSize(qry.getPageSize());
 
-        QueryCursorImpl<List<?>> cursor = (QueryCursorImpl<List<?>>)queryTwoStep(cctx, twoStepQry);
+        QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(queryTwoStep(cctx, twoStepQry));
 
         cursor.fieldsMeta(meta);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 50c30a5..cfacfcf 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -269,7 +269,7 @@ public class GridReduceQueryExecutor {
      * @param qry Query.
      * @return Cursor.
      */
-    public QueryCursor<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry) {
+    public Iterator<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry) {
         long qryReqId = reqIdGen.incrementAndGet();
 
         QueryRun r = new QueryRun();
@@ -356,7 +356,7 @@ public class GridReduceQueryExecutor {
 //                dropTable(r.conn, tbl.getName()); TODO
             }
 
-            return new QueryCursorImpl<>(new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable()));
+            return new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable());
         }
         catch (IgniteCheckedException | InterruptedException | RuntimeException e) {
             U.closeQuiet(r.conn);
@@ -381,7 +381,7 @@ public class GridReduceQueryExecutor {
      * @return Cursor for plans.
      * @throws IgniteCheckedException if failed.
      */
-    private QueryCursor<List<?>> explainPlan(JdbcConnection c, String space, GridCacheTwoStepQuery qry)
+    private Iterator<List<?>> explainPlan(JdbcConnection c, String space, GridCacheTwoStepQuery qry)
         throws IgniteCheckedException {
         List<List<?>> lists = new ArrayList<>();
 
@@ -403,7 +403,7 @@ public class GridReduceQueryExecutor {
 
         lists.add(F.asList(getPlan(rs)));
 
-        return new QueryCursorImpl<>(lists.iterator());
+        return lists.iterator();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
index 4e9bf31..dd7c879 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
@@ -127,9 +127,17 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
 
         q.addMapQuery("_cnts_", "select count(*) x from \"partitioned\".FactPurchase where ? = ?", 2, 2);
 
-        Object cnt = qryProc.queryTwoStep(cache, q).getAll().iterator().next().get(0);
+        Iterator<List<?>> it = qryProc.queryTwoStep(cache, q).iterator();
 
-        assertEquals(10L, cnt);
+        try {
+            Object cnt = it.next().get(0);
+
+            assertEquals(10L, cnt);
+        }
+        finally {
+            if (it instanceof AutoCloseable)
+                ((AutoCloseable)it).close();
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
index fa62361..0d45711 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.*;
 import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.query.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -987,6 +988,28 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
     /**
      * @throws Exception If failed.
      */
+    public void testFieldsQueryMetadata() throws Exception {
+        IgniteCache<UUID, Person> cache = ignite.cache(null);
+
+        for (int i = 0; i < 100; i++)
+            cache.put(UUID.randomUUID(), new Person("name-" + i, (i + 1) * 100));
+
+        QueryCursor<List<?>> cur = cache.query(new SqlFieldsQuery("select name, salary from Person where name like ?")
+            .setArgs("name-"));
+
+        assertTrue(cur instanceof QueryCursorEx);
+
+        QueryCursorEx<List<?>> curEx = (QueryCursorEx<List<?>>)cur;
+
+        List<GridQueryFieldMetadata> meta = curEx.fieldsMeta();
+
+        assertNotNull(meta);
+        assertEquals(2, meta.size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     private void checkSqlQueryEvents() throws Exception {
         final CountDownLatch execLatch = new CountDownLatch(cacheMode() == REPLICATED ? 1 : gridCount());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/spark/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml
index 84055d6..a4a25f5 100644
--- a/modules/spark/pom.xml
+++ b/modules/spark/pom.xml
@@ -31,7 +31,7 @@
     </parent>
 
     <artifactId>ignite-spark</artifactId>
-    <version>1.0.7-SNAPSHOT</version>
+    <version>1.1.1-SNAPSHOT</version>
 
     <dependencies>
         <dependency>
@@ -58,16 +58,12 @@
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_2.10</artifactId>
             <version>1.3.1</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.twitter</groupId>
-                    <artifactId>chill_2.11</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.twitter</groupId>
-                    <artifactId>chill-java</artifactId>
-                </exclusion>
-            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.10</artifactId>
+            <version>1.3.1</version>
         </dependency>
 
         <!-- Test dependencies -->

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index 6259665..5cdbad0 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.IgnitionEx
 import org.apache.ignite.{Ignition, Ignite}
 import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
 import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
 
 /**
  * Ignite context.
@@ -42,6 +43,8 @@ class IgniteContext[K, V](
         this(sc, () ⇒ IgnitionEx.loadConfiguration(springUrl).get1())
     }
 
+    val sqlContext = new SQLContext(sparkContext)
+
     def fromCache(cacheName: String): IgniteRDD[K, V] = {
         new IgniteRDD[K, V](this, cacheName, null)
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index f286b58..0b8e845 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -18,14 +18,18 @@ package org.apache.ignite.spark
 
 import javax.cache.Cache
 
-import org.apache.ignite.cache.query.{SqlFieldsQuery, SqlQuery, ScanQuery}
+import org.apache.ignite.cache.query._
 import org.apache.ignite.cluster.ClusterNode
 import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.internal.processors.cache.query.QueryCursorEx
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata
 import org.apache.ignite.lang.IgniteUuid
-import org.apache.ignite.spark.impl.{IgniteAbstractRDD, IgniteSqlRDD, IgnitePartition, IgniteQueryIterator}
+import org.apache.ignite.spark.impl._
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode
 import org.apache.spark.rdd.RDD
-import org.apache.spark.{TaskContext, Partition}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql._
+import org.apache.spark._
 
 import scala.collection.JavaConversions._
 
@@ -98,12 +102,16 @@ class IgniteRDD[K, V] (
         new IgniteSqlRDD[(K, V), Cache.Entry[K, V], K, V](ic, cacheName, cacheCfg, qry, entry ⇒ (entry.getKey, entry.getValue))
     }
 
-    def sql(sql: String, args: Any*): RDD[Seq[Any]] = {
+    def sql(sql: String, args: Any*): DataFrame = {
         val qry = new SqlFieldsQuery(sql)
 
         qry.setArgs(args.map(_.asInstanceOf[Object]):_*)
 
-        new IgniteSqlRDD[Seq[Any], java.util.List[_], K, V](ic, cacheName, cacheCfg, qry, list ⇒ list)
+        val schema = buildSchema(ensureCache().query(qry).asInstanceOf[QueryCursorEx[java.util.List[_]]].fieldsMeta())
+
+        val rowRdd = new IgniteSqlRDD[Row, java.util.List[_], K, V](ic, cacheName, cacheCfg, qry, list ⇒ Row.fromSeq(list))
+
+        ic.sqlContext.createDataFrame(rowRdd, schema)
     }
 
     def saveValues(rdd: RDD[V]) = {
@@ -138,10 +146,6 @@ class IgniteRDD[K, V] (
             // Make sure to deploy the cache
             ensureCache()
 
-            val locNode = ig.cluster().localNode()
-
-            val node: Option[ClusterNode] = ig.cluster().forHost(locNode).nodes().find(!_.eq(locNode))
-
             val streamer = ig.dataStreamer[K, V](cacheName)
 
             try {
@@ -159,7 +163,49 @@ class IgniteRDD[K, V] (
         ensureCache().removeAll()
     }
 
-    private def affinityKeyFunc(value: V, node: ClusterNode): Object = {
-        IgniteUuid.randomUuid()
+    /**
+     * Builds spark schema from query metadata.
+     *
+     * @param fieldsMeta Fields metadata.
+     * @return Spark schema.
+     */
+    private def buildSchema(fieldsMeta: java.util.List[GridQueryFieldMetadata]): StructType = {
+        new StructType(fieldsMeta.map(i ⇒ new StructField(i.fieldName(), dataType(i.fieldTypeName()), nullable = true))
+            .toArray)
+    }
+
+    /**
+     * Gets Spark data type based on type name.
+     *
+     * @param typeName Type name.
+     * @return Spark data type.
+     */
+    private def dataType(typeName: String): DataType = typeName match {
+        case "java.lang.Boolean" ⇒ BooleanType
+        case "java.lang.Byte" ⇒ ByteType
+        case "java.lang.Short" ⇒ ShortType
+        case "java.lang.Integer" ⇒ IntegerType
+        case "java.lang.Long" ⇒ LongType
+        case "java.lang.Float" ⇒ FloatType
+        case "java.lang.Double" ⇒ DoubleType
+        case "java.lang.String" ⇒ StringType
+        case "java.util.Date" ⇒ DateType
+        case "java.sql.Timestamp" ⇒ TimestampType
+        case "[B" ⇒ BinaryType
+
+        case _ ⇒ StructType(new Array[StructField](0)) // TODO Do we need to fill user types?
+    }
+
+    /**
+     * Generates affinity key for given cluster node.
+     *
+     * @param value Value to generate key for.
+     * @param node Node to generate key for.
+     * @return Affinity key.
+     */
+    private def affinityKeyFunc(value: V, node: ClusterNode): IgniteUuid = {
+        val aff = ic.ignite().affinity[IgniteUuid](cacheName)
+
+        Stream.continually(IgniteUuid.randomUuid()).find(node == null || aff.mapKeyToNode(_).eq(node)).get
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
index db8b5a3..ab91c62 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
@@ -47,6 +47,6 @@ object IgniteProcessExample {
         ignite.fromCache("indexed").objectSql("Person", "age > ? and organizationId = ?", 20, 12).collect()
 
         // SQL fields query
-        val sqlRes: RDD[Seq[Any]] = ignite.fromCache("indexed").sql("select name, age from Person where age > ?", 20)
+        val df = ignite.fromCache("indexed").sql("select name, age from Person where age > ?", 20)
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2aa1ace0/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala
index 68273da..26ce693 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala
@@ -117,39 +117,29 @@ class IgniteRddSpec extends FunSpec with Matchers with BeforeAndAfterAll with Be
 
                 val cache: IgniteRDD[String, Entity] = ic.fromCache(PARTITIONED_CACHE_NAME)
 
+                import ic.sqlContext.implicits._
+
                 cache.savePairs(sc.parallelize(0 to 1000, 2).map(i ⇒ (String.valueOf(i), new Entity(i, "name" + i, i * 100))))
 
-                val res = cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000).collect()
+                val df = cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000)
+
+                df.printSchema()
+
+                val res = df.collect()
 
                 assert(res.length == 1, "Invalid result length")
-                assert(50 == res(0).head, "Invalid result")
+                assert(50 == res(0)(0), "Invalid result")
                 assert("name50" == res(0)(1), "Invalid result")
                 assert(5000 == res(0)(2), "Invalid result")
 
-                assert(500 == cache.sql("select id from Entity where id > 500").count(), "Invalid count")
-            }
-            finally {
-                sc.stop()
-            }
-        }
-
-        it("should successfully store values RDD") {
-            val sc = new SparkContext("local[*]", "test")
-
-            try {
-                val ic = new IgniteContext[String, Entity](sc,
-                    () ⇒ configuration("client", client = true))
+                val df0 = cache.sql("select id, name, salary from Entity").where('NAME === "name50" and 'SALARY === 5000)
 
-                val cache: IgniteRDD[String, Entity] = ic.fromCache(PARTITIONED_CACHE_NAME)
+                val res0 = df0.collect()
 
-                cache.savePairs(sc.parallelize(0 to 1000, 2).map(i ⇒ (String.valueOf(i), new Entity(i, "name" + i, i * 100))))
-
-                val res = cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000).collect()
-
-                assert(res.length == 1, "Invalid result length")
-                assert(50 == res(0).head, "Invalid result")
-                assert("name50" == res(0)(1), "Invalid result")
-                assert(5000 == res(0)(2), "Invalid result")
+                assert(res0.length == 1, "Invalid result length")
+                assert(50 == res0(0)(0), "Invalid result")
+                assert("name50" == res0(0)(1), "Invalid result")
+                assert(5000 == res0(0)(2), "Invalid result")
 
                 assert(500 == cache.sql("select id from Entity where id > 500").count(), "Invalid count")
             }


Mime
View raw message