ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [28/50] [abbrv] ignite git commit: finalizing - this revision is broken - full api multinode tests hang
Date Mon, 05 Dec 2016 14:41:50 GMT
finalizing - this revision is broken - full api multinode tests hang


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cf19932d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cf19932d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cf19932d

Branch: refs/heads/ignite-comm-balance-master
Commit: cf19932da0f98b48833d00d28eb3576084ae1683
Parents: 5f503b6
Author: Yakov Zhdanov <yzhdanov@gridgain.com>
Authored: Wed Nov 16 20:34:35 2016 +0300
Committer: Yakov Zhdanov <yzhdanov@gridgain.com>
Committed: Wed Nov 16 20:34:35 2016 +0300

----------------------------------------------------------------------
 .../configuration/IgniteConfiguration.java      |  33 ++
 .../org/apache/ignite/internal/IgnitionEx.java  |  56 ++-
 .../managers/communication/GridIoManager.java   |  54 +--
 .../util/SingleConsumerSpinCircularBuffer.java  | 232 ------------
 .../ignite/internal/util/StripedExecutor.java   | 371 ++++++++++++++-----
 .../ignite/thread/IgniteThreadFactory.java      |   8 +-
 6 files changed, 369 insertions(+), 385 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/cf19932d/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 153870f..8a5c945 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -74,6 +74,7 @@ import org.apache.ignite.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi
 import org.apache.ignite.spi.swapspace.SwapSpaceSpi;
 import org.apache.ignite.spi.swapspace.file.FileSwapSpaceSpi;
 import org.apache.ignite.ssl.SslContextFactory;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 
 import static org.apache.ignite.plugin.segmentation.SegmentationPolicy.STOP;
 
@@ -230,6 +231,9 @@ public class IgniteConfiguration {
     /** Logger. */
     private IgniteLogger log;
 
+    /** Use striped pool for public and system pools. */
+    private boolean useStripedPool = true;
+
     /** Public pool size. */
     private int pubPoolSize = DFLT_PUBLIC_THREAD_CNT;
 
@@ -559,6 +563,7 @@ public class IgniteConfiguration {
         timeSrvPortRange = cfg.getTimeServerPortRange();
         txCfg = cfg.getTransactionConfiguration();
         userAttrs = cfg.getUserAttributes();
+        useStripedPool = cfg.isUseStripedPool();
         utilityCacheKeepAliveTime = cfg.getUtilityCacheKeepAliveTime();
         utilityCachePoolSize = cfg.getUtilityCacheThreadPoolSize();
         waitForSegOnStart = cfg.isWaitForSegmentOnStart();
@@ -712,6 +717,34 @@ public class IgniteConfiguration {
     }
 
     /**
+     * Returns {@code true} if striped pool should be used for public
+     * pool and system pools. Default is {@code true}. If {@code false} then,
+     * {@link IgniteThreadPoolExecutor} is used.
+     * <p>
+     * Striped pool is better for typical cache operations and short-term
+     * compute jobs.
+     *
+     * @return {@code True} if striped pool should be used for public
+     * pool and system pools.
+     *
+     * @see #getPublicThreadPoolSize()
+     * @see #getSystemThreadPoolSize()
+     */
+    public boolean isUseStripedPool() {
+        return useStripedPool;
+    }
+
+    /**
+     * Enables/disables use of striped pools for public and system pools.
+     *
+     * @param useStripedPool {@code True} if striped pool should be used for public
+     * pool and system pools.
+     */
+    public void setUseStripedPool(boolean useStripedPool) {
+        this.useStripedPool = useStripedPool;
+    }
+
+    /**
      * Should return a thread pool size to be used in grid.
      * This executor service will be in charge of processing {@link ComputeJob GridJobs}
      * and user messages sent to node.

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf19932d/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 5b2c3fc..69f674d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -36,6 +36,7 @@ import java.util.Map.Entry;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -66,6 +67,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsUtils;
 import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.StripedExecutor;
 import org.apache.ignite.internal.util.spring.IgniteSpringHelper;
 import org.apache.ignite.internal.util.typedef.CA;
 import org.apache.ignite.internal.util.typedef.F;
@@ -1454,10 +1456,10 @@ public class IgnitionEx {
         private volatile IgniteKernal grid;
 
         /** Executor service. */
-        private ThreadPoolExecutor execSvc;
+        private ExecutorService execSvc;
 
         /** System executor service. */
-        private ThreadPoolExecutor sysExecSvc;
+        private ExecutorService sysExecSvc;
 
         /** Management executor service. */
         private ThreadPoolExecutor mgmtExecSvc;
@@ -1640,27 +1642,41 @@ public class IgnitionEx {
                 ensureMultiInstanceSupport(myCfg.getSwapSpaceSpi());
             }
 
-            execSvc = new IgniteThreadPoolExecutor(
-                "pub",
-                cfg.getGridName(),
-                cfg.getPublicThreadPoolSize(),
-                cfg.getPublicThreadPoolSize(),
-                DFLT_THREAD_KEEP_ALIVE_TIME,
-                new LinkedBlockingQueue<Runnable>());
+            {
+                if (cfg.isUseStripedPool())
+                    execSvc = new StripedExecutor(cfg.getPublicThreadPoolSize(), cfg.getGridName(), "pub", log);
+                else {
+                    IgniteThreadPoolExecutor pool = new IgniteThreadPoolExecutor(
+                        "pub",
+                        cfg.getGridName(),
+                        cfg.getPublicThreadPoolSize(),
+                        cfg.getPublicThreadPoolSize(),
+                        DFLT_THREAD_KEEP_ALIVE_TIME,
+                        new LinkedBlockingQueue<Runnable>());
 
-            execSvc.allowCoreThreadTimeOut(true);
+                    pool.allowCoreThreadTimeOut(true);
 
-            // Note that since we use 'LinkedBlockingQueue', number of
-            // maximum threads has no effect.
-            sysExecSvc = new IgniteThreadPoolExecutor(
-                "sys",
-                cfg.getGridName(),
-                cfg.getSystemThreadPoolSize(),
-                cfg.getSystemThreadPoolSize(),
-                DFLT_THREAD_KEEP_ALIVE_TIME,
-                new LinkedBlockingQueue<Runnable>());
+                    execSvc = pool;
+                }
+            }
+
+            {
+                if (cfg.isUseStripedPool())
+                    sysExecSvc = new StripedExecutor(cfg.getSystemThreadPoolSize(), cfg.getGridName(), "sys", log);
+                else {
+                    IgniteThreadPoolExecutor pool = new IgniteThreadPoolExecutor(
+                        "sys",
+                        cfg.getGridName(),
+                        cfg.getSystemThreadPoolSize(),
+                        cfg.getSystemThreadPoolSize(),
+                        DFLT_THREAD_KEEP_ALIVE_TIME,
+                        new LinkedBlockingQueue<Runnable>());
+
+                    pool.allowCoreThreadTimeOut(true);
 
-            sysExecSvc.allowCoreThreadTimeOut(true);
+                    sysExecSvc = pool;
+                }
+            }
 
             // Note that since we use 'LinkedBlockingQueue', number of
             // maximum threads has no effect.

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf19932d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 43df712..7425554 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -30,8 +30,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -108,9 +108,6 @@ import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIM
  * Grid communication manager.
  */
 public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializable>> {
-    // TODO
-    private static final boolean STRIPED = Boolean.getBoolean("STRIPED_POOL");
-
     /** Empty array of message factories. */
     public static final MessageFactory[] EMPTY = {};
 
@@ -247,35 +244,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
         startSpi();
 
-        if (STRIPED) {
-            CommunicationSpi spi = getSpi();
-
-            striped = new StripedExecutor(
-                !ctx.clientNode() ?
-                    Runtime.getRuntime().availableProcessors() :
-                    4);
-
-            // TODO
-            Thread t = new Thread(new Runnable() {
-                @Override public void run() {
-                    for (; ; ) {
-                        try {
-                            Thread.sleep(ctx.config().getMetricsLogFrequency());
-                        }
-                        catch (InterruptedException e) {
-                            e.printStackTrace();
-                        }
-
-                        striped.dumpStats(log);
-                    }
-                }
-            });
-
-            t.setDaemon(true);
-
-            t.start();
-        }
-
         getSpi().setListener(commLsnr = new CommunicationListener<Serializable>() {
             @Override public void onMessage(UUID nodeId, Serializable msg, IgniteRunnable msgC) {
                 try {
@@ -645,9 +613,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
         if (log.isDebugEnabled())
             log.debug(stopInfo());
-
-        if (striped != null)
-            striped.stop();
     }
 
     /**
@@ -799,8 +764,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         }
     }
 
-    StripedExecutor striped;
-
     /**
      * @param nodeId Node ID.
      * @param msg Message.
@@ -839,16 +802,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             }
         }
 
-        if (STRIPED) {
-            int part = msg.partition();
-
-            striped.execute(part != -1 ? part : ThreadLocalRandom.current().nextInt(striped.stripes()), c);
-
-            return;
-        }
-
         try {
-            pools.poolForPolicy(plc).execute(c);
+            Executor exec = pools.poolForPolicy(plc);
+
+            if (exec instanceof StripedExecutor)
+                ((StripedExecutor)exec).execute(msg.partition(), c);
+            else
+                exec.execute(c);
         }
         catch (RejectedExecutionException e) {
             U.error(log, "Failed to process regular message due to execution rejection. Increase the upper bound " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf19932d/modules/core/src/main/java/org/apache/ignite/internal/util/SingleConsumerSpinCircularBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/SingleConsumerSpinCircularBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/SingleConsumerSpinCircularBuffer.java
deleted file mode 100644
index be1bc7d..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/SingleConsumerSpinCircularBuffer.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.util;
-
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import java.util.concurrent.locks.LockSupport;
-
-/**
- * This class implements a circular buffer for efficient data exchange.
- */
-public class SingleConsumerSpinCircularBuffer<T> {
-    /** */
-    private static final int SPINS_CNT = 32;
-
-    /** */
-    private static final AtomicLongFieldUpdater<SingleConsumerSpinCircularBuffer> writePosUpd =
-        AtomicLongFieldUpdater.newUpdater(SingleConsumerSpinCircularBuffer.class, "writePos");
-
-    /** */
-    private volatile long readPos;
-
-    /** */
-    @SuppressWarnings("unused")
-    private long p01, p02, p03, p04, p05, p06, p07;
-
-    /** */
-    private volatile long writePos;
-
-    /** */
-    @SuppressWarnings("unused")
-    private long p11, p12, p13, p14, p15, p16, p17;
-
-    /** */
-    private final long sizeMask;
-
-    /** */
-    private final Item<T>[] arr;
-
-    /** */
-    private volatile Thread consumer;
-
-    /** */
-    private volatile boolean parked;
-
-    /**
-     * @param size Size.
-     */
-    @SuppressWarnings("unchecked")
-    public SingleConsumerSpinCircularBuffer(
-        int size
-    ) {
-        sizeMask = size - 1;
-
-        arr = (Item<T>[])new Item[size];
-
-        // Fill the array.
-        for (int i = 0; i < arr.length; i++)
-            arr[i] = new Item<>(-(arr.length + i));
-
-        readPos = writePos = arr.length * 2;
-    }
-
-    /**
-     * @return Head element or {@code null}.
-     */
-    public T poll() {
-        try {
-            return poll0(false);
-        }
-        catch (InterruptedException e) {
-            assert false; // should never happen.
-
-            throw new Error();
-        }
-    }
-
-    /**
-     * @param take {@code false} to poll, {@code true} to take.
-     * @return Head element or {@code null}.
-     * @throws InterruptedException If interrupted.
-     */
-    private T poll0(boolean take) throws InterruptedException {
-        if (consumer == null)
-            consumer = Thread.currentThread();
-
-        long readPos0 = readPos;
-
-        if (readPos0 == writePos) {
-            if (take) {
-                parked = true;
-
-                try {
-                    for (int i = 0; readPos0 == writePos; i++) {
-                        if ((i & (SPINS_CNT - 1)) == 0) {
-                            LockSupport.park();
-
-                            if (Thread.interrupted())
-                                throw new InterruptedException();
-                        }
-                    }
-                }
-                finally {
-                    parked = false;
-                }
-            }
-            else
-                return null;
-        }
-
-        Item<T> item = arr[(int)(readPos0 & sizeMask)];
-
-        readPos = readPos0 + 1;
-
-        return item.item(readPos0);
-    }
-
-    /**
-     * @return Head element or blocks until buffer is not empty.
-     * @throws InterruptedException If interrupted.
-     */
-    public T take() throws InterruptedException {
-        return poll0(true);
-    }
-
-    /**
-     * @return Size.
-     */
-    public int size() {
-        return (int)(writePos - readPos);
-    }
-
-    /**
-     * @param t Element to put.
-     * @return Current size.
-     */
-    public int add(T t) {
-        long writePos0;
-
-        for (;;) {
-            writePos0 = writePos;
-
-            if (writePosUpd.compareAndSet(this, writePos0, writePos0 + 1))
-                break;
-        }
-
-        Item<T> item = arr[(int)(writePos0 & sizeMask)];
-
-        item.update(writePos0, arr.length, t);
-
-        if (parked)
-            LockSupport.unpark(consumer);
-
-        return (int)(writePos0 + 1 - readPos);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return SingleConsumerSpinCircularBuffer.class.toString();
-    }
-
-    /**
-     *
-     */
-    private static class Item<V> {
-        /** */
-        private volatile long idx;
-
-        /** */
-        private V item;
-
-        /** Padding. */
-        @SuppressWarnings("unused")
-        private long p01, p02, p03, p04, p05, p06;
-
-        /**
-         *
-         */
-        Item(long idx) {
-            this.idx = idx;
-        }
-
-        /**
-         * @return Item.
-         */
-        V item(long readPos) {
-            for (;;) {
-                if (idx == readPos) {
-                    V item1 = this.item;
-
-                    idx = -readPos;
-
-                    return item1;
-                }
-            }
-        }
-
-        /**
-         * @param writePos Index.
-         * @param newItem Item.
-         */
-        void update(long writePos, long diff, V newItem) {
-            for (;;) {
-                if (idx == -(writePos - diff))
-                    break;
-            }
-
-            item = newItem;
-
-            idx = writePos;
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized String toString() {
-            return "Item [idx=" + idx + ']';
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf19932d/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index e4d89c4..2290164 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -17,58 +17,83 @@
 
 package org.apache.ignite.internal.util;
 
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.LockSupport;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Striped executor.
  */
-public class StripedExecutor {
-    /** Count. */
-    private final int cnt;
-
+public class StripedExecutor implements ExecutorService {
     /** Stripes. */
     private final Stripe[] stripes;
 
-    /** */
-    private volatile boolean inited;
-
     /**
      * Constructor.
      *
      * @param cnt Count.
      */
-    public StripedExecutor(int cnt) {
-        this.cnt = cnt;
+    public StripedExecutor(int cnt, String gridName, String poolName, IgniteLogger log) {
+        boolean success = false;
 
         stripes = new Stripe[cnt];
 
-        for (int i = 0; i < cnt; i++) {
-            Stripe stripe = new StripeConcurrentQueue();
+        try {
+            for (int i = 0; i < cnt; i++) {
+                stripes[i] = new StripeConcurrentQueue(
+                    gridName,
+                    poolName,
+                    i,
+                    log);
 
-            stripes[i] = stripe;
+                stripes[i].start();
+            }
 
-            stripe.start(i);
+            success = true;
         }
+        catch (Error | RuntimeException e) {
+            U.error(log, "Failed to initialize striped pool.", e);
 
-        // TODO
-        System.out.println("Stripes [cls=" + stripes[0].getClass().getSimpleName() + ", cnt=" + cnt + ']');
+            throw e;
+        }
+        finally {
+            if (!success) {
+                for (Stripe stripe : stripes) {
+                    if (stripe != null)
+                        stripe.signalStop();
+                }
 
-        inited = true;
+                for (Stripe stripe : stripes) {
+                    if (stripe != null)
+                        stripe.awaitStop();
+                }
+            }
+        }
     }
 
     /**
      * @return Stripes count.
      */
     public int stripes() {
-        return cnt;
+        return stripes.length;
     }
 
     /**
@@ -78,19 +103,84 @@ public class StripedExecutor {
      * @param cmd Command.
      */
     public void execute(int idx, Runnable cmd) {
-        stripes[idx % cnt].execute(cmd);
+        if (idx == -1)
+            execute(cmd);
+        else
+            stripes[idx % stripes.length].execute(cmd);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void shutdown() {
+        signalStop();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void execute(@NotNull Runnable cmd) {
+        stripes[ThreadLocalRandom.current().nextInt(stripes.length)].execute(cmd);
     }
 
     /**
-     * Stop executor.
+     * {@inheritDoc}
+     *
+     * @return Empty list (always).
+     */
+    @NotNull @Override public List<Runnable> shutdownNow() {
+        signalStop();
+
+        return Collections.emptyList();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean awaitTermination(
+        long timeout,
+        @NotNull TimeUnit unit
+    ) throws InterruptedException {
+        awaitStop();
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isShutdown() {
+        for (Stripe stripe : stripes) {
+            if (stripe != null && stripe.stopping)
+                return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isTerminated() {
+        for (Stripe stripe : stripes) {
+            if (stripe.thread.getState() != Thread.State.TERMINATED)
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Stops executor.
      */
     public void stop() {
-        for (; !inited; )
-            ;
+        signalStop();
+
+        awaitStop();
+    }
 
+    /**
+     * Signals all stripes.
+     */
+    private void signalStop() {
         for (Stripe stripe : stripes)
             stripe.signalStop();
+    }
 
+    /**
+     * @throws IgniteInterruptedException If interrupted.
+     */
+    private void awaitStop() throws IgniteInterruptedException {
         for (Stripe stripe : stripes)
             stripe.awaitStop();
     }
@@ -101,39 +191,137 @@ public class StripedExecutor {
     public void dumpStats(IgniteLogger log) {
         StringBuilder sb = new StringBuilder("Stats ");
 
-        for (int i = 0; i < stripes.length; i++) {
-            sb.append(i)
-                .append(" [cnt=").append(stripes[i].cnt)
-                .append(", qSize=").append(stripes[i].queueSize())
-                .append("]; ");
-
-            stripes[i].cnt = 0;
-        }
+        for (int i = 0; i < stripes.length; i++)
+            sb.append(i).append(" [qSize=").append(stripes[i].queueSize()).append("]; ");
 
         if (log.isInfoEnabled())
             log.info(sb.toString());
     }
 
     /**
+     * @return Return total queue size of all stripes.
+     */
+    public int queueSize() {
+        int size = 0;
+
+        for (Stripe stripe : stripes)
+            size += stripe.queueSize();
+
+        return size;
+    }
+
+    /**
+     * Operation not supported.
+     */
+    @NotNull @Override public <T> Future<T> submit(
+        @NotNull Runnable task,
+        T result
+    ) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Operation not supported.
+     */
+    @NotNull @Override public Future<?> submit(@NotNull Runnable task) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Operation not supported.
+     */
+    @NotNull @Override public <T> Future<T> submit(@NotNull Callable<T> task) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Operation not supported.
+     */
+    @NotNull @Override public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks)
+        throws InterruptedException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Operation not supported.
+     */
+    @NotNull @Override public <T> List<Future<T>> invokeAll(
+        @NotNull Collection<? extends Callable<T>> tasks,
+        long timeout,
+        @NotNull TimeUnit unit
+    ) throws InterruptedException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Operation not supported.
+     */
+    @NotNull @Override public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> tasks)
+        throws InterruptedException, ExecutionException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Operation not supported.
+     */
+    @Override public <T> T invokeAny(
+        @NotNull Collection<? extends Callable<T>> tasks,
+        long timeout,
+        @NotNull TimeUnit unit
+    ) throws InterruptedException, ExecutionException, TimeoutException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(StripedExecutor.class, this);
+    }
+
+    /**
      * Stripe.
      */
     private static abstract class Stripe implements Runnable {
+        /** */
+        private final String gridName;
+
+        /** */
+        private final String poolName;
+
+        /** */
+        private final int idx;
+
+        /** */
+        private final IgniteLogger log;
+
         /** Stopping flag. */
         private volatile boolean stopping;
 
         /** Thread executing the loop. */
         protected Thread thread;
 
-        /** */
-        private volatile long cnt;
-
         /**
-         * Start the stripe.
+         * @param gridName Grid name.
+         * @param poolName Pool name.
+         * @param idx Stripe index.
+         * @param log Logger.
          */
-        void start(int idx) {
-            thread = new Thread(this);
+        public Stripe(
+            String gridName,
+            String poolName,
+            int idx,
+            IgniteLogger log
+        ) {
+            this.gridName = gridName;
+            this.poolName = poolName;
+            this.idx = idx;
+            this.log = log;
+        }
 
-            thread.setName("stripe-" + idx);
+        /**
+         * Starts the stripe.
+         */
+        void start() {
+            thread = new IgniteThread(gridName, poolName + "-stripe-" + idx, this);
 
             thread.start();
         }
@@ -144,7 +332,7 @@ public class StripedExecutor {
         void signalStop() {
             stopping = true;
 
-            thread.interrupt();
+            U.interrupt(thread);
         }
 
         /**
@@ -152,7 +340,8 @@ public class StripedExecutor {
          */
         void awaitStop() {
             try {
-                thread.join();
+                if (thread != null)
+                    thread.join();
             }
             catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
@@ -168,6 +357,9 @@ public class StripedExecutor {
 
                 try {
                     cmd = take();
+
+                    if (cmd != null)
+                        cmd.run();
                 }
                 catch (InterruptedException e) {
                     stopping = true;
@@ -176,30 +368,13 @@ public class StripedExecutor {
 
                     return;
                 }
-
-                if (cmd != null) {
-                    execute0(cmd);
-
-                    cnt++;
+                catch (Throwable e) {
+                    U.error(log, "Failed to execute runnable.", e);
                 }
             }
         }
 
         /**
-         * Internal execution routine.
-         *
-         * @param cmd Command.
-         */
-        private void execute0(Runnable cmd) {
-            try {
-                cmd.run();
-            }
-            catch (Exception e) {
-                U.warn(null, "Unexpected exception in stripe loop.", e);
-            }
-        }
-
-        /**
          * Execute the command.
          *
          * @param cmd Command.
@@ -226,34 +401,6 @@ public class StripedExecutor {
     /**
      * Stripe.
      */
-    private static class StripeSpinCircularBuffer extends Stripe {
-        /** Queue. */
-        private final SingleConsumerSpinCircularBuffer<Runnable> queue = new SingleConsumerSpinCircularBuffer<>(256);
-
-        /** {@inheritDoc} */
-        @Override Runnable take() throws InterruptedException {
-            return queue.take();
-        }
-
-        /** {@inheritDoc} */
-        void execute(Runnable cmd) {
-            queue.add(cmd);
-        }
-
-        /** {@inheritDoc} */
-        @Override int queueSize() {
-            return queue.size();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(StripeSpinCircularBuffer.class, this, super.toString());
-        }
-    }
-
-    /**
-     * Stripe.
-     */
     private static class StripeConcurrentQueue extends Stripe {
         /** Queue. */
         private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
@@ -261,6 +408,24 @@ public class StripedExecutor {
         /** */
         private volatile boolean parked;
 
+        /**
+         * @param gridName Grid name.
+         * @param poolName Pool name.
+         * @param idx Stripe index.
+         * @param log Logger.
+         */
+        public StripeConcurrentQueue(
+            String gridName,
+            String poolName,
+            int idx,
+            IgniteLogger log
+        ) {
+            super(gridName,
+                poolName,
+                idx,
+                log);
+        }
+
         /** {@inheritDoc} */
         @Override Runnable take() throws InterruptedException {
             Runnable r;
@@ -318,6 +483,24 @@ public class StripedExecutor {
         /** Queue. */
         private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
 
+        /**
+         * @param gridName Grid name.
+         * @param poolName Pool name.
+         * @param idx Stripe index.
+         * @param log Logger.
+         */
+        public StripeConcurrentQueueNoPark(
+            String gridName,
+            String poolName,
+            int idx,
+            IgniteLogger log
+        ) {
+            super(gridName,
+                poolName,
+                idx,
+                log);
+        }
+
         /** {@inheritDoc} */
         @Override Runnable take() {
             for (;;) {
@@ -351,6 +534,24 @@ public class StripedExecutor {
         /** Queue. */
         private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
 
+        /**
+         * @param gridName Grid name.
+         * @param poolName Pool name.
+         * @param idx Stripe index.
+         * @param log Logger.
+         */
+        public StripeConcurrentBlockingQueue(
+            String gridName,
+            String poolName,
+            int idx,
+            IgniteLogger log
+        ) {
+            super(gridName,
+                poolName,
+                idx,
+                log);
+        }
+
         /** {@inheritDoc} */
         @Override Runnable take() throws InterruptedException {
             return queue.take();

http://git-wip-us.apache.org/repos/asf/ignite/blob/cf19932d/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
index 55557dd..d173594 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
@@ -20,6 +20,7 @@ package org.apache.ignite.thread;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -62,4 +63,9 @@ public class IgniteThreadFactory implements ThreadFactory {
     @Override public Thread newThread(@NotNull Runnable r) {
         return new IgniteThread(gridName, threadName, r, idxGen.incrementAndGet());
     }
-}
\ No newline at end of file
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteThreadFactory.class, this, super.toString());
+    }
+}


Mime
View raw message