ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-2004 Review
Date Thu, 14 Apr 2016 06:17:22 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2004 db1bf2cbd -> ff595dbe7


ignite-2004 Review


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ff595dbe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ff595dbe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ff595dbe

Branch: refs/heads/ignite-2004
Commit: ff595dbe7cf3c15a8b7ec3935055646549c2b427
Parents: db1bf2c
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Apr 14 08:59:55 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Apr 14 09:12:36 2016 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/IgnitionEx.java  |   3 +-
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |  21 ---
 .../continuous/CacheContinuousQueryHandler.java |  15 +-
 .../thread/IgniteStripedThreadPoolExecutor.java | 184 ++++++-------------
 4 files changed, 64 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ff595dbe/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index c8ad3cd..64aa6f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1651,10 +1651,9 @@ public class IgnitionEx {
                 0,
                 new LinkedBlockingQueue<Runnable>());
 
-            // Note that we do not pre-start threads here as continuous query pool may not be needed.
+            // Note that we do not pre-start threads here as this pool may not be needed.
             callbackExecSvc = new IgniteStripedThreadPoolExecutor(
                 cfg.getAsyncCallbackPoolSize(),
-                1,
                 cfg.getGridName(),
                 "callback");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ff595dbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 0232f22..a177b07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -155,10 +155,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     @GridDirectTransient
     private List<Integer> partIds;
 
-    /** */
-    @GridDirectTransient
-    private List<CacheObject> locPrevVals;
-
     /** Keep binary flag. */
     private boolean keepBinary;
 
@@ -517,16 +513,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
     /**
      * @param idx Key index.
-     * @return Value.
-     */
-    @Nullable public CacheObject localPreviousValue(int idx) {
-        assert locPrevVals != null;
-
-        return locPrevVals.get(idx);
-    }
-
-    /**
-     * @param idx Key index.
      * @return Entry processor.
      */
     @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
@@ -1060,13 +1046,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     private void cleanup() {
         nearVals = null;
         prevVals = null;
-
-        // Do not keep values if they are not needed for continuous query notification.
-        if (locPrevVals == null) {
-            keys = null;
-            vals = null;
-            locPrevVals = null;
-        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ff595dbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 0f9f5b7..e2ee7c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -690,7 +690,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         // Initial query entry or evicted entry. These events should be fired immediately.
         if (e.updateCounter() == -1L) {
             return !e.isFiltered() ? F.<CacheEntryEvent<? extends K, ? extends V>>asList(
-                    new CacheContinuousQueryEvent<K, V>(cache, cctx, e)) :
+                new CacheContinuousQueryEvent<K, V>(cache, cctx, e)) :
                 Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList();
         }
 
@@ -1403,16 +1403,16 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
      */
     private class ContinuousQueryAsyncClosure implements Runnable {
         /** */
-        private CacheContinuousQueryEvent<K, V> evt;
+        private final CacheContinuousQueryEvent<K, V> evt;
 
         /** */
-        private boolean primary;
+        private final boolean primary;
 
         /** */
-        private boolean recordIgniteEvt;
+        private final boolean recordIgniteEvt;
 
         /** */
-        private IgniteInternalFuture<?> fut;
+        private final IgniteInternalFuture<?> fut;
 
         /**
          * @param primary Primary flag.
@@ -1463,6 +1463,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
             return true;
         }
+
+        /** {@inheritDoc} */
+        public String toString() {
+            return S.toString(ContinuousQueryAsyncClosure.class, this);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/ff595dbe/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
index 44ea823..5876b08 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
@@ -17,75 +17,57 @@
 
 package org.apache.ignite.thread;
 
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * An {@link ExecutorService} that executes submitted tasks using pooled grid threads.
  */
 public class IgniteStripedThreadPoolExecutor implements ExecutorService {
     /** */
-    public static final int DFLT_SEG_POOL_SIZE = 8;
-
-    /** */
-    public static final int DFLT_CONCUR_LVL = 16;
-
-    /** */
     private final ExecutorService[] execs;
 
-    /** */
-    private final int segShift;
-
-    /** */
-    private final int segMask;
-
-    /**
-     * Create thread pool with default concurrency level {@link #DFLT_CONCUR_LVL}.
-     */
-    public IgniteStripedThreadPoolExecutor() {
-        this(DFLT_CONCUR_LVL, DFLT_SEG_POOL_SIZE, "null", "null");
-    }
-
     /**
      * Create striped thread pool.
      *
      * @param concurrentLvl Concurrency level.
-     * @param poolSize Pool size.
+     * @param gridName Node name.
+     * @param threadNamePrefix Thread name prefix.
      */
-    public IgniteStripedThreadPoolExecutor(int concurrentLvl, int poolSize, String gridName, String threadNamePrefix) {
+    public IgniteStripedThreadPoolExecutor(int concurrentLvl, String gridName, String threadNamePrefix) {
         execs = new ExecutorService[concurrentLvl];
 
         ThreadFactory factory = new IgniteThreadFactory(gridName, threadNamePrefix);
 
         for (int i = 0; i < concurrentLvl; i++)
-            if (poolSize == 1)
-                execs[i] = Executors.newSingleThreadExecutor(factory);
-            else
-                execs[i] = Executors.newFixedThreadPool(poolSize, factory);
-
-        // Find power-of-two sizes best matching arguments
-        int sshift = 0;
-        int ssize = 1;
-
-        while (ssize < concurrentLvl) {
-            ++sshift;
-
-            ssize <<= 1;
-        }
+            execs[i] = Executors.newSingleThreadExecutor(factory);
+    }
 
-        segShift = 32 - sshift;
-        segMask = ssize - 1;
+    /**
+     * Executes the given command at some time in the future. The command with the same {@code index}
+     * will be executed in the same thread.
+     *
+     * @param task the runnable task
+     * @param idx Striped index.
+     * @throws RejectedExecutionException if this task cannot be
+     * accepted for execution.
+     * @throws NullPointerException If command is null
+     */
+    public void execute(Runnable task, int idx) {
+        if (idx < execs.length)
+            execs[idx].execute(task);
+        else
+            execs[idx % execs.length].execute(task);
     }
 
     /** {@inheritDoc} */
@@ -96,7 +78,7 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService {
 
     /** {@inheritDoc} */
     @Override public List<Runnable> shutdownNow() {
-        List<Runnable> res = new LinkedList<>();
+        List<Runnable> res = new ArrayList<>();
 
         for (ExecutorService exec : execs) {
             for (Runnable r : exec.shutdownNow())
@@ -137,121 +119,61 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService {
     }
 
     /** {@inheritDoc} */
-    @Override public <T> Future<T> submit(Callable<T> task) {
-        return execForTask(task).submit(task);
-    }
+    @NotNull @Override public <T> Future<T> submit(Callable<T> task) {
+        assert false;
 
-    /** {@inheritDoc} */
-    @Override public <T> Future<T> submit(Runnable task, T result) {
-        return execForTask(task).submit(task, result);
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
-    @Override public Future<?> submit(Runnable task) {
-        return execForTask(task).submit(task);
-    }
+    @NotNull @Override public <T> Future<T> submit(Runnable task, T res) {
+        assert false;
 
-    /**
-     * Executes the given command at some time in the future. The command with the same {@code index}
-     * will be executed in the same thread.
-     *
-     * @param task the runnable task
-     * @param idx Striped index.
-     * @throws RejectedExecutionException if this task cannot be
-     * accepted for execution.
-     * @throws NullPointerException if command is null
-     */
-    public void execute(Runnable task, int idx) {
-        if (idx < execs.length)
-            execs[idx].execute(task);
-        else
-            execs[idx % execs.length].execute(task);
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
-    @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
-        throws InterruptedException {
-        List<Future<T>> futs = new LinkedList<>();
-
-        for (Callable<T> task : tasks)
-            futs.add(execForTask(task).submit(task));
-
-        boolean done = false;
+    @NotNull @Override public Future<?> submit(Runnable task) {
+        assert false;
 
-        try {
-            for (Future<T> fut : futs) {
-                try {
-                    fut.get();
-                }
-                catch (ExecutionException | InterruptedException ignored) {
-                    // No-op.
-                }
-            }
-
-            done = true;
-
-            return futs;
-        }
-        finally {
-            if (!done) {
-                for (Future<T> fut : futs)
-                    fut.cancel(true);
-            }
-        }
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
-    @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout,
-        TimeUnit unit) throws InterruptedException {
-        throw new RuntimeException("Not implemented.");
+    @NotNull @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
+        assert false;
+
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
-    @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException,
-        ExecutionException {
-        throw new RuntimeException("Not implemented.");
+    @NotNull @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
+        long timeout,
+        TimeUnit unit) {
+        assert false;
+
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
-    @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-        throws InterruptedException, ExecutionException, TimeoutException {
-        throw new RuntimeException("Not implemented.");
+    @NotNull @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks) {
+        assert false;
+
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
-    @Override public void execute(Runnable cmd) {
-        execForTask(cmd).execute(cmd);
-    }
+    @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
+        assert false;
 
-    /**
-     * Applies a supplemental hash function to a given hashCode, which
-     * defends against poor quality hash functions.  This is critical
-     * because ConcurrentHashMap uses power-of-two length hash tables,
-     * that otherwise encounter collisions for hashCodes that do not
-     * differ in lower or upper bits.
-     *
-     * @param h Hash code.
-     * @return Enhanced hash code.
-     */
-    private int hash(int h) {
-        // Spread bits to regularize both segment and index locations,
-        // using variant of single-word Wang/Jenkins hash.
-        h += (h <<  15) ^ 0xffffcd7d;
-        h ^= (h >>> 10);
-        h += (h <<   3);
-        h ^= (h >>>  6);
-        h += (h <<   2) + (h << 14);
-        return h ^ (h >>> 16);
+        throw new UnsupportedOperationException();
     }
 
-    /**
-     * @param cmd Command.
-     * @return Service.
-     */
-    private <T> ExecutorService execForTask(T cmd) {
-        assert cmd != null;
+    /** {@inheritDoc} */
+    @Override public void execute(Runnable cmd) {
+        assert false;
 
-        return execs[(hash(System.identityHashCode(cmd)) >>> segShift) & segMask];
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */


Mime
View raw message