ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [09/13] ignite git commit: Merge branch 'master' into ignite-2004
Date Fri, 08 Apr 2016 17:38:20 GMT
Merge branch 'master' into ignite-2004

Conflicts:
	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fcf0c95f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fcf0c95f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fcf0c95f

Branch: refs/heads/ignite-2004
Commit: fcf0c95f17ae227c1a95aa45ec65e1070fe3aac6
Parents: 3e449b0 f970c11
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Thu Apr 7 17:38:16 2016 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Thu Apr 7 17:38:39 2016 +0300

----------------------------------------------------------------------
 .../store/jdbc/CacheAbstractJdbcStore.java      |   4 +
 .../discovery/GridDiscoveryManager.java         |   2 +-
 .../affinity/GridAffinityAssignment.java        |  15 +
 .../affinity/GridAffinityAssignmentCache.java   |  72 +++-
 .../cache/CacheAffinitySharedManager.java       |  10 -
 .../cache/GridCacheAffinityManager.java         |  11 -
 .../processors/cache/GridCacheMapEntry.java     |   7 +-
 .../GridCachePartitionExchangeManager.java      |  16 -
 .../GridDhtPartitionsExchangeFuture.java        |   2 +-
 .../continuous/CacheContinuousQueryHandler.java | 111 +++--
 .../transactions/IgniteTxLocalAdapter.java      |   2 +-
 .../continuous/GridContinuousProcessor.java     |  13 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java    |   2 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  46 +--
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  23 +-
 .../affinity/AffinityHistoryCleanupTest.java    | 414 +++++++++++++++++++
 .../CacheJdbcPojoStoreAbstractSelfTest.java     |  28 +-
 ...eJdbcStoreAbstractMultithreadedSelfTest.java |  25 +-
 .../ignite/cache/store/jdbc/model/Person.java   |  25 ++
 .../IgniteClientReconnectAbstractTest.java      |   7 +-
 .../BinaryObjectOffHeapUnswapTemporaryTest.java | 362 ++++++++++++++++
 .../GridCacheBinaryObjectsAbstractSelfTest.java | 181 +++++---
 ...ffinityAssignmentNodeJoinValidationTest.java | 134 ++++++
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |   8 +-
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java |  39 +-
 .../TcpDiscoverySpiFailureTimeoutSelfTest.java  |  23 +-
 .../spi/discovery/tcp/TestTcpDiscoverySpi.java  |   5 +-
 .../IgniteBinaryObjectsTestSuite.java           |   6 +-
 .../ignite/testsuites/IgniteCacheTestSuite.java |   2 +
 .../testsuites/IgniteCacheTestSuite5.java       |   2 +
 30 files changed, 1320 insertions(+), 277 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fcf0c95f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/fcf0c95f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 4bf22e7,16513b0..56c02d6
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@@ -547,112 -590,70 +547,99 @@@ public class CacheContinuousQueryHandle
  
          final GridCacheContext cctx = cacheContext(ctx);
  
-         final Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>();
 -        Collection<CacheEntryEvent<? extends K, ? extends V>> entries0 = new ArrayList<>();
++        final Collection<CacheEntryEvent<? extends K, ? extends V>> entries0 = new ArrayList<>();
  
 -        for (CacheContinuousQueryEntry e : entries) {
 -            GridCacheDeploymentManager depMgr = cctx.deploy();
 +        final List<PartitionRecovery> rcvs = new ArrayList<>();
  
 -            ClassLoader ldr = depMgr.globalLoader();
 +        try {
 +            for (CacheContinuousQueryEntry e : entries) {
 +                GridCacheDeploymentManager depMgr = cctx.deploy();
 +
 +                ClassLoader ldr = depMgr.globalLoader();
  
 -            if (ctx.config().isPeerClassLoadingEnabled()) {
 -                GridDeploymentInfo depInfo = e.deployInfo();
 +                if (ctx.config().isPeerClassLoadingEnabled()) {
 +                    GridDeploymentInfo depInfo = e.deployInfo();
  
 -                if (depInfo != null) {
 -                    depMgr.p2pContext(nodeId, depInfo.classLoaderId(), depInfo.userVersion(),
depInfo.deployMode(),
 -                        depInfo.participants(), depInfo.localDeploymentOwner());
 +                    if (depInfo != null) {
 +                        depMgr.p2pContext(nodeId, depInfo.classLoaderId(), depInfo.userVersion(),
depInfo.deployMode(),
 +                            depInfo.participants(), depInfo.localDeploymentOwner());
 +                    }
                  }
 -            }
  
 -            try {
 -                e.unmarshal(cctx, ldr);
 +                try {
 +                    e.unmarshal(cctx, ldr);
 +
 +                    if (!asyncCallback) {
-                         T2<Collection<CacheContinuousQueryEntry>, PartitionRecovery>
evts = handleEvent(ctx, e, false);
++                        T2<Collection<CacheEntryEvent<? extends K, ? extends V>>,
PartitionRecovery> evts =
++                            handleEvent(ctx, e, false);
  
 -                entries0.addAll(handleEvent(ctx, e));
 +                        if (evts.get2() != null)
 +                            rcvs.add(evts.get2());
 +
 +                        entries0.addAll(evts.get1());
 +                    }
 +                }
 +                catch (IgniteCheckedException ex) {
 +                    if (ignoreClsNotFound)
 +                        assert internal;
 +                    else
 +                        U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex);
 +                }
              }
 -            catch (IgniteCheckedException ex) {
 -                if (ignoreClsNotFound)
 -                    assert internal;
 -                else
 -                    U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex);
 +
-             final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
- 
 +            if (asyncCallback) {
 +                for (final CacheContinuousQueryEntry e : entries) {
 +                    ctx.continuousQueryPool().execute(new Runnable() {
 +                        @Override public void run() {
-                             T2<Collection<CacheContinuousQueryEntry>, PartitionRecovery>
evts =
++                            T2<Collection<CacheEntryEvent<? extends K, ? extends V>>, PartitionRecovery> evts =
 +                                handleEvent(ctx, e, false);
 +
-                             for (CacheContinuousQueryEntry entry : evts.get1()) {
-                                 CacheContinuousQueryEvent evt =
-                                     new CacheContinuousQueryEvent<>(cache, cctx, entry);
- 
-                                 locLsnr.onUpdated(Collections.<CacheEntryEvent<? extends
K, ? extends V>>
-                                     singleton(evt));
-                             }
++                            locLsnr.onUpdated(evts.get1());
 +                        }
 +                    }, e.partition());
 +                }
              }
-             else if (!entries0.isEmpty()) {
-                 Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
-                     new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K,
? extends V>>() {
-                         @Override public CacheEntryEvent<? extends K, ? extends V>
apply(CacheContinuousQueryEntry e) {
-                             return new CacheContinuousQueryEvent<>(cache, cctx, e);
-                         }
-                     },
-                     new IgnitePredicate<CacheContinuousQueryEntry>() {
-                         @Override public boolean apply(CacheContinuousQueryEntry entry)
{
-                             return !entry.isFiltered();
-                         }
-                     }
-                 );
- 
-                 locLsnr.onUpdated(evts);
-             }
++            else if (!entries0.isEmpty())
++                locLsnr.onUpdated(entries0);
 +        }
 +        finally {
 +            for (PartitionRecovery rec : rcvs)
 +                rec.unlock();
          }
 -
 -        if (!entries0.isEmpty())
 -            locLsnr.onUpdated(entries0);
      }
  
      /**
       * @param ctx Context.
       * @param e entry.
 +     * @param async Async.
       * @return Entry collection.
       */
-     private T2<Collection<CacheContinuousQueryEntry>, PartitionRecovery> handleEvent(GridKernalContext
ctx,
-         CacheContinuousQueryEntry e, boolean async) {
 -    private Collection<CacheEntryEvent<? extends K, ? extends V>> handleEvent(GridKernalContext
ctx,
 -        CacheContinuousQueryEntry e) {
++    private T2<Collection<CacheEntryEvent<? extends K, ? extends V>>, PartitionRecovery>
++        handleEvent(GridKernalContext ctx, CacheContinuousQueryEntry e, boolean async) {
          assert e != null;
  
+         GridCacheContext<K, V> cctx = cacheContext(ctx);
+ 
+         final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
+ 
          if (internal) {
              if (e.isFiltered())
 -                return Collections.emptyList();
 +                return new T2(Collections.emptyList(), null);
              else
-                 return new T2(F.asList(e), null);
 -                return F.<CacheEntryEvent<? extends K, ? extends V>>asList(
 -                    new CacheContinuousQueryEvent<K, V>(cache, cctx, e));
++                return new T2(F.<CacheEntryEvent<? extends K, ? extends V>>asList(
++                    new CacheContinuousQueryEvent<K, V>(cache, cctx, e)), null);
          }
  
          // Initial query entry or evicted entry. These events should be fired immediately.
-         if (e.updateCounter() == -1L)
-             return new T2(F.asList(e), null);
+         if (e.updateCounter() == -1L) {
 -            return !e.isFiltered() ? F.<CacheEntryEvent<? extends K, ? extends V>>asList(
 -                new CacheContinuousQueryEvent<K, V>(cache, cctx, e)) :
 -                Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList();
++            return !e.isFiltered() ? new T2(F.<CacheEntryEvent<? extends K, ? extends V>>asList(
++                    new CacheContinuousQueryEvent<K, V>(cache, cctx, e)), null) :
++                new T2(Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList(),
null);
+         }
  
          PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition());
  
-         return new T2<>(rec.collectEntries(e, async), async ? null : rec);
 -        return rec.collectEntries(cctx, cache, e);
++        return new T2<>(rec.<K, V>collectEntries(e, cctx, async, cache), rec);
      }
  
      /**
@@@ -779,24 -776,26 +765,30 @@@
          /**
           * Add continuous entry.
           *
+          * @param cctx Cache context.
+          * @param cache Cache.
           * @param entry Cache continuous query entry.
-          * @return Collection entries which will be fired.
+          * @return Collection entries which will be fired. This collection should contains only non-filtered events.
           */
-         public Collection<CacheContinuousQueryEntry> collectEntries(CacheContinuousQueryEntry
entry, boolean async) {
 -        public <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>>
collectEntries(GridCacheContext cctx,
 -            IgniteCache cache,
 -            CacheContinuousQueryEntry entry) {
 -            assert entry != null;
++        <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>> collectEntries(CacheContinuousQueryEntry
entry,
++            GridCacheContext cctx,
++            boolean async,
++            IgniteCache cache) {
 +            if (!async)
 +                lock.lock();
  
 -            if (entry.topologyVersion() == null) { // Possible if entry is sent from old node.
 -                assert entry.updateCounter() == 0L : entry;
 +            try {
 +                assert entry != null;
  
 -                return F.<CacheEntryEvent<? extends K, ? extends V>>
 -                    asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
 -            }
 +                if (entry.topologyVersion() == null) { // Possible if entry is sent from old node.
 +                    assert entry.updateCounter() == 0L : entry;
  
-                     return F.asList(entry);
 -            List<CacheEntryEvent<? extends K, ? extends V>> entries;
++                    return F.<CacheEntryEvent<? extends K, ? extends V>>
++                        asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
 +                }
 +
-                 List<CacheContinuousQueryEntry> entries;
++                List<CacheEntryEvent<? extends K, ? extends V>> entries;
  
 -            synchronized (pendingEvts) {
                  // Received first event.
                  if (curTop == AffinityTopologyVersion.NONE) {
                      lastFiredEvt = entry.updateCounter();
@@@ -1251,276 -1241,6 +1247,261 @@@
      }
  
      /**
 +     *
 +     */
 +    private class ContinuousQueryClosureImpl implements CacheContinuousQueryClosure {
 +        /** */
 +        private final IgniteCache cache;
 +
 +        /** */
 +        private final IgniteLogger log;
 +
 +        /** */
 +        private final boolean fireEvent;
 +
 +        /** */
 +        private CacheContinuousQueryEvent<K, V> evt;
 +
 +        /** */
 +        private CacheEntryEventFilter filter;
 +
 +        /** */
 +        private final GridCacheContext<K, V> cctx;
 +
 +        /** */
 +        private boolean primary;
 +
 +        /** */
 +        private boolean loc;
 +
 +        /** */
 +        private GridKernalContext ctx;
 +
 +        /** */
 +        private UUID nodeId;
 +
 +        /** */
 +        private UUID routineId;
 +
 +        /** */
 +        private boolean recordIgniteEvt;
 +
 +        /** */
 +        private final String taskName;
 +
 +        /** */
 +        private boolean notify;
 +
 +        /** */
 +        private boolean backup;
 +
 +        /** */
 +        private final CountDownLatch latch = new CountDownLatch(1);
 +
 +        /**
 +         * @param taskName Task name.
 +         * @param recordIgniteEvt Fired event.
 +         * @param routineId Routine id.
 +         * @param nodeId Node id.
 +         * @param ctx Kernal context.
 +         * @param loc Local.
 +         * @param primary Primary flag.
 +         * @param cctx Cache context.
 +         * @param filter Filter.
 +         * @param evt Event.
 +         * @param fireEvent Immediately fire event.
 +         * @param cache Cache.
 +         */
 +        ContinuousQueryClosureImpl(String taskName,
 +            boolean recordIgniteEvt,
 +            UUID routineId,
 +            UUID nodeId,
 +            GridKernalContext ctx,
 +            boolean loc,
 +            boolean primary,
 +            GridCacheContext<K, V> cctx,
 +            CacheEntryEventFilter filter,
 +            CacheContinuousQueryEvent<K, V> evt,
 +            boolean fireEvent, IgniteCache cache) {
 +            this.taskName = taskName;
 +            this.recordIgniteEvt = recordIgniteEvt;
 +            this.routineId = routineId;
 +            this.nodeId = nodeId;
 +            this.ctx = ctx;
 +            this.loc = loc;
 +            this.primary = primary;
 +            this.cctx = cctx;
 +            this.filter = filter;
 +            this.evt = evt;
 +            this.cache = cache;
 +            this.fireEvent = fireEvent;
 +
 +            log = ctx.log(CacheContinuousQueryHandler.class);
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public void run() {
 +            filter();
 +
 +            if (fireEvent || waitIfAsync())
 +                onEntryUpdate0();
 +        }
 +
 +        /**
 +         * @return {@code True} if event fired on this node.
 +         */
 +        private boolean primary() {
 +            return primary || skipPrimaryCheck;
 +        }
 +
 +        /**
 +         * @return {@code False} if filter sync.
 +         */
 +        private boolean waitIfAsync() {
 +            if (backup)
 +                return false;
 +
 +            try {
 +                U.await(latch);
 +            }
 +            catch (IgniteInterruptedCheckedException e) {
 +                log.error("Failed to wait latch.");
 +            }
 +
 +            return true;
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public void skipEvent() {
 +            if (evt != null && evt.entry() != null)
 +                evt.entry().markFiltered();
 +
 +            onEntryUpdate();
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public void onEntryUpdate() {
 +            if (backup)
 +                return;
 +
 +            if (!fireEvent && asyncCallback) {
 +                latch.countDown();
 +
 +                return;
 +            }
 +
 +            onEntryUpdate0();
 +        }
 +
 +        /**
 +         *
 +         */
 +        private void onEntryUpdate0() {
 +            try {
 +                final CacheContinuousQueryEntry entry = evt.entry();
 +
 +                if (loc) {
 +                    if (!locCache) {
-                         T2<Collection<CacheContinuousQueryEntry>, PartitionRecovery>
events =
++                        T2<Collection<CacheEntryEvent<? extends K, ? extends V>>,
PartitionRecovery> events =
 +                            handleEvent(ctx, entry, asyncCallback);
 +
 +                        try {
-                             Collection<CacheContinuousQueryEntry> entries = events.get1();
- 
-                             if (!entries.isEmpty()) {
-                                 Iterable<CacheEntryEvent<? extends K, ? extends V>>
evts = F.viewReadOnly(entries,
-                                     new C1<CacheContinuousQueryEntry, CacheEntryEvent<?
extends K, ? extends V>>() {
-                                         @Override public CacheEntryEvent<? extends K,
? extends V> apply(
-                                             CacheContinuousQueryEntry e) {
-                                             return new CacheContinuousQueryEvent<>(cache,
cctx, e);
-                                         }
-                                     },
-                                     new IgnitePredicate<CacheContinuousQueryEntry>()
{
-                                         @Override public boolean apply(CacheContinuousQueryEntry
entry) {
-                                             return !entry.isFiltered();
-                                         }
-                                     }
-                                 );
- 
-                                 if (!F.isEmpty(evts))
-                                     locLsnr.onUpdated(evts);
++                            Collection<CacheEntryEvent<? extends K, ? extends V>>
evts = events.get1();
++
++                            if (!evts.isEmpty()) {
++                                locLsnr.onUpdated(evts);
 +
 +                                if (!internal && !skipPrimaryCheck)
 +                                    sendBackupAcknowledge(ackBuf.onAcknowledged(entry),
routineId, ctx);
 +                            }
 +                        }
 +                        finally {
 +                            if (events.get2() != null)
 +                                events.get2().unlock();
 +                        }
 +                    }
 +                    else {
 +                        if (!entry.isFiltered())
 +                            locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends
V>>asList(evt));
 +                    }
 +                }
 +                else {
 +                    if (!entry.isFiltered())
 +                        prepareEntry(cctx, nodeId, entry);
 +
 +                    CacheContinuousQueryEntry e = handleEntry(entry);
 +
 +                    if (e != null)
 +                        ctx.continuous().addNotification(nodeId, routineId, entry, topic,
sync, true);
 +                }
 +            }
 +            catch (ClusterTopologyCheckedException ex) {
 +                if (log.isDebugEnabled())
 +                    log.debug("Failed to send event notification to node, node left cluster " +
 +                        "[node=" + nodeId + ", err=" + ex + ']');
 +            }
 +            catch (IgniteCheckedException ex) {
 +                U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
 +            }
 +
 +            if (recordIgniteEvt && notify) {
 +                ctx.event().record(new CacheQueryReadEvent<>(
 +                    ctx.discovery().localNode(),
 +                    "Continuous query executed.",
 +                    EVT_CACHE_QUERY_OBJECT_READ,
 +                    CacheQueryType.CONTINUOUS.name(),
 +                    cacheName,
 +                    null,
 +                    null,
 +                    null,
 +                    filter instanceof CacheEntryEventSerializableFilter ?
 +                        (CacheEntryEventSerializableFilter)filter : null,
 +                    null,
 +                    nodeId,
 +                    taskName,
 +                    evt.getKey(),
 +                    evt.getValue(),
 +                    evt.getOldValue(),
 +                    null
 +                ));
 +            }
 +        }
 +
 +        /**
 +         *
 +         */
 +        public void filter() {
 +            CacheContinuousQueryEntry entry = evt.entry();
 +
 +            notify = !entry.isFiltered();
 +
 +            try {
 +                if (notify && filter != null)
 +                    notify = filter.evaluate(evt);
 +            }
 +            catch (Exception e) {
 +                U.error(log, "CacheEntryEventFilter failed: " + e);
 +            }
 +
 +            if (!notify)
 +                entry.markFiltered();
 +
 +            if (!primary()) {
 +                if (!internal) {
 +                    // Skip init query and expire entries.
 +                    if (entry.updateCounter() != -1L) {
 +                        entry.markBackup();
 +
 +                        backupQueue.add(entry);
 +                    }
 +                }
 +
 +                backup = true;
 +            }
 +        }
 +    }
 +
 +    /**
       * Deployable object.
       */
      protected static class DeployableObject implements Externalizable {


Mime
View raw message