ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [20/20] ignite git commit: Merge branch 'master' into ignite-1186
Date Tue, 01 Mar 2016 11:09:46 GMT
Merge branch 'master' into ignite-1186


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/846e8e5d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/846e8e5d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/846e8e5d

Branch: refs/heads/ignite-1186
Commit: 846e8e5d46fcf206369a6dacd2b56a45ac33b1cd
Parents: 5455a9f 8917269
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Tue Mar 1 13:41:48 2016 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Tue Mar 1 14:09:35 2016 +0300

----------------------------------------------------------------------
 .../internal/binary/BinaryClassDescriptor.java  |    2 +-
 .../processors/cache/GridCacheProcessor.java    |   20 +
 .../processors/cache/GridCacheUtils.java        |   15 +
 .../cache/affinity/GridCacheAffinityImpl.java   |    7 +-
 .../CacheContinuousQueryHandlerV2.java          |   11 +-
 .../continuous/CacheContinuousQueryManager.java |    2 +-
 .../processors/query/GridQueryProcessor.java    |   13 +-
 .../ignite/internal/visor/cache/VisorCache.java |    2 +-
 .../cache/VisorCacheAggregatedMetrics.java      |  113 +-
 .../internal/visor/cache/VisorCacheMetrics.java |   88 +-
 .../cache/VisorCacheMetricsCollectorTask.java   |   21 +-
 .../visor/cache/VisorCacheMetricsV2.java        |   66 +
 .../internal/visor/cache/VisorCacheV2.java      |    2 +-
 .../ignite/resources/JobContextResource.java    |    4 +-
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |   15 +
 .../ignite/spi/IgniteSpiConsistencyChecked.java |    8 +
 .../ignite/spi/discovery/tcp/ClientImpl.java    |    6 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    |    2 +-
 .../TcpDiscoveryMulticastIpFinder.java          |    1 +
 .../spi/swapspace/file/FileSwapSpaceSpi.java    |    2 +
 .../spi/swapspace/noop/NoopSwapSpaceSpi.java    |    2 +
 .../cache/GridCacheAbstractFullApiSelfTest.java |   10 +-
 ...ridCacheSwapSpaceSpiConsistencySelfTest.java |  146 +
 .../IgniteCacheConfigVariationsFullApiTest.java | 5851 ++++++++++++++++++
 ...acheContinuousQueryRandomOperationsTest.java |   51 +
 .../GridSwapSpaceSpiConsistencySelfTest.java    |  131 +
 .../configvariations/CacheStartMode.java        |   29 +
 .../configvariations/ConfigFactory.java         |   39 +
 .../configvariations/ConfigParameter.java       |   34 +
 .../configvariations/ConfigVariations.java      |  346 ++
 .../ConfigVariationsFactory.java                |  197 +
 .../ConfigVariationsTestSuiteBuilder.java       |  382 ++
 .../IgniteConfigVariationsTestSuite.java        |   50 +
 .../configvariations/Parameters.java            |  377 ++
 .../configvariations/VariationsIterator.java    |  174 +
 .../configvariations/VariationsTestsConfig.java |  161 +
 .../testframework/junits/GridAbstractTest.java  |   43 +-
 ...IgniteCacheConfigVariationsAbstractTest.java |  583 ++
 .../IgniteConfigVariationsAbstractTest.java     |  420 ++
 .../ConfigVariationsTestSuiteBuilderTest.java   |  112 +
 .../testframework/test/ParametersTest.java      |   87 +
 .../test/VariationsIteratorTest.java            |  156 +
 .../ignite/testsuites/IgniteBasicTestSuite.java |    8 +
 ...heBasicConfigVariationsFullApiTestSuite.java |   41 +
 .../testsuites/IgniteCacheTestSuite5.java       |    2 +
 .../IgniteSpiSwapSpaceSelfTestSuite.java        |    2 +
 .../processors/query/h2/IgniteH2Indexing.java   |  104 +-
 .../cache/CacheQueryNewClientSelfTest.java      |  108 +
 .../IgniteCacheQuerySelfTestSuite.java          |    6 +-
 .../commands/cache/VisorCacheCommand.scala      |   30 +-
 .../yardstick/IgniteBenchmarkArguments.java     |   11 +
 .../org/apache/ignite/yardstick/IgniteNode.java |    2 +
 .../cache/IgniteCacheAbstractBenchmark.java     |   54 +
 53 files changed, 9994 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/846e8e5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
index 6fc2041,0000000..dbe2a46
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
@@@ -1,217 -1,0 +1,222 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.cache.query.continuous;
 +
 +import java.io.Externalizable;
 +import java.io.IOException;
 +import java.io.ObjectInput;
 +import java.io.ObjectOutput;
 +import java.util.UUID;
 +import javax.cache.configuration.Factory;
 +import javax.cache.event.CacheEntryEventFilter;
 +import javax.cache.event.CacheEntryUpdatedListener;
 +import org.apache.ignite.IgniteCheckedException;
 +import org.apache.ignite.internal.GridKernalContext;
 +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter;
 +import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 +import org.apache.ignite.internal.util.typedef.internal.S;
 +import org.apache.ignite.internal.util.typedef.internal.U;
 +import org.jetbrains.annotations.Nullable;
 +
 +/**
 + * Continuous query handler V2 version. Contains {@link Factory} for remote listener.
 + */
 +public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHandler<K, V> {
 +    /** */
 +    private static final long serialVersionUID = 0L;
 +
 +    /** Remote filter factory. */
 +    private Factory<? extends CacheEntryEventFilter> rmtFilterFactory;
 +
 +    /** Deployable object for filter factory. */
 +    private DeployableObject rmtFilterFactoryDep;
 +
 +    /** Event types for JCache API. */
 +    private byte types = 0;
 +
 +    /** */
 +    protected transient CacheEntryEventFilter filter;
 +
 +    /**
 +     * Required by {@link Externalizable}.
 +     */
 +    public CacheContinuousQueryHandlerV2() {
 +        // No-op.
 +    }
 +
 +    /**
 +     * Constructor.
 +     *
 +     * @param cacheName Cache name.
 +     * @param topic Topic for ordered messages.
 +     * @param locLsnr Local listener.
 +     * @param rmtFilterFactory Remote filter factory.
 +     * @param internal Internal flag.
 +     * @param notifyExisting Notify existing flag.
 +     * @param oldValRequired Old value required flag.
 +     * @param sync Synchronous flag.
 +     * @param ignoreExpired Ignore expired events flag.
 +     * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache.
 +     * @param taskHash Task name hash code.
 +     * @param locCache {@code True} if local cache.
 +     * @param keepBinary Keep binary flag.
 +     * @param types Event types.
 +     */
 +    public CacheContinuousQueryHandlerV2(
 +        String cacheName,
 +        Object topic,
 +        CacheEntryUpdatedListener<K, V> locLsnr,
 +        Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory,
 +        boolean internal,
 +        boolean notifyExisting,
 +        boolean oldValRequired,
 +        boolean sync,
 +        boolean ignoreExpired,
 +        int taskHash,
 +        boolean skipPrimaryCheck,
 +        boolean locCache,
 +        boolean keepBinary,
 +        boolean ignoreClsNotFound,
 +        @Nullable Byte types) {
 +        super(cacheName,
 +            topic,
 +            locLsnr,
 +            null,
 +            internal,
 +            notifyExisting,
 +            oldValRequired,
 +            sync,
 +            ignoreExpired,
 +            taskHash,
 +            skipPrimaryCheck,
 +            locCache,
 +            keepBinary,
 +            ignoreClsNotFound);
 +
 +        assert rmtFilterFactory != null;
 +
 +        this.rmtFilterFactory = rmtFilterFactory;
 +
-         if (types != null)
++        if (types != null) {
++            assert types != 0;
++
 +            this.types = types;
++        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public CacheEntryEventFilter getEventFilter() {
 +        if (filter == null) {
 +            assert rmtFilterFactory != null;
 +
++            Factory<? extends CacheEntryEventFilter> factory = rmtFilterFactory;
++
 +            if (types != 0)
-                 rmtFilterFactory = new JCacheRemoteQueryFactory(rmtFilterFactory, types);
++                factory = new JCacheRemoteQueryFactory(rmtFilterFactory, types);
 +
-             filter = rmtFilterFactory.create();
++            filter = factory.create();
 +        }
 +
 +        return filter;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException {
 +        super.p2pMarshal(ctx);
 +
 +        if (rmtFilterFactory != null && !U.isGrid(rmtFilterFactory.getClass()))
 +            rmtFilterFactoryDep = new DeployableObject(rmtFilterFactory, ctx);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
 +        super.p2pUnmarshal(nodeId, ctx);
 +
 +        if (rmtFilterFactoryDep != null)
 +            rmtFilterFactory = rmtFilterFactoryDep.unmarshal(nodeId, ctx);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public GridContinuousHandler clone() {
 +        return super.clone();
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public String toString() {
 +        return S.toString(CacheContinuousQueryHandlerV2.class, this);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void writeExternal(ObjectOutput out) throws IOException {
 +        super.writeExternal(out);
 +
 +        boolean b = rmtFilterFactoryDep != null;
 +
 +        out.writeBoolean(b);
 +
 +        if (b)
 +            out.writeObject(rmtFilterFactoryDep);
 +        else
 +            out.writeObject(rmtFilterFactory);
 +
 +        out.writeByte(types);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @SuppressWarnings("unchecked")
 +    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
 +        super.readExternal(in);
 +
 +        boolean b = in.readBoolean();
 +
 +        if (b)
 +            rmtFilterFactoryDep = (DeployableObject)in.readObject();
 +        else
 +            rmtFilterFactory = (Factory)in.readObject();
 +
 +        types = in.readByte();
 +    }
 +
 +    /**
 +     *
 +     */
 +    private static class JCacheRemoteQueryFactory implements Factory<CacheEntryEventFilter> {
 +        /** */
 +        private static final long serialVersionUID = 0L;
 +
 +        /** Factory. */
 +        protected Factory<? extends CacheEntryEventFilter> impl;
 +
 +        /** */
 +        private byte types;
 +
 +        /**
 +         * @param impl Factory.
 +         * @param types Types.
 +         */
 +        public JCacheRemoteQueryFactory(@Nullable Factory<? extends CacheEntryEventFilter> impl, byte types) {
 +            this.impl = impl;
 +            this.types = types;
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public JCacheQueryRemoteFilter create() {
 +            return new JCacheQueryRemoteFilter(impl != null ? impl.create() : null, types);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/ignite/blob/846e8e5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 2a05865,409c1da..bfe70f1
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@@ -608,235 -589,6 +608,235 @@@ public class CacheContinuousQueryManage
              keepBinary,
              ignoreClassNotFound);
  
 +        return executeQuery0(locLsnr,
 +            bufSize,
 +            timeInterval,
 +            autoUnsubscribe,
 +            notifyExisting,
 +            loc,
 +            keepBinary,
 +            hnd);
 +    }
 +
 +
 +    /**
 +     * @param locLsnr Local listener.
 +     * @param types JCache event types.
 +     * @param bufSize Buffer size.
 +     * @param timeInterval Time interval.
 +     * @param autoUnsubscribe Auto unsubscribe flag.
 +     * @param internal Internal flag.
 +     * @param notifyExisting Notify existing flag.
 +     * @param oldValRequired Old value required flag.
 +     * @param sync Synchronous flag.
 +     * @param ignoreExpired Ignore expired event flag.
 +     * @param loc Local flag.
 +     * @return Continuous routine ID.
 +     * @throws IgniteCheckedException In case of error.
 +     */
 +    private UUID executeJCacheQueryFactory(CacheEntryUpdatedListener locLsnr,
 +        final Factory<CacheEntryEventFilter> rmtFilterFactory,
 +        byte types,
 +        int bufSize,
 +        long timeInterval,
 +        boolean autoUnsubscribe,
 +        boolean internal,
 +        boolean notifyExisting,
 +        boolean oldValRequired,
 +        boolean sync,
 +        boolean ignoreExpired,
 +        boolean loc,
 +        final boolean keepBinary,
 +        boolean ignoreClassNotFound) throws IgniteCheckedException
 +    {
 +        assert types != 0 : types;
 +
 +        cctx.checkSecurity(SecurityPermission.CACHE_READ);
 +
 +        int taskNameHash = !internal && cctx.kernalContext().security().enabled() ?
 +            cctx.kernalContext().job().currentTaskNameHash() : 0;
 +
 +        boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode();
 +
 +        boolean v2 = rmtFilterFactory != null && useV2Protocol(cctx.discovery().allNodes());
 +
 +        GridContinuousHandler hnd;
 +
 +        if (v2)
 +            hnd = new CacheContinuousQueryHandlerV2(
 +                cctx.name(),
 +                TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
 +                locLsnr,
 +                rmtFilterFactory,
 +                internal,
 +                notifyExisting,
 +                oldValRequired,
 +                sync,
 +                ignoreExpired,
 +                taskNameHash,
 +                skipPrimaryCheck,
 +                cctx.isLocal(),
 +                keepBinary,
 +                ignoreClassNotFound,
 +                types);
 +        else {
 +            JCacheQueryRemoteFilter jCacheFilter;
 +
 +            CacheEntryEventFilter filter = null;
 +
 +            if (rmtFilterFactory != null) {
 +                filter = rmtFilterFactory.create();
 +
 +                if (!(filter instanceof Serializable))
 +                    throw new IgniteCheckedException("Topology has nodes of the old versions. In this case " +
 +                        "EntryEventFilter must implement java.io.Serializable interface. Filter: " + filter);
 +            }
 +
 +            jCacheFilter = new JCacheQueryRemoteFilter(filter, types);
 +
 +            hnd = new CacheContinuousQueryHandler(
 +                cctx.name(),
 +                TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
 +                locLsnr,
 +                jCacheFilter,
 +                internal,
 +                notifyExisting,
 +                oldValRequired,
 +                sync,
 +                ignoreExpired,
 +                taskNameHash,
 +                skipPrimaryCheck,
 +                cctx.isLocal(),
 +                keepBinary,
 +                ignoreClassNotFound);
 +        }
 +
 +        return executeQuery0(locLsnr,
 +            bufSize,
 +            timeInterval,
 +            autoUnsubscribe,
 +            notifyExisting,
 +            loc,
 +            keepBinary,
 +            hnd);
 +    }
 +
 +    /**
 +     * @param locLsnr Local listener.
 +     * @param bufSize Buffer size.
 +     * @param timeInterval Time interval.
 +     * @param autoUnsubscribe Auto unsubscribe flag.
 +     * @param internal Internal flag.
 +     * @param notifyExisting Notify existing flag.
 +     * @param oldValRequired Old value required flag.
 +     * @param sync Synchronous flag.
 +     * @param ignoreExpired Ignore expired event flag.
 +     * @param loc Local flag.
 +     * @return Continuous routine ID.
 +     * @throws IgniteCheckedException In case of error.
 +     */
 +    private UUID executeQueryWithFilterFactory(CacheEntryUpdatedListener locLsnr,
 +        final Factory<? extends CacheEntryEventFilter> rmtFilterFactory,
 +        int bufSize,
 +        long timeInterval,
 +        boolean autoUnsubscribe,
 +        boolean internal,
 +        boolean notifyExisting,
 +        boolean oldValRequired,
 +        boolean sync,
 +        boolean ignoreExpired,
 +        boolean loc,
 +        final boolean keepBinary,
 +        boolean ignoreClassNotFound) throws IgniteCheckedException
 +    {
 +        cctx.checkSecurity(SecurityPermission.CACHE_READ);
 +
 +        int taskNameHash = !internal && cctx.kernalContext().security().enabled() ?
 +            cctx.kernalContext().job().currentTaskNameHash() : 0;
 +
 +        boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode();
 +
 +        boolean v2 = rmtFilterFactory != null && useV2Protocol(cctx.discovery().allNodes());
 +
 +        GridContinuousHandler hnd;
 +
 +        if (v2)
 +            hnd = new CacheContinuousQueryHandlerV2(
 +                cctx.name(),
 +                TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
 +                locLsnr,
 +                rmtFilterFactory,
 +                internal,
 +                notifyExisting,
 +                oldValRequired,
 +                sync,
 +                ignoreExpired,
 +                taskNameHash,
 +                skipPrimaryCheck,
 +                cctx.isLocal(),
 +                keepBinary,
 +                ignoreClassNotFound,
-                 (byte)0);
++                null);
 +        else {
 +            CacheEntryEventFilter fltr = null;
 +
 +            if (rmtFilterFactory != null) {
 +                fltr = rmtFilterFactory.create();
 +
 +                if (!(fltr instanceof CacheEntryEventSerializableFilter))
 +                    throw new IgniteCheckedException("Topology has nodes of the old versions. In this case " +
 +                        "EntryEventFilter should implement org.apache.ignite.cache.CacheEntryEventSerializableFilter " +
 +                        "interface. Filter: " + fltr);
 +            }
 +
 +            hnd = new CacheContinuousQueryHandler(
 +                cctx.name(),
 +                TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
 +                locLsnr,
 +                (CacheEntryEventSerializableFilter)fltr,
 +                internal,
 +                notifyExisting,
 +                oldValRequired,
 +                sync,
 +                ignoreExpired,
 +                taskNameHash,
 +                skipPrimaryCheck,
 +                cctx.isLocal(),
 +                keepBinary,
 +                ignoreClassNotFound);
 +        }
 +
 +        return executeQuery0(locLsnr,
 +            bufSize,
 +            timeInterval,
 +            autoUnsubscribe,
 +            notifyExisting,
 +            loc,
 +            keepBinary,
 +            hnd);
 +    }
 +
 +    /**
 +     * @param locLsnr Local listener.
 +     * @param bufSize Buffer size.
 +     * @param timeInterval Time interval.
 +     * @param autoUnsubscribe Auto unsubscribe flag.
 +     * @param notifyExisting Notify existing flag.
 +     * @param loc Local flag.
 +     * @param keepBinary Keep binary.
 +     * @param hnd Handler.
 +     * @return Continuous routine ID.
 +     * @throws IgniteCheckedException In case of error.
 +     */
 +    private UUID executeQuery0(CacheEntryUpdatedListener locLsnr,
 +        int bufSize,
 +        long timeInterval,
 +        boolean autoUnsubscribe,
 +        boolean notifyExisting,
 +        boolean loc,
 +        final boolean keepBinary,
 +        final GridContinuousHandler hnd)
 +        throws IgniteCheckedException {
          IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ?
              F.nodeForNodeId(cctx.localNodeId()) : F.<ClusterNode>alwaysTrue();
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/846e8e5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
index 23b9d85,62ed66f..c18cf35
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
@@@ -28,11 -28,11 +28,14 @@@ import java.util.SortedMap
  import java.util.TreeMap;
  import java.util.concurrent.ArrayBlockingQueue;
  import java.util.concurrent.BlockingQueue;
++import java.util.concurrent.Callable;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.ConcurrentMap;
  import javax.cache.Cache;
  import javax.cache.configuration.Factory;
  import javax.cache.event.CacheEntryEvent;
++import javax.cache.event.CacheEntryEventFilter;
++import javax.cache.event.CacheEntryListenerException;
  import javax.cache.event.CacheEntryUpdatedListener;
  import javax.cache.integration.CacheLoaderException;
  import javax.cache.integration.CacheWriterException;
@@@ -40,7 -40,7 +43,9 @@@ import javax.cache.processor.EntryProce
  import javax.cache.processor.MutableEntry;
  import org.apache.ignite.Ignite;
  import org.apache.ignite.IgniteCache;
++import org.apache.ignite.IgniteException;
  import org.apache.ignite.cache.CacheAtomicityMode;
++import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
  import org.apache.ignite.cache.CacheMemoryMode;
  import org.apache.ignite.cache.CacheMode;
  import org.apache.ignite.cache.affinity.Affinity;
@@@ -55,6 -55,6 +60,7 @@@ import org.apache.ignite.internal.util.
  import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
  import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
  import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
++import org.apache.ignite.testframework.GridTestUtils;
  import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
  import org.apache.ignite.transactions.Transaction;
  import org.apache.ignite.transactions.TransactionConcurrency;
@@@ -132,6 -132,6 +138,51 @@@ public class CacheContinuousQueryRandom
      /**
       * @throws Exception If failed.
       */
++    public void testFilterAndFactoryProvided() throws Exception {
++        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
++            1,
++            ATOMIC,
++            ONHEAP_TIERED,
++            false);
++
++        final IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(ccfg);
++
++        try {
++            final ContinuousQuery qry = new ContinuousQuery();
++
++            qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter>() {
++                @Override public CacheEntryEventFilter create() {
++                    return null;
++                }
++            });
++
++            qry.setRemoteFilter(new CacheEntryEventSerializableFilter() {
++                @Override public boolean evaluate(CacheEntryEvent event) throws CacheEntryListenerException {
++                    return false;
++                }
++            });
++
++            qry.setLocalListener(new CacheEntryUpdatedListener() {
++                @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
++                    // No-op.
++                }
++            });
++
++            GridTestUtils.assertThrows(log, new Callable<Object>() {
++                @Override public Object call() throws Exception {
++                    return cache.query(qry);
++                }
++            }, IgniteException.class, null);
++
++        }
++        finally {
++            cache.destroy();
++        }
++    }
++
++    /**
++     * @throws Exception If failed.
++     */
      public void testAtomicClient() throws Exception {
          CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
              1,

http://git-wip-us.apache.org/repos/asf/ignite/blob/846e8e5d/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------


Mime
View raw message