ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [1/9] ignite git commit: IGNITE-2546 - Transformers for SCAN queries. Fixes #949.
Date Wed, 31 Aug 2016 03:40:05 GMT
Repository: ignite
Updated Branches:
  refs/heads/master d98cd3093 -> 8f697876a


IGNITE-2546 - Transformers for SCAN queries. Fixes #949.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/407071e4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/407071e4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/407071e4

Branch: refs/heads/master
Commit: 407071e466d1a4fcec86571d4791ace8bb206f53
Parents: 0465874
Author: Eduard Shangareev <eshangareev@gridgain.com>
Authored: Mon Aug 29 17:32:31 2016 -0700
Committer: Valentin Kulichenko <valentin.lulichenko@gmail.com>
Committed: Mon Aug 29 17:32:31 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteCache.java     |  15 +
 .../processors/cache/IgniteCacheProxy.java      | 112 +++-
 .../processors/cache/query/CacheQuery.java      |  11 +-
 .../query/GridCacheDistributedQueryManager.java |  22 +-
 .../cache/query/GridCacheLocalQueryManager.java |   3 +-
 .../cache/query/GridCacheQueryAdapter.java      |  69 ++-
 .../cache/query/GridCacheQueryBean.java         |   8 +-
 .../cache/query/GridCacheQueryInfo.java         |   8 +-
 .../cache/query/GridCacheQueryManager.java      | 125 ++--
 .../cache/query/GridCacheQueryRequest.java      |   6 +-
 .../GridCacheQueryTransformerSelfTest.java      | 570 +++++++++++++++++++
 .../multijvm/IgniteCacheProcessProxy.java       |   6 +
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 13 files changed, 832 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index 40eedaf..2290fc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -56,6 +56,7 @@ import org.apache.ignite.lang.IgniteAsyncSupport;
 import org.apache.ignite.lang.IgniteAsyncSupported;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.mxbean.CacheMetricsMXBean;
 import org.apache.ignite.transactions.TransactionHeuristicException;
@@ -295,6 +296,20 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
     public <R> QueryCursor<R> query(Query<R> qry);
 
     /**
+     * Queries the cache transforming the entries on the server nodes. Can be used, for example,
+     * to avoid network overhead in case only one field out of the large is required by client.
+     * <p>
+     * Currently transformers are supported ONLY for {@link ScanQuery}. Passing any other
+     * subclass of {@link Query} interface to this method will end up with
+     * {@link UnsupportedOperationException}.
+     *
+     * @param qry Query.
+     * @param transformer Transformer.
+     * @return Cursor.
+     */
+    public <T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> transformer);
+
+    /**
      * Allows for iteration over local cache entries.
      *
      * @param peekModes Peek modes.

http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 0d7bc6a..9b26c1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -82,6 +82,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.mxbean.CacheMetricsMXBean;
 import org.apache.ignite.plugin.security.SecurityPermission;
@@ -466,50 +467,74 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /**
-     * @param filter Filter.
+     * @param scanQry ScanQry.
+     * @param transformer Transformer
      * @param grp Optional cluster group.
      * @return Cursor.
      */
     @SuppressWarnings("unchecked")
-    private QueryCursor<Cache.Entry<K, V>> query(final Query filter, @Nullable ClusterGroup grp)
+    private <T, R> QueryCursor<R> query(
+        final ScanQuery scanQry,
+        @Nullable final IgniteClosure<T, R> transformer,
+        @Nullable ClusterGroup grp)
         throws IgniteCheckedException {
-        final CacheQuery<Map.Entry<K, V>> qry;
+
+        final CacheQuery<R> qry;
 
         boolean isKeepBinary = opCtx != null && opCtx.isKeepBinary();
 
-        if (filter instanceof ScanQuery) {
-            IgniteBiPredicate<K, V> p = ((ScanQuery)filter).getFilter();
+        IgniteBiPredicate<K, V> p = scanQry.getFilter();
 
-            qry = ctx.queries().createScanQuery(p, ((ScanQuery)filter).getPartition(), isKeepBinary);
+        qry = ctx.queries().createScanQuery(p, transformer, scanQry.getPartition(), isKeepBinary);
 
-            if (grp != null)
-                qry.projection(grp);
+        if (grp != null)
+            qry.projection(grp);
 
-            final GridCloseableIterator<Entry<K, V>> iter = ctx.kernalContext().query().executeQuery(ctx,
-                new IgniteOutClosureX<GridCloseableIterator<Entry<K, V>>>() {
-                    @Override public GridCloseableIterator<Entry<K, V>> applyx() throws IgniteCheckedException {
-                        final GridCloseableIterator<Map.Entry> iter0 = qry.executeScanQuery();
+        final GridCloseableIterator<R> iter = ctx.kernalContext().query().executeQuery(ctx,
+            new IgniteOutClosureX<GridCloseableIterator<R>>() {
+                @Override public GridCloseableIterator<R> applyx() throws IgniteCheckedException {
+                    final GridCloseableIterator iter0 = qry.executeScanQuery();
 
-                        return new GridCloseableIteratorAdapter<Cache.Entry<K, V>>() {
-                            @Override protected Cache.Entry<K, V> onNext() throws IgniteCheckedException {
-                                Map.Entry<K, V> next = iter0.nextX();
+                    final boolean needToConvert = transformer == null;
 
-                                return new CacheEntryImpl<>(next.getKey(), next.getValue());
-                            }
+                    return new GridCloseableIteratorAdapter<R>() {
+                        @Override protected R onNext() throws IgniteCheckedException {
+                            Object next = iter0.nextX();
 
-                            @Override protected boolean onHasNext() throws IgniteCheckedException {
-                                return iter0.hasNextX();
-                            }
+                            if (needToConvert) {
+                                Map.Entry<K, V> entry = (Map.Entry<K, V>)next;
 
-                            @Override protected void onClose() throws IgniteCheckedException {
-                                iter0.close();
+                                return (R) new CacheEntryImpl<>(entry.getKey(), entry.getValue());
                             }
-                        };
-                    }
-                }, false);
 
-            return new QueryCursorImpl<>(iter);
-        }
+                            return (R)next;
+                        }
+
+                        @Override protected boolean onHasNext() throws IgniteCheckedException {
+                            return iter0.hasNextX();
+                        }
+
+                        @Override protected void onClose() throws IgniteCheckedException {
+                            iter0.close();
+                        }
+                    };
+                }
+            }, false);
+
+        return new QueryCursorImpl<>(iter);
+    }
+
+    /**
+     * @param filter Filter.
+     * @param grp Optional cluster group.
+     * @return Cursor.
+     */
+    @SuppressWarnings("unchecked")
+    private QueryCursor<Cache.Entry<K, V>> query(final Query filter, @Nullable ClusterGroup grp)
+        throws IgniteCheckedException {
+        final CacheQuery<Map.Entry<K, V>> qry;
+
+        boolean isKeepBinary = opCtx != null && opCtx.isKeepBinary();
 
         final CacheQueryFuture<Map.Entry<K, V>> fut;
 
@@ -692,6 +717,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
                 return (QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p);
             }
 
+            if (qry instanceof ScanQuery)
+                return query((ScanQuery)qry, null, projection(qry.isLocal()));
+
             return (QueryCursor<R>)query(qry, projection(qry.isLocal()));
         }
         catch (Exception e) {
@@ -705,6 +733,36 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public <T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> transformer) {
+        A.notNull(qry, "qry");
+        A.notNull(transformer, "transformer");
+
+        if (!(qry instanceof ScanQuery))
+            throw new UnsupportedOperationException("Transformers are supported only for SCAN queries.");
+
+        GridCacheGateway<K, V> gate = this.gate;
+
+        CacheOperationContext prev = onEnter(gate, opCtx);
+
+        try {
+            ctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+            validate(qry);
+
+            return query((ScanQuery<K, V>)qry, transformer, projection(qry.isLocal()));
+        }
+        catch (Exception e) {
+            if (e instanceof CacheException)
+                throw (CacheException)e;
+
+            throw new CacheException(e);
+        }
+        finally {
+            onLeave(gate, prev);
+        }
+    }
+
     /**
      * @return {@code true} If this is a replicated cache and we are on a data node.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
index 47c6e89..3fa041b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
@@ -273,15 +273,6 @@ public interface CacheQuery<T> {
     public <R> CacheQueryFuture<R> execute(IgniteReducer<T, R> rmtReducer, @Nullable Object... args);
 
     /**
-     * Executes the query the same way as {@link #execute(Object...)} method but transforms result remotely.
-     *
-     * @param rmtTransform Remote transformer.
-     * @param args Optional arguments.
-     * @return Future for the query result.
-     */
-    public <R> CacheQueryFuture<R> execute(IgniteClosure<T, R> rmtTransform, @Nullable Object... args);
-
-    /**
      * Gets metrics for this query.
      *
      * @return Query metrics.
@@ -296,5 +287,5 @@ public interface CacheQuery<T> {
     /**
      * @return Scan query iterator.
      */
-    public <R extends Map.Entry> GridCloseableIterator<R> executeScanQuery() throws IgniteCheckedException;
+    public GridCloseableIterator executeScanQuery() throws IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 5f6cb8f..d34047e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -253,7 +253,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
      */
     @Nullable private GridCacheQueryInfo distributedQueryInfo(UUID sndId, GridCacheQueryRequest req) {
         IgniteReducer<Object, Object> rdc = req.reducer();
-        IgniteClosure<Object, Object> trans = req.transformer();
+        IgniteClosure<Object, Object> trans = (IgniteClosure<Object, Object>)req.transformer();
 
         ClusterNode sndNode = cctx.node(sndId);
 
@@ -578,16 +578,16 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
 
     /** {@inheritDoc} */
     @SuppressWarnings({"unchecked", "serial"})
-    @Override public GridCloseableIterator<Map.Entry<K, V>> scanQueryDistributed(final GridCacheQueryAdapter qry,
+    @Override public GridCloseableIterator scanQueryDistributed(final GridCacheQueryAdapter qry,
         Collection<ClusterNode> nodes) throws IgniteCheckedException {
         assert !cctx.isLocal() : cctx.name();
         assert qry.type() == GridCacheQueryType.SCAN: qry;
 
-        GridCloseableIterator<Map.Entry<K, V>> locIter0 = null;
+        GridCloseableIterator locIter0 = null;
 
         for (ClusterNode node : nodes) {
             if (node.isLocal()) {
-                locIter0 = (GridCloseableIterator)scanQueryLocal(qry, false);
+                locIter0 = scanQueryLocal(qry, false);
 
                 Collection<ClusterNode> rmtNodes = new ArrayList<>(nodes.size() - 1);
 
@@ -603,21 +603,21 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
             }
         }
 
-        final GridCloseableIterator<Map.Entry<K, V>> locIter = locIter0;
+        final GridCloseableIterator locIter = locIter0;
 
-        final GridCacheQueryBean bean = new GridCacheQueryBean(qry, null, null, null);
+        final GridCacheQueryBean bean = new GridCacheQueryBean(qry, null, qry.<K, V>transform(), null);
 
-        final CacheQueryFuture<Map.Entry<K, V>> fut = (CacheQueryFuture<Map.Entry<K, V>>)queryDistributed(bean, nodes);
+        final CacheQueryFuture fut = (CacheQueryFuture)queryDistributed(bean, nodes);
 
-        return new GridCloseableIteratorAdapter<Map.Entry<K, V>>() {
+        return new GridCloseableIteratorAdapter() {
             /** */
-            private Map.Entry<K, V> cur;
+            private Object cur;
 
-            @Override protected Map.Entry<K, V> onNext() throws IgniteCheckedException {
+            @Override protected Object onNext() throws IgniteCheckedException {
                 if (!onHasNext())
                     throw new NoSuchElementException();
 
-                Map.Entry<K, V> e = cur;
+                Object e = cur;
 
                 cur = null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java
index 183abde..147725b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.query;
 
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
@@ -90,7 +89,7 @@ public class GridCacheLocalQueryManager<K, V> extends GridCacheQueryManager<K, V
     }
 
     /** {@inheritDoc} */
-    @Override public GridCloseableIterator<Map.Entry<K, V>> scanQueryDistributed(GridCacheQueryAdapter qry,
+    @Override public GridCloseableIterator scanQueryDistributed(GridCacheQueryAdapter qry,
         Collection<ClusterNode> nodes) throws IgniteCheckedException {
         assert cctx.isLocal() : cctx.name();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 90e14f4..f65b733 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -87,6 +87,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
     /** */
     private final IgniteBiPredicate<Object, Object> filter;
 
+    /** Transformer. */
+    private IgniteClosure<?, ?> transform;
+
     /** Partition. */
     private Integer part;
 
@@ -126,6 +129,39 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
     /**
      * @param cctx Context.
      * @param type Query type.
+     * @param filter Scan filter.
+     * @param part Partition.
+     * @param keepBinary Keep binary flag.
+     */
+    public GridCacheQueryAdapter(GridCacheContext<?, ?> cctx,
+        GridCacheQueryType type,
+        @Nullable IgniteBiPredicate<Object, Object> filter,
+        @Nullable IgniteClosure<Map.Entry, Object> transform,
+        @Nullable Integer part,
+        boolean keepBinary) {
+        assert cctx != null;
+        assert type != null;
+        assert part == null || part >= 0;
+
+        this.cctx = cctx;
+        this.type = type;
+        this.filter = filter;
+        this.transform = transform;
+        this.part = part;
+        this.keepBinary = keepBinary;
+
+        log = cctx.logger(getClass());
+
+        metrics = new GridCacheQueryMetricsAdapter();
+
+        this.incMeta = false;
+        this.clsName = null;
+        this.clause = null;
+    }
+
+    /**
+     * @param cctx Context.
+     * @param type Query type.
      * @param clsName Class name.
      * @param clause Clause.
      * @param filter Scan filter.
@@ -376,6 +412,14 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
     }
 
     /**
+     * @return Transformer.
+     */
+    @SuppressWarnings("unchecked")
+    @Nullable public <K, V> IgniteClosure<Map.Entry<K, V>, Object> transform() {
+        return (IgniteClosure<Map.Entry<K, V>, Object>) transform;
+    }
+
+    /**
      * @return Partition.
      */
     @Nullable public Integer partition() {
@@ -402,17 +446,12 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
 
     /** {@inheritDoc} */
     @Override public CacheQueryFuture<T> execute(@Nullable Object... args) {
-        return execute(null, null, args);
+        return execute0(null, args);
     }
 
     /** {@inheritDoc} */
     @Override public <R> CacheQueryFuture<R> execute(IgniteReducer<T, R> rmtReducer, @Nullable Object... args) {
-        return execute(rmtReducer, null, args);
-    }
-
-    /** {@inheritDoc} */
-    @Override public <R> CacheQueryFuture<R> execute(IgniteClosure<T, R> rmtTransform, @Nullable Object... args) {
-        return execute(null, rmtTransform, args);
+        return execute0(rmtReducer, args);
     }
 
     /** {@inheritDoc} */
@@ -427,13 +466,11 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
 
     /**
      * @param rmtReducer Optional reducer.
-     * @param rmtTransform Optional transformer.
      * @param args Arguments.
      * @return Future.
      */
     @SuppressWarnings({"IfMayBeConditional", "unchecked"})
-    private <R> CacheQueryFuture<R> execute(@Nullable IgniteReducer<T, R> rmtReducer,
-        @Nullable IgniteClosure<T, R> rmtTransform, @Nullable Object... args) {
+    private <R> CacheQueryFuture<R> execute0(@Nullable IgniteReducer<T, R> rmtReducer, @Nullable Object... args) {
         assert type != SCAN : this;
 
         Collection<ClusterNode> nodes;
@@ -455,7 +492,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
 
         if (cctx.deploymentEnabled()) {
             try {
-                cctx.deploy().registerClasses(filter, rmtReducer, rmtTransform);
+                cctx.deploy().registerClasses(filter, rmtReducer);
                 cctx.deploy().registerClasses(args);
             }
             catch (IgniteCheckedException e) {
@@ -469,7 +506,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         taskHash = cctx.kernalContext().job().currentTaskNameHash();
 
         final GridCacheQueryBean bean = new GridCacheQueryBean(this, (IgniteReducer<Object, Object>)rmtReducer,
-            (IgniteClosure<Object, Object>)rmtTransform, args);
+            null, args);
 
         final GridCacheQueryManager qryMgr = cctx.queries();
 
@@ -484,8 +521,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
 
     /** {@inheritDoc} */
     @SuppressWarnings({"IfMayBeConditional", "unchecked"})
-    @Override public <R extends Map.Entry> GridCloseableIterator<R> executeScanQuery() throws IgniteCheckedException {
-        assert type == SCAN: "Wrong processing of qyery: " + type;
+    @Override public GridCloseableIterator executeScanQuery() throws IgniteCheckedException {
+        assert type == SCAN : "Wrong processing of qyery: " + type;
 
         Collection<ClusterNode> nodes = nodes();
 
@@ -508,7 +545,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         final GridCacheQueryManager qryMgr = cctx.queries();
 
         if (part != null && !cctx.isLocal())
-            return (GridCloseableIterator<R>)new ScanQueryFallbackClosableIterator(part, this, qryMgr, cctx);
+            return new ScanQueryFallbackClosableIterator(part, this, qryMgr, cctx);
         else {
             boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId());
 
@@ -676,7 +713,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
                 try {
                     GridCloseableIterator it = qryMgr.scanQueryLocal(qry, true);
 
-                    tuple= new T2(it, null);
+                    tuple = new T2(it, null);
                 }
                 catch (IgniteClientDisconnectedCheckedException e) {
                     throw CU.convertToCacheException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java
index 5a4d693..286ddc2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java
@@ -33,7 +33,7 @@ public class GridCacheQueryBean {
     private final IgniteReducer<Object, Object> rdc;
 
     /** */
-    private final IgniteClosure<Object, Object> trans;
+    private final IgniteClosure<?, ?> trans;
 
     /** */
     private final Object[] args;
@@ -45,7 +45,7 @@ public class GridCacheQueryBean {
      * @param args Optional arguments.
      */
     public GridCacheQueryBean(GridCacheQueryAdapter<?> qry, @Nullable IgniteReducer<Object, Object> rdc,
-        @Nullable IgniteClosure<Object, Object> trans, @Nullable Object[] args) {
+        @Nullable IgniteClosure<?, ?> trans, @Nullable Object[] args) {
         assert qry != null;
 
         this.qry = qry;
@@ -71,7 +71,7 @@ public class GridCacheQueryBean {
     /**
      * @return Transformer.
      */
-    @Nullable public IgniteClosure<Object, Object> transform() {
+    @Nullable public IgniteClosure<?, ?> transform() {
 
 
         return trans;
@@ -88,4 +88,4 @@ public class GridCacheQueryBean {
     @Override public String toString() {
         return S.toString(GridCacheQueryBean.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java
index 8d2e67d..0a108d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java
@@ -31,7 +31,7 @@ class GridCacheQueryInfo {
     private boolean loc;
 
     /** */
-    private IgniteClosure<Object, Object> trans;
+    private IgniteClosure<?, ?> trans;
 
     /** */
     private IgniteReducer<Object, Object> rdc;
@@ -71,7 +71,7 @@ class GridCacheQueryInfo {
      */
     GridCacheQueryInfo(
         boolean loc,
-        IgniteClosure<Object, Object> trans,
+        IgniteClosure<?, ?> trans,
         IgniteReducer<Object, Object> rdc,
         GridCacheQueryAdapter<?> qry,
         GridCacheLocalQueryFuture<?, ?, ?> locFut,
@@ -117,7 +117,7 @@ class GridCacheQueryInfo {
     /**
      * @return Transformer.
      */
-    IgniteClosure<?, Object> transformer() {
+    IgniteClosure<?, ?> transformer() {
         return trans;
     }
 
@@ -167,4 +167,4 @@ class GridCacheQueryInfo {
     @Override public String toString() {
         return S.toString(GridCacheQueryInfo.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 163bac5..454ce04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -506,7 +506,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @return Iterator.
      * @throws IgniteCheckedException If failed.
      */
-    public abstract GridCloseableIterator<Map.Entry<K, V>> scanQueryDistributed(GridCacheQueryAdapter qry,
+    public abstract GridCloseableIterator scanQueryDistributed(GridCacheQueryAdapter qry,
         Collection<ClusterNode> nodes) throws IgniteCheckedException;
 
     /**
@@ -1067,13 +1067,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         final GridDhtLocalPartition locPart0 = locPart;
 
         return new PeekValueExpiryAwareIterator(keyIter, plc, topVer, keyValFilter, qry.keepBinary(), locNode, true) {
-                @Override protected void onClose() {
-                    super.onClose();
+            @Override protected void onClose() {
+                super.onClose();
 
-                    if (locPart0 != null)
-                        locPart0.release();
-                }
-            };
+                if (locPart0 != null)
+                    locPart0.release();
+            }
+        };
     }
 
     /**
@@ -1163,9 +1163,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
                     key = (K)cctx.unwrapBinaryIfNeeded(key, keepBinary);
 
-                    if (filter != null || locNode) {
+                    if (filter != null || locNode)
                         val = (V)cctx.unwrapBinaryIfNeeded(val, keepBinary);
-                    }
 
                     if (filter != null && !filter.apply(key, val))
                         continue;
@@ -1187,14 +1186,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @param locNode Local node.
      * @return Final key-value iterator.
      */
-    private GridIterator<IgniteBiTuple<K,V>> scanExpiryIterator(
+    private GridIterator<IgniteBiTuple<K, V>> scanExpiryIterator(
         final Iterator<Map.Entry<byte[], byte[]>> it,
         AffinityTopologyVersion topVer,
         @Nullable final IgniteBiPredicate<K, V> filter,
         ExpiryPolicy expPlc,
         final boolean keepBinary,
         boolean locNode) {
-        Iterator <K> keyIter = new Iterator<K>() {
+        Iterator<K> keyIter = new Iterator<K>() {
             /** {@inheritDoc} */
             @Override public boolean hasNext() {
                 return it.hasNext();
@@ -1267,10 +1266,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
             try {
                 // Preparing query closures.
-                IgniteClosure<Object, Object> trans = (IgniteClosure<Object, Object>)qryInfo.transformer();
                 IgniteReducer<Object, Object> rdc = (IgniteReducer<Object, Object>)qryInfo.reducer();
 
-                injectResources(trans);
                 injectResources(rdc);
 
                 GridCacheQueryAdapter<?> qry = qryInfo.query();
@@ -1289,7 +1286,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
                 res = qryInfo.local() ?
                     executeFieldsQuery(qry, qryInfo.arguments(), qryInfo.local(), qry.subjectId(), taskName,
-                    recipient(qryInfo.senderId(), qryInfo.requestId())) :
+                        recipient(qryInfo.senderId(), qryInfo.requestId())) :
                     fieldsQueryResult(qryInfo, taskName);
 
                 // If metadata needs to be returned to user and cleaned from internal fields - copy it.
@@ -1460,10 +1457,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
             try {
                 // Preparing query closures.
-                IgniteClosure<Map.Entry<K, V>, Object> trans =
-                    (IgniteClosure<Map.Entry<K, V>, Object>)qryInfo.transformer();
+                IgniteClosure<Cache.Entry<K, V>, Object> trans =
+                    (IgniteClosure<Cache.Entry<K, V>, Object>)qryInfo.transformer();
 
-                IgniteReducer<Map.Entry<K, V>, Object> rdc = (IgniteReducer<Map.Entry<K, V>, Object>)qryInfo.reducer();
+                IgniteReducer<Cache.Entry<K, V>, Object> rdc = (IgniteReducer<Cache.Entry<K, V>, Object>)qryInfo.reducer();
 
                 injectResources(trans);
                 injectResources(rdc);
@@ -1481,13 +1478,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
                 res = loc ?
                     executeQuery(qry, qryInfo.arguments(), loc, qry.subjectId(), taskName,
-                    recipient(qryInfo.senderId(), qryInfo.requestId())) :
+                        recipient(qryInfo.senderId(), qryInfo.requestId())) :
                     queryResult(qryInfo, taskName);
 
                 iter = res.iterator(recipient(qryInfo.senderId(), qryInfo.requestId()));
                 type = res.type();
 
-                GridCacheAdapter<K, V> cache = cctx.cache();
+                final GridCacheAdapter<K, V> cache = cctx.cache();
 
                 if (log.isDebugEnabled())
                     log.debug("Received index iterator [iterHasNext=" + iter.hasNext() +
@@ -1518,7 +1515,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                         break;
                     }
 
-                    K key = row.getKey();
+                    final K key = row.getKey();
 
                     // Filter backups for SCAN queries, if it isn't partition scan.
                     // Other types are filtered in indexing manager.
@@ -1561,8 +1558,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                     }
 
                     if (readEvt) {
-                        K key0  = (K)cctx.unwrapBinaryIfNeeded(key, qry.keepBinary());
-                        V val0  = (V)cctx.unwrapBinaryIfNeeded(val, qry.keepBinary());
+                        K key0 = (K)cctx.unwrapBinaryIfNeeded(key, qry.keepBinary());
+                        V val0 = (V)cctx.unwrapBinaryIfNeeded(val, qry.keepBinary());
 
                         switch (type) {
                             case SQL:
@@ -1630,27 +1627,33 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                         }
                     }
 
-                    Map.Entry<K, V> entry = F.t(key, val);
+                    if (rdc != null || trans != null) {
+                        Cache.Entry<K, V> entry;
 
-                    // Unwrap entry for reducer or transformer only.
-                    if (rdc != null || trans != null)
-                        entry = (Map.Entry<K, V>)cctx.unwrapBinaryIfNeeded(entry, qry.keepBinary());
+                        if (qry.keepBinary())
+                            entry = cache.<K, V>keepBinary().getEntry(key);
+                        else
+                            entry = cache.<K, V>getEntry(key);
 
-                    // Reduce.
-                    if (rdc != null) {
-                        if (!rdc.collect(entry) || !iter.hasNext()) {
-                            onPageReady(loc, qryInfo, Collections.singletonList(rdc.reduce()), true, null);
+                        // Reduce.
+                        if (rdc != null) {
+                            if (!rdc.collect(entry) || !iter.hasNext()) {
+                                onPageReady(loc, qryInfo, Collections.singletonList(rdc.reduce()), true, null);
 
-                            pageSent = true;
+                                pageSent = true;
 
-                            break;
+                                break;
+                            }
+                            else
+                                continue;
                         }
-                        else
-                            continue;
+
+                        data.add(trans != null ? trans.apply(entry) :
+                            !loc ? new GridCacheQueryResponseEntry<>(key, val) : F.t(key, val));
                     }
+                    else
+                        data.add(!loc ? new GridCacheQueryResponseEntry<>(key, val) : F.t(key, val));
 
-                    data.add(trans != null ? trans.apply(entry) :
-                        !loc ? new GridCacheQueryResponseEntry<>(key, val) : F.t(key, val));
 
                     if (!loc) {
                         if (++cnt == pageSize || !iter.hasNext()) {
@@ -1720,7 +1723,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @param updStatisticsIfNeeded Update statistics flag.
      */
     @SuppressWarnings({"unchecked", "serial"})
-    protected GridCloseableIterator<IgniteBiTuple<K, V>> scanQueryLocal(final GridCacheQueryAdapter qry,
+    protected GridCloseableIterator scanQueryLocal(final GridCacheQueryAdapter qry,
         final boolean updStatisticsIfNeeded) throws IgniteCheckedException {
         if (!enterBusy())
             throw new IllegalStateException("Failed to process query request (grid is stopping).");
@@ -1769,8 +1772,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
             final boolean readEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
-            return new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
-                @Override protected IgniteBiTuple<K, V> onNext() throws IgniteCheckedException {
+            return new GridCloseableIteratorAdapter<Object>() {
+                @Override protected Object onNext() throws IgniteCheckedException {
                     long start = statsEnabled ? System.nanoTime() : 0L;
 
                     IgniteBiTuple<K, V> next = iter.nextX();
@@ -1803,7 +1806,20 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                             null));
                     }
 
-                    return next;
+                    IgniteClosure transform = qry.transform();
+
+                    if (transform == null)
+                        return next;
+
+                    Cache.Entry<K, V> entry;
+
+                    if (qry.keepBinary())
+                        entry = cctx.cache().keepBinary().getEntry(next.getKey());
+                     else
+                        entry = cctx.cache().getEntry(next.getKey());
+
+
+                    return transform.apply(entry);
                 }
 
                 @Override protected boolean onHasNext() throws IgniteCheckedException {
@@ -1832,7 +1848,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @return Iterator.
      * @throws IgniteCheckedException In case of error.
      */
-    private QueryResult<K, V> queryResult(final GridCacheQueryInfo qryInfo, String taskName) throws IgniteCheckedException {
+    private QueryResult<K, V> queryResult(final GridCacheQueryInfo qryInfo,
+        String taskName) throws IgniteCheckedException {
         assert qryInfo != null;
 
         final UUID sndId = qryInfo.senderId();
@@ -2845,7 +2862,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
             if (locNode)
                 return new IgniteBiTuple<>(key, val);
-            else{
+            else {
                 if (key instanceof CacheObject)
                     ((CacheObject)key).prepareMarshal(cctx.cacheObjectContext());
 
@@ -3256,16 +3273,28 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @param keepBinary Keep binary flag.
      * @return Created query.
      */
-    public CacheQuery<Map.Entry<K, V>> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter,
+    public <R> CacheQuery<R> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter,
         @Nullable Integer part, boolean keepBinary) {
+        return createScanQuery(filter, null, part, keepBinary);
+    }
 
-        return new GridCacheQueryAdapter<>(cctx,
+    /**
+     * Creates user's predicate based scan query.
+     *
+     * @param filter Scan filter.
+     * @param part Partition.
+     * @param keepBinary Keep binary flag.
+     * @return Created query.
+     */
+    public <T, R> CacheQuery<R> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter,
+        @Nullable IgniteClosure<T, R> trans,
+        @Nullable Integer part, boolean keepBinary) {
+
+        return new GridCacheQueryAdapter(cctx,
             SCAN,
-            null,
-            null,
-            (IgniteBiPredicate<Object, Object>)filter,
+            filter,
+            trans,
             part,
-            false,
             keepBinary);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index f50fba0..5610bef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -84,7 +84,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
 
     /** */
     @GridDirectTransient
-    private IgniteClosure<Object, Object> trans;
+    private IgniteClosure<?, ?> trans;
 
     /** */
     private byte[] transBytes;
@@ -233,7 +233,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
         IgniteBiPredicate<Object, Object> keyValFilter,
         @Nullable Integer part,
         IgniteReducer<Object, Object> rdc,
-        IgniteClosure<Object, Object> trans,
+        IgniteClosure<?, ?> trans,
         int pageSize,
         boolean incBackups,
         Object[] args,
@@ -422,7 +422,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
     /**
      * @return Transformer.
      */
-    public IgniteClosure<Object, Object> transformer() {
+    public IgniteClosure<?, ?> transformer() {
         return trans;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java
new file mode 100644
index 0000000..6b13e05
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import javax.cache.Cache;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cache.query.SpiQuery;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cache.query.TextQuery;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Test for scan query with transformer.
+ */
+public class GridCacheQueryTransformerSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+        cfg.setMarshaller(null);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGridsMultiThreaded(3);
+
+        Ignition.setClientMode(true);
+
+        startGrid();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetKeys() throws Exception {
+        IgniteCache<Integer, String> cache = grid().createCache("test-cache");
+
+        try {
+            for (int i = 0; i < 50; i++)
+                cache.put(i, "val" + i);
+
+            IgniteClosure<Cache.Entry<Integer, String>, Integer> transformer =
+                new IgniteClosure<Cache.Entry<Integer, String>, Integer>() {
+                    @Override public Integer apply(Cache.Entry<Integer, String> e) {
+                        return e.getKey();
+                    }
+                };
+
+            List<Integer> keys = cache.query(new ScanQuery<Integer, String>(), transformer).getAll();
+
+            assertEquals(50, keys.size());
+
+            Collections.sort(keys);
+
+            for (int i = 0; i < 50; i++)
+                assertEquals(i, keys.get(i).intValue());
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetKeysFiltered() throws Exception {
+        IgniteCache<Integer, String> cache = grid().createCache("test-cache");
+
+        try {
+            for (int i = 0; i < 50; i++)
+                cache.put(i, "val" + i);
+
+            IgniteBiPredicate<Integer, String> filter = new IgniteBiPredicate<Integer, String>() {
+                @Override public boolean apply(Integer k, String v) {
+                    return k % 10 == 0;
+                }
+            };
+
+            IgniteClosure<Cache.Entry<Integer, String>, Integer> transformer =
+                new IgniteClosure<Cache.Entry<Integer, String>, Integer>() {
+                    @Override public Integer apply(Cache.Entry<Integer, String> e) {
+                        return e.getKey();
+                    }
+                };
+
+            List<Integer> keys = cache.query(new ScanQuery<>(filter), transformer).getAll();
+
+            assertEquals(5, keys.size());
+
+            Collections.sort(keys);
+
+            for (int i = 0; i < 5; i++)
+                assertEquals(i * 10, keys.get(i).intValue());
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetObjectField() throws Exception {
+        IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+        try {
+            for (int i = 0; i < 50; i++)
+                cache.put(i, new Value("str" + i, i * 100));
+
+            IgniteClosure<Cache.Entry<Integer, Value>, Integer> transformer =
+                new IgniteClosure<Cache.Entry<Integer, Value>, Integer>() {
+                    @Override public Integer apply(Cache.Entry<Integer, Value> e) {
+                        return e.getValue().idx;
+                    }
+                };
+
+            List<Integer> res = cache.query(new ScanQuery<Integer, Value>(), transformer).getAll();
+
+            assertEquals(50, res.size());
+
+            Collections.sort(res);
+
+            for (int i = 0; i < 50; i++)
+                assertEquals(i * 100, res.get(i).intValue());
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetObjectFieldFiltered() throws Exception {
+        IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+        try {
+            for (int i = 0; i < 50; i++)
+                cache.put(i, new Value("str" + i, i * 100));
+
+            IgniteBiPredicate<Integer, Value> filter = new IgniteBiPredicate<Integer, Value>() {
+                @Override public boolean apply(Integer k, Value v) {
+                    return v.idx % 1000 == 0;
+                }
+            };
+
+            IgniteClosure<Cache.Entry<Integer, Value>, Integer> transformer =
+                new IgniteClosure<Cache.Entry<Integer, Value>, Integer>() {
+                    @Override public Integer apply(Cache.Entry<Integer, Value> e) {
+                        return e.getValue().idx;
+                    }
+                };
+
+            List<Integer> res = cache.query(new ScanQuery<>(filter), transformer).getAll();
+
+            assertEquals(5, res.size());
+
+            Collections.sort(res);
+
+            for (int i = 0; i < 5; i++)
+                assertEquals(i * 1000, res.get(i).intValue());
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testKeepBinary() throws Exception {
+        IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+        try {
+            for (int i = 0; i < 50; i++)
+                cache.put(i, new Value("str" + i, i * 100));
+
+            IgniteCache<Integer, BinaryObject> binaryCache = cache.withKeepBinary();
+
+            IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer> transformer =
+                new IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer>() {
+                    @Override public Integer apply(Cache.Entry<Integer, BinaryObject> e) {
+                        return e.getValue().field("idx");
+                    }
+                };
+
+            List<Integer> res = binaryCache.query(new ScanQuery<Integer, BinaryObject>(), transformer).getAll();
+
+            assertEquals(50, res.size());
+
+            Collections.sort(res);
+
+            for (int i = 0; i < 50; i++)
+                assertEquals(i * 100, res.get(i).intValue());
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testKeepBinaryFiltered() throws Exception {
+        IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+        try {
+            for (int i = 0; i < 50; i++)
+                cache.put(i, new Value("str" + i, i * 100));
+
+            IgniteCache<Integer, BinaryObject> binaryCache = cache.withKeepBinary();
+
+            IgniteBiPredicate<Integer, BinaryObject> filter = new IgniteBiPredicate<Integer, BinaryObject>() {
+                @Override public boolean apply(Integer k, BinaryObject v) {
+                    return v.<Integer>field("idx") % 1000 == 0;
+                }
+            };
+
+            IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer> transformer =
+                new IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer>() {
+                    @Override public Integer apply(Cache.Entry<Integer, BinaryObject> e) {
+                        return e.getValue().field("idx");
+                    }
+                };
+
+            List<Integer> res = binaryCache.query(new ScanQuery<>(filter), transformer).getAll();
+
+            assertEquals(5, res.size());
+
+            Collections.sort(res);
+
+            for (int i = 0; i < 5; i++)
+                assertEquals(i * 1000, res.get(i).intValue());
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocal() throws Exception {
+        IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+        try {
+            for (int i = 0; i < 50; i++)
+                cache.put(i, new Value("str" + i, i * 100));
+
+            Collection<List<Integer>> lists = grid().compute().broadcast(new IgniteCallable<List<Integer>>() {
+                @IgniteInstanceResource
+                private Ignite ignite;
+
+                @Override public List<Integer> call() throws Exception {
+                    IgniteClosure<Cache.Entry<Integer, Value>, Integer> transformer =
+                        new IgniteClosure<Cache.Entry<Integer, Value>, Integer>() {
+                            @Override public Integer apply(Cache.Entry<Integer, Value> e) {
+                                return e.getValue().idx;
+                            }
+                        };
+
+                    return ignite.cache("test-cache").query(new ScanQuery<Integer, Value>().setLocal(true),
+                        transformer).getAll();
+                }
+            });
+
+            List<Integer> res = new ArrayList<>(F.flatCollections(lists));
+
+            assertEquals(50, res.size());
+
+            Collections.sort(res);
+
+            for (int i = 0; i < 50; i++)
+                assertEquals(i * 100, res.get(i).intValue());
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocalFiltered() throws Exception {
+        IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+        try {
+            for (int i = 0; i < 50; i++)
+                cache.put(i, new Value("str" + i, i * 100));
+
+            Collection<List<Integer>> lists = grid().compute().broadcast(new IgniteCallable<List<Integer>>() {
+                @IgniteInstanceResource
+                private Ignite ignite;
+
+                @Override public List<Integer> call() throws Exception {
+                    IgniteBiPredicate<Integer, Value> filter = new IgniteBiPredicate<Integer, Value>() {
+                        @Override public boolean apply(Integer k, Value v) {
+                            return v.idx % 1000 == 0;
+                        }
+                    };
+
+                    IgniteClosure<Cache.Entry<Integer, Value>, Integer> transformer =
+                        new IgniteClosure<Cache.Entry<Integer, Value>, Integer>() {
+                            @Override public Integer apply(Cache.Entry<Integer, Value> e) {
+                                return e.getValue().idx;
+                            }
+                        };
+
+                    return ignite.cache("test-cache").query(new ScanQuery<>(filter).setLocal(true),
+                        transformer).getAll();
+                }
+            });
+
+            List<Integer> res = new ArrayList<>(F.flatCollections(lists));
+
+            assertEquals(5, res.size());
+
+            Collections.sort(res);
+
+            for (int i = 0; i < 5; i++)
+                assertEquals(i * 1000, res.get(i).intValue());
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocalKeepBinary() throws Exception {
+        IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+        try {
+            for (int i = 0; i < 50; i++)
+                cache.put(i, new Value("str" + i, i * 100));
+
+            Collection<List<Integer>> lists = grid().compute().broadcast(new IgniteCallable<List<Integer>>() {
+                @IgniteInstanceResource
+                private Ignite ignite;
+
+                @Override public List<Integer> call() throws Exception {
+                    IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer> transformer =
+                        new IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer>() {
+                            @Override public Integer apply(Cache.Entry<Integer, BinaryObject> e) {
+                                return e.getValue().field("idx");
+                            }
+                        };
+
+                    return ignite.cache("test-cache").withKeepBinary().query(
+                        new ScanQuery<Integer, BinaryObject>().setLocal(true), transformer).getAll();
+                }
+            });
+
+            List<Integer> res = new ArrayList<>(F.flatCollections(lists));
+
+            assertEquals(50, res.size());
+
+            Collections.sort(res);
+
+            for (int i = 0; i < 50; i++)
+                assertEquals(i * 100, res.get(i).intValue());
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocalKeepBinaryFiltered() throws Exception {
+        IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+        try {
+            for (int i = 0; i < 50; i++)
+                cache.put(i, new Value("str" + i, i * 100));
+
+            Collection<List<Integer>> lists = grid().compute().broadcast(new IgniteCallable<List<Integer>>() {
+                @IgniteInstanceResource
+                private Ignite ignite;
+
+                @Override public List<Integer> call() throws Exception {
+                    IgniteBiPredicate<Integer, BinaryObject> filter = new IgniteBiPredicate<Integer, BinaryObject>() {
+                        @Override public boolean apply(Integer k, BinaryObject v) {
+                            return v.<Integer>field("idx") % 1000 == 0;
+                        }
+                    };
+
+                    IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer> transformer =
+                        new IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer>() {
+                            @Override public Integer apply(Cache.Entry<Integer, BinaryObject> e) {
+                                return e.getValue().field("idx");
+                            }
+                        };
+
+                    return ignite.cache("test-cache").withKeepBinary().query(new ScanQuery<>(filter).setLocal(true),
+                        transformer).getAll();
+                }
+            });
+
+            List<Integer> res = new ArrayList<>(F.flatCollections(lists));
+
+            assertEquals(5, res.size());
+
+            Collections.sort(res);
+
+            for (int i = 0; i < 5; i++)
+                assertEquals(i * 1000, res.get(i).intValue());
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUnsupported() throws Exception {
+        final IgniteCache<Integer, Integer> cache = grid().createCache("test-cache");
+
+        final IgniteClosure<Cache.Entry<Integer, Integer>, Integer> transformer =
+            new IgniteClosure<Cache.Entry<Integer, Integer>, Integer>() {
+                @Override public Integer apply(Cache.Entry<Integer, Integer> e) {
+                    return null;
+                }
+            };
+
+        try {
+            GridTestUtils.assertThrows(
+                log,
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        cache.query(new SqlQuery<Integer, Integer>(Integer.class, "clause"), transformer);
+
+                        return null;
+                    }
+                },
+                UnsupportedOperationException.class,
+                "Transformers are supported only for SCAN queries."
+            );
+
+            GridTestUtils.assertThrows(
+                log,
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        cache.query(new SqlFieldsQuery("clause"), new IgniteClosure<List<?>, Object>() {
+                            @Override public Object apply(List<?> objects) {
+                                return null;
+                            }
+                        });
+
+                        return null;
+                    }
+                },
+                UnsupportedOperationException.class,
+                "Transformers are supported only for SCAN queries."
+            );
+
+            GridTestUtils.assertThrows(
+                log,
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        cache.query(new TextQuery<Integer, Integer>(Integer.class, "clause"), transformer);
+
+                        return null;
+                    }
+                },
+                UnsupportedOperationException.class,
+                "Transformers are supported only for SCAN queries."
+            );
+
+            GridTestUtils.assertThrows(
+                log,
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        cache.query(new SpiQuery<Integer, Integer>(), transformer);
+
+                        return null;
+                    }
+                },
+                UnsupportedOperationException.class,
+                "Transformers are supported only for SCAN queries."
+            );
+
+            GridTestUtils.assertThrows(
+                log,
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        cache.query(new ContinuousQuery<Integer, Integer>(), transformer);
+
+                        return null;
+                    }
+                },
+                UnsupportedOperationException.class,
+                "Transformers are supported only for SCAN queries."
+            );
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     */
+    private static class Value {
+        /** */
+        @SuppressWarnings("unused")
+        private String str;
+
+        /** */
+        private int idx;
+
+        /**
+         * @param str String.
+         * @param idx Integer.
+         */
+        public Value(String str, int idx) {
+            this.str = str;
+            this.idx = idx;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
index 740b201..71dc964 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
 import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.mxbean.CacheMetricsMXBean;
 import org.apache.ignite.resources.IgniteInstanceResource;
@@ -172,6 +173,11 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> {
     }
 
     /** {@inheritDoc} */
+    @Override public <T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> transformer) {
+        throw new UnsupportedOperationException("Method should be supported.");
+    }
+
+    /** {@inheritDoc} */
     @Override public Iterable<Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException {
         return compute.call(new LocalEntriesTask<K, V>(cacheName, isAsync, peekModes));
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 1b1908d..3652acd 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.distributed.replicated.Ignite
 import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryTransformerSelfTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlSchemaIndexingTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlSplitterSelfTest;
 import org.apache.ignite.internal.processors.query.h2.sql.GridQueryParsingTest;
@@ -115,6 +116,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(IgniteBinaryWrappedObjectFieldsQuerySelfTest.class);
         suite.addTestSuite(IgniteCacheQueryH2IndexingLeakTest.class);
         suite.addTestSuite(IgniteCacheQueryNoRebalanceSelfTest.class);
+        suite.addTestSuite(GridCacheQueryTransformerSelfTest.class);
 
         return suite;
     }


Mime
View raw message