ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nizhi...@apache.org
Subject [ignite] branch master updated: IGNITE-12393: Striped thread pool queue system view. (#7084)
Date Wed, 04 Dec 2019 08:14:59 GMT
This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d72874  IGNITE-12393: Striped thread pool queue system view. (#7084)
6d72874 is described below

commit 6d72874067bf3731817bba2213d9d20f3c06577c
Author: Nikolay <nizhikov@apache.org>
AuthorDate: Wed Dec 4 11:14:42 2019 +0300

    IGNITE-12393: Striped thread pool queue system view. (#7084)
---
 .../internal/jdbc2/JdbcMetadataSelfTest.java       |   4 +-
 .../ignite/jdbc/thin/JdbcThinMetadataSelfTest.java |  14 +-
 .../SystemViewRowAttributeWalkerGenerator.java     |   2 +
 .../org/apache/ignite/internal/IgniteKernal.java   |   2 +
 .../internal/StripedExecutorMXBeanAdapter.java     |   2 +-
 .../managers/systemview/GridSystemViewManager.java |  38 +++++
 .../walker/StripedExecutorTaskViewWalker.java      |  50 +++++++
 .../GridCacheDatabaseSharedManager.java            |   8 +-
 .../processors/metric/GridMetricManager.java       |   2 +-
 .../ignite/internal/util/StripedExecutor.java      | 157 ++++-----------------
 .../systemview/view/StripedExecutorTaskView.java   |  68 +++++++++
 .../ignite/internal/metric/JmxExporterSpiTest.java |  63 ++++++++-
 .../ignite/internal/metric/SystemViewSelfTest.java |  95 +++++++++++++
 .../cache/metric/SqlViewExporterSpiTest.java       |  63 ++++++++-
 14 files changed, 430 insertions(+), 138 deletions(-)

diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMetadataSelfTest.java
b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMetadataSelfTest.java
index ba690c5..61d9a27 100755
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMetadataSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMetadataSelfTest.java
@@ -339,7 +339,9 @@ public class JdbcMetadataSelfTest extends GridCommonAbstractTest {
             "VIEWS",
             "TABLE_COLUMNS",
             "VIEW_COLUMNS",
-            "CONTINUOUS_QUERIES"
+            "CONTINUOUS_QUERIES",
+            "STRIPED_THREADPOOL_QUEUE",
+            "DATASTREAM_THREADPOOL_QUEUE"
         ));
 
         Set<String> actViews = new HashSet<>();
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMetadataSelfTest.java
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMetadataSelfTest.java
index 7b3acf1..2b80482 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMetadataSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMetadataSelfTest.java
@@ -349,7 +349,9 @@ public class JdbcThinMetadataSelfTest extends JdbcThinAbstractSelfTest
{
                 "SYS.VIEWS",
                 "SYS.TABLE_COLUMNS",
                 "SYS.VIEW_COLUMNS",
-                "SYS.CONTINUOUS_QUERIES"
+                "SYS.CONTINUOUS_QUERIES",
+                "SYS.STRIPED_THREADPOOL_QUEUE",
+                "SYS.DATASTREAM_THREADPOOL_QUEUE"
             ))
         );
     }
@@ -852,7 +854,15 @@ public class JdbcThinMetadataSelfTest extends JdbcThinAbstractSelfTest
{
                 "SYS.CONTINUOUS_QUERIES.ROUTINE_ID.null.2147483647",
                 "SYS.CONTINUOUS_QUERIES.REMOTE_FILTER.null.2147483647",
                 "SYS.CONTINUOUS_QUERIES.CACHE_NAME.null.2147483647",
-                "SYS.CONTINUOUS_QUERIES.LOCAL_LISTENER.null.2147483647"
+                "SYS.CONTINUOUS_QUERIES.LOCAL_LISTENER.null.2147483647",
+                "SYS.STRIPED_THREADPOOL_QUEUE.STRIPE_INDEX.null.10",
+                "SYS.STRIPED_THREADPOOL_QUEUE.DESCRIPTION.null.2147483647",
+                "SYS.STRIPED_THREADPOOL_QUEUE.THREAD_NAME.null.2147483647",
+                "SYS.STRIPED_THREADPOOL_QUEUE.TASK_NAME.null.2147483647",
+                "SYS.DATASTREAM_THREADPOOL_QUEUE.STRIPE_INDEX.null.10",
+                "SYS.DATASTREAM_THREADPOOL_QUEUE.DESCRIPTION.null.2147483647",
+                "SYS.DATASTREAM_THREADPOOL_QUEUE.THREAD_NAME.null.2147483647",
+                "SYS.DATASTREAM_THREADPOOL_QUEUE.TASK_NAME.null.2147483647"
             ));
 
             Assert.assertEquals(expectedCols, actualSystemCols);
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/SystemViewRowAttributeWalkerGenerator.java
b/modules/codegen/src/main/java/org/apache/ignite/codegen/SystemViewRowAttributeWalkerGenerator.java
index 1178a26..1fa96c6 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/SystemViewRowAttributeWalkerGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/SystemViewRowAttributeWalkerGenerator.java
@@ -51,6 +51,7 @@ import org.apache.ignite.spi.systemview.view.SqlTableColumnView;
 import org.apache.ignite.spi.systemview.view.SqlTableView;
 import org.apache.ignite.spi.systemview.view.SqlViewColumnView;
 import org.apache.ignite.spi.systemview.view.SqlViewView;
+import org.apache.ignite.spi.systemview.view.StripedExecutorTaskView;
 import org.apache.ignite.spi.systemview.view.SystemView;
 import org.apache.ignite.spi.systemview.view.SystemViewRowAttributeWalker;
 import org.apache.ignite.spi.systemview.view.TransactionView;
@@ -94,6 +95,7 @@ public class SystemViewRowAttributeWalkerGenerator {
         gen.generateAndWrite(ScanQueryView.class, DFLT_SRC_DIR);
         gen.generateAndWrite(SqlQueryView.class, DFLT_SRC_DIR);
         gen.generateAndWrite(SqlQueryHistoryView.class, DFLT_SRC_DIR);
+        gen.generateAndWrite(StripedExecutorTaskView.class, DFLT_SRC_DIR);
 
         gen.generateAndWrite(SqlSchemaView.class, INDEXING_SRC_DIR);
         gen.generateAndWrite(SqlTableView.class, INDEXING_SRC_DIR);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 42ad529..c44d2ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1376,6 +1376,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable
{
                 mgmtExecSvc, igfsExecSvc, dataStreamExecSvc, restExecSvc, affExecSvc, idxExecSvc, callbackExecSvc,
                 qryExecSvc, schemaExecSvc, rebalanceExecSvc, rebalanceStripedExecSvc, customExecSvcs, ctx.workersRegistry());
 
+            ctx.systemView().registerThreadPools(stripedExecSvc, dataStreamExecSvc);
+
             // Lifecycle bean notifications.
             notifyLifecycleBeans(AFTER_NODE_START);
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/StripedExecutorMXBeanAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/StripedExecutorMXBeanAdapter.java
index f565d4e4..fc94a7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/StripedExecutorMXBeanAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/StripedExecutorMXBeanAdapter.java
@@ -51,7 +51,7 @@ public class StripedExecutorMXBeanAdapter implements StripedExecutorMXBean
{
 
     /** {@inheritDoc} */
     @Override public int getStripesCount() {
-        return exec.stripes();
+        return exec.stripesCount();
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/GridSystemViewManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/GridSystemViewManager.java
index ff45466..036717c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/GridSystemViewManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/GridSystemViewManager.java
@@ -40,7 +40,10 @@ import org.apache.ignite.internal.managers.systemview.walker.ComputeTaskViewWalk
 import org.apache.ignite.internal.managers.systemview.walker.ContinuousQueryViewWalker;
 import org.apache.ignite.internal.managers.systemview.walker.ScanQueryViewWalker;
 import org.apache.ignite.internal.managers.systemview.walker.ServiceViewWalker;
+import org.apache.ignite.internal.managers.systemview.walker.StripedExecutorTaskViewWalker;
 import org.apache.ignite.internal.managers.systemview.walker.TransactionViewWalker;
+import org.apache.ignite.internal.util.StripedExecutor;
+import org.apache.ignite.internal.util.StripedExecutor.Stripe;
 import org.apache.ignite.spi.systemview.ReadOnlySystemViewRegistry;
 import org.apache.ignite.spi.systemview.SystemViewExporterSpi;
 import org.apache.ignite.spi.systemview.view.CacheGroupView;
@@ -51,12 +54,14 @@ import org.apache.ignite.spi.systemview.view.ComputeTaskView;
 import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
 import org.apache.ignite.spi.systemview.view.ScanQueryView;
 import org.apache.ignite.spi.systemview.view.ServiceView;
+import org.apache.ignite.spi.systemview.view.StripedExecutorTaskView;
 import org.apache.ignite.spi.systemview.view.SystemView;
 import org.apache.ignite.spi.systemview.view.SystemViewRowAttributeWalker;
 import org.apache.ignite.spi.systemview.view.TransactionView;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
 import static org.apache.ignite.internal.util.IgniteUtils.notifyListeners;
 
 /**
@@ -67,6 +72,18 @@ import static org.apache.ignite.internal.util.IgniteUtils.notifyListeners;
  */
 public class GridSystemViewManager extends GridManagerAdapter<SystemViewExporterSpi>
     implements ReadOnlySystemViewRegistry {
+    /** Name of the system view for a system {@link StripedExecutor} queue view. */
+    public static final String SYS_POOL_QUEUE_VIEW = metricName("striped", "threadpool", "queue");
+
+    /** Description of the system view for a system {@link StripedExecutor} queue view. */
+    public static final String SYS_POOL_QUEUE_VIEW_DESC = "Striped thread pool task queue";
+
+    /** Name of the system view for a data streamer {@link StripedExecutor} queue view. */
+    public static final String STREAM_POOL_QUEUE_VIEW = metricName("datastream", "threadpool", "queue");
+
+    /** Description of the system view for a data streamer {@link StripedExecutor} queue view. */
+    public static final String STREAM_POOL_QUEUE_VIEW_DESC = "Datastream thread pool task queue";
+
     /** Registered system views. */
     private final ConcurrentHashMap<String, SystemView<?>> systemViews = new ConcurrentHashMap<>();
 
@@ -91,6 +108,7 @@ public class GridSystemViewManager extends GridManagerAdapter<SystemViewExporter
         registerWalker(ContinuousQueryView.class, new ContinuousQueryViewWalker());
         registerWalker(ClusterNodeView.class, new ClusterNodeViewWalker());
         registerWalker(ScanQueryView.class, new ScanQueryViewWalker());
+        registerWalker(StripedExecutorTaskView.class, new StripedExecutorTaskViewWalker());
     }
 
     /** {@inheritDoc} */
@@ -107,6 +125,26 @@ public class GridSystemViewManager extends GridManagerAdapter<SystemViewExporter
     }
 
     /**
+     * Registers system views for striped thread pools.
+     *
+     * @param stripedExecSvc Striped executor.
+     * @param dataStreamExecSvc Data streamer executor service.
+     */
+    public void registerThreadPools(StripedExecutor stripedExecSvc, StripedExecutor dataStreamExecSvc) {
+        ctx.systemView().registerInnerCollectionView(SYS_POOL_QUEUE_VIEW, SYS_POOL_QUEUE_VIEW_DESC,
+            StripedExecutorTaskView.class,
+            Arrays.asList(stripedExecSvc.stripes()),
+            Stripe::queue,
+            StripedExecutorTaskView::new);
+
+        ctx.systemView().registerInnerCollectionView(STREAM_POOL_QUEUE_VIEW, STREAM_POOL_QUEUE_VIEW_DESC,
+            StripedExecutorTaskView.class,
+            Arrays.asList(dataStreamExecSvc.stripes()),
+            Stripe::queue,
+            StripedExecutorTaskView::new);
+    }
+
+    /**
      * Registers {@link SystemView} instance.
      *
      * @param sysView System view.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/StripedExecutorTaskViewWalker.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/StripedExecutorTaskViewWalker.java
new file mode 100644
index 0000000..03d1eeb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/walker/StripedExecutorTaskViewWalker.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.systemview.walker;
+
+import org.apache.ignite.spi.systemview.view.StripedExecutorTaskView;
+import org.apache.ignite.spi.systemview.view.SystemViewRowAttributeWalker;
+
+/**
+ * Generated by {@code org.apache.ignite.codegen.SystemViewRowAttributeWalkerGenerator}.
+ * {@link StripedExecutorTaskView} attributes walker.
+ * 
+ * @see StripedExecutorTaskView
+ */
+public class StripedExecutorTaskViewWalker implements SystemViewRowAttributeWalker<StripedExecutorTaskView> {
+    /** {@inheritDoc} */
+    @Override public void visitAll(AttributeVisitor v) {
+        v.accept(0, "stripeIndex", int.class);
+        v.accept(1, "description", String.class);
+        v.accept(2, "threadName", String.class);
+        v.accept(3, "taskName", String.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void visitAll(StripedExecutorTaskView row, AttributeWithValueVisitor v) {
+        v.acceptInt(0, "stripeIndex", row.stripeIndex());
+        v.accept(1, "description", String.class, row.description());
+        v.accept(2, "threadName", String.class, row.threadName());
+        v.accept(3, "taskName", String.class, row.taskName());
+    }
+
+    /** {@inheritDoc} */
+    @Override public int count() {
+        return 4;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index aaf4a44..2eeb330 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -2437,7 +2437,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
      */
     private int semaphorePertmits(StripedExecutor exec) {
         // 4 task per-stripe by default.
-        int permits = exec.stripes() * 4;
+        int permits = exec.stripesCount() * 4;
 
         long maxMemory = Runtime.getRuntime().maxMemory();
 
@@ -2515,7 +2515,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         assert exec != null;
         assert semaphore != null;
 
-        int stripes = exec.stripes();
+        int stripes = exec.stripesCount();
 
         int stripe = U.stripeIdx(stripes, grpId, partId);
 
@@ -3052,7 +3052,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
         // Sort and split all dirty pages set to several stripes.
         GridMultiCollectionWrapper<FullPageId> pages = splitAndSortCpPagesIfNeeded(
-            new IgniteBiTuple<>(res, pagesNum), exec.stripes());
+            new IgniteBiTuple<>(res, pagesNum), exec.stripesCount());
 
         // Identity stores set for future fsync.
         Collection<PageStore> updStores = new GridConcurrentHashSet<>();
@@ -3064,7 +3064,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
         for (int i = 0; i < pages.collectionsSize(); i++) {
             // Calculate stripe index.
-            int stripeIdx = i % exec.stripes();
+            int stripeIdx = i % exec.stripesCount();
 
             // Inner collection index.
             int innerIdx = i;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java
index 88a7609..c8b8e24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/GridMetricManager.java
@@ -443,7 +443,7 @@ public class GridMetricManager extends GridManagerAdapter<MetricExporterSpi>
imp
             "True if possible starvation in striped pool is detected.");
 
         mreg.register("StripesCount",
-            svc::stripes,
+            svc::stripesCount,
             "Stripes count.");
 
         mreg.register("Shutdown",
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index 0a5ec6a..bf5220a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -23,7 +23,6 @@ import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
 import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -31,7 +30,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -182,11 +180,18 @@ public class StripedExecutor implements ExecutorService {
     /**
      * @return Stripes count.
      */
-    public int stripes() {
+    public int stripesCount() {
         return stripes.length;
     }
 
     /**
+     * @return Stripes of this executor.
+     */
+    public Stripe[] stripes() {
+        return stripes;
+    }
+
+    /**
      * Execute command.
      *
      * @param idx Index.
@@ -306,7 +311,7 @@ public class StripedExecutor implements ExecutorService {
      * @return Completed tasks per stripe count.
      */
     public long[] stripesCompletedTasks() {
-        long[] res = new long[stripes()];
+        long[] res = new long[stripesCount()];
 
         for (int i = 0; i < res.length; i++)
             res[i] = stripes[i].completedCnt;
@@ -318,7 +323,7 @@ public class StripedExecutor implements ExecutorService {
      * @return Number of active tasks per stripe.
      */
     public boolean[] stripesActiveStatuses() {
-        boolean[] res = new boolean[stripes()];
+        boolean[] res = new boolean[stripesCount()];
 
         for (int i = 0; i < res.length; i++)
             res[i] = stripes[i].active;
@@ -344,7 +349,7 @@ public class StripedExecutor implements ExecutorService {
      * @return Size of queue per stripe.
      */
     public int[] stripesQueueSizes() {
-        int[] res = new int[stripes()];
+        int[] res = new int[stripesCount()];
 
         for (int i = 0; i < res.length; i++)
             res[i] = stripes[i].queueSize();
@@ -425,11 +430,11 @@ public class StripedExecutor implements ExecutorService {
         CountDownLatch awaitLatch;
 
         if (stripes.length == 0) {
-            awaitLatch = new CountDownLatch(stripes());
+            awaitLatch = new CountDownLatch(stripesCount());
 
             // We have to ensure that all asynchronous updates are done.
             // StripedExecutor guarantees ordering inside stripe - it would enough to await "finishing" tasks.
-            range(0, stripes()).forEach(idx -> execute(idx, awaitLatch::countDown));
+            range(0, stripesCount()).forEach(idx -> execute(idx, awaitLatch::countDown));
         }
         else {
             awaitLatch = new CountDownLatch(stripes.length);
@@ -445,7 +450,7 @@ public class StripedExecutor implements ExecutorService {
                 U.log(log, "Await stripes executor complete tasks" +
                     ", awaitLatch=" + awaitLatch.getCount() +
                     ", stripes=" + (stripes.length == 0 ?
-                    Arrays.toString(range(0, stripes()).toArray()) : Arrays.toString(stripes)) +
+                    Arrays.toString(range(0, stripesCount()).toArray()) : Arrays.toString(stripes)) +
                     ", queueSize=" + Arrays.toString(stripesQueueSizes()) +
                     ", activeStatus=" + Arrays.toString(stripesActiveStatuses()));
             }
@@ -460,7 +465,7 @@ public class StripedExecutor implements ExecutorService {
     /**
      * Stripe.
      */
-    private abstract static class Stripe extends GridWorker {
+    public abstract static class Stripe extends GridWorker {
         /** */
         private final String igniteInstanceName;
 
@@ -598,6 +603,18 @@ public class StripedExecutor implements ExecutorService {
          */
         abstract String queueToString();
 
+        /**
+         * @return Stripe queue.
+         */
+        public abstract Queue<Runnable> queue();
+
+        /**
+         * @return Stripe index.
+         */
+        public int index() {
+            return idx;
+        }
+
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(Stripe.class, this);
@@ -742,60 +759,8 @@ public class StripedExecutor implements ExecutorService {
         }
 
         /** {@inheritDoc} */
-        @Override int queueSize() {
-            return queue.size();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(StripeConcurrentQueue.class, this, super.toString());
-        }
-    }
-
-    /**
-     * Stripe.
-     */
-    private static class StripeConcurrentQueueNoPark extends Stripe {
-        /** Queue. */
-        private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
-
-        /**
-         * @param igniteInstanceName Ignite instance name.
-         * @param poolName Pool name.
-         * @param idx Stripe index.
-         * @param log Logger.
-         * @param errHnd Critical failure handler.
-         * @param gridWorkerLsnr listener to link with stripe worker.
-         */
-        public StripeConcurrentQueueNoPark(
-            String igniteInstanceName,
-            String poolName,
-            int idx,
-            IgniteLogger log,
-            IgniteInClosure<Throwable> errHnd,
-            GridWorkerListener gridWorkerLsnr
-        ) {
-            super(igniteInstanceName,
-                poolName,
-                idx,
-                log,
-                errHnd,
-                gridWorkerLsnr);
-        }
-
-        /** {@inheritDoc} */
-        @Override Runnable take() {
-            for (;;) {
-                Runnable r = queue.poll();
-
-                if (r != null)
-                    return r;
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override void execute(Runnable cmd) {
-            queue.add(cmd);
+        @Override public Queue<Runnable> queue() {
+            return queue;
         }
 
         /** {@inheritDoc} */
@@ -804,70 +769,8 @@ public class StripedExecutor implements ExecutorService {
         }
 
         /** {@inheritDoc} */
-        @Override String queueToString() {
-            return String.valueOf(queue);
-        }
-
-        /** {@inheritDoc} */
         @Override public String toString() {
-            return S.toString(StripeConcurrentQueueNoPark.class, this, super.toString());
-        }
-    }
-
-    /**
-     * Stripe.
-     */
-    private static class StripeConcurrentBlockingQueue extends Stripe {
-        /** Queue. */
-        private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
-
-        /**
-         * @param igniteInstanceName Ignite instance name.
-         * @param poolName Pool name.
-         * @param idx Stripe index.
-         * @param log Logger.
-         * @param errHnd Critical failure handler.
-         * @param gridWorkerLsnr listener to link with stripe worker.
-         */
-        public StripeConcurrentBlockingQueue(
-            String igniteInstanceName,
-            String poolName,
-            int idx,
-            IgniteLogger log,
-            IgniteInClosure<Throwable> errHnd,
-            GridWorkerListener gridWorkerLsnr
-        ) {
-            super(igniteInstanceName,
-                poolName,
-                idx,
-                log,
-                errHnd,
-                gridWorkerLsnr);
-        }
-
-        /** {@inheritDoc} */
-        @Override Runnable take() throws InterruptedException {
-            return queue.take();
-        }
-
-        /** {@inheritDoc} */
-        @Override void execute(Runnable cmd) {
-            queue.add(cmd);
-        }
-
-        /** {@inheritDoc} */
-        @Override int queueSize() {
-            return queue.size();
-        }
-
-        /** {@inheritDoc} */
-        @Override String queueToString() {
-            return String.valueOf(queue);
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(StripeConcurrentBlockingQueue.class, this, super.toString());
+            return S.toString(StripeConcurrentQueue.class, this, super.toString());
         }
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/StripedExecutorTaskView.java
b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/StripedExecutorTaskView.java
new file mode 100644
index 0000000..51f3979
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/StripedExecutorTaskView.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.systemview.view;
+
+import org.apache.ignite.internal.managers.systemview.walker.Order;
+import org.apache.ignite.internal.util.StripedExecutor;
+import org.apache.ignite.internal.util.StripedExecutor.Stripe;
+
+import static org.apache.ignite.internal.util.IgniteUtils.toStringSafe;
+
+/**
+ * {@link StripedExecutor} task representation for a {@link SystemView}.
+ */
+public class StripedExecutorTaskView {
+    /** Stripe. */
+    private final Stripe stripe;
+
+    /** Task. */
+    private final Runnable task;
+
+    /**
+     * @param stripe Stripe.
+     * @param task Task.
+     */
+    public StripedExecutorTaskView(Stripe stripe, Runnable task) {
+        this.stripe = stripe;
+        this.task = task;
+    }
+
+    /** @return Stripe index for task. */
+    @Order
+    public int stripeIndex() {
+        return stripe.index();
+    }
+
+    /** @return Task class name. */
+    @Order(3)
+    public String taskName() {
+        return task.getClass().getName();
+    }
+
+    /** @return Task {@code toString} representation. */
+    @Order(1)
+    public String description() {
+        return toStringSafe(task);
+    }
+
+    /** @return Name of the {@link Thread} executing the {@link #task}. */
+    @Order(2)
+    public String threadName() {
+        return stripe.name();
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/metric/JmxExporterSpiTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/metric/JmxExporterSpiTest.java
index 6739964..f9222fa 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/metric/JmxExporterSpiTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/metric/JmxExporterSpiTest.java
@@ -60,11 +60,13 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.client.thin.ProtocolVersion;
 import org.apache.ignite.internal.metric.SystemViewSelfTest.TestPredicate;
+import org.apache.ignite.internal.metric.SystemViewSelfTest.TestRunnable;
 import org.apache.ignite.internal.metric.SystemViewSelfTest.TestTransformer;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.metric.impl.HistogramMetric;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext;
 import org.apache.ignite.internal.processors.service.DummyService;
+import org.apache.ignite.internal.util.StripedExecutor;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.services.ServiceConfiguration;
 import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
@@ -75,6 +77,8 @@ import org.junit.Test;
 
 import static java.util.Arrays.stream;
 import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.managers.systemview.GridSystemViewManager.STREAM_POOL_QUEUE_VIEW;
+import static org.apache.ignite.internal.managers.systemview.GridSystemViewManager.SYS_POOL_QUEUE_VIEW;
 import static org.apache.ignite.internal.managers.systemview.ScanQuerySystemView.SCAN_QRY_SYS_VIEW;
 import static org.apache.ignite.internal.metric.SystemViewSelfTest.TEST_PREDICATE;
 import static org.apache.ignite.internal.metric.SystemViewSelfTest.TEST_TRANSFORMER;
@@ -720,7 +724,7 @@ public class JmxExporterSpiTest extends AbstractExporterSpiTest {
             qryRes1.close();
             qryRes2.close();
 
-            boolean res = waitForCondition(() -> qrySysView0.isEmpty(), 5_000);
+            boolean res = waitForCondition(() -> systemView(ignite, SCAN_QRY_SYS_VIEW).isEmpty(), 5_000);
 
             assertTrue(res);
         }
@@ -796,6 +800,63 @@ public class JmxExporterSpiTest extends AbstractExporterSpiTest {
     }
 
     /** */
+    @Test
+    public void testSysStripedExecutor() throws Exception {
+        checkStripeExecutorView(ignite.context().getStripedExecutorService(),
+            SYS_POOL_QUEUE_VIEW,
+            "sys");
+    }
+
+    /** */
+    @Test
+    public void testStreamerStripedExecutor() throws Exception {
+        checkStripeExecutorView(ignite.context().getDataStreamerExecutorService(),
+            STREAM_POOL_QUEUE_VIEW,
+            "data-streamer");
+    }
+
+    /**
+     * Checks striped executor system view.
+     *
+     * @param execSvc Striped executor.
+     * @param viewName System view.
+     * @param poolName Executor name.
+     */
+    private void checkStripeExecutorView(StripedExecutor execSvc, String viewName, String poolName) throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+
+        execSvc.execute(0, new TestRunnable(latch, 0));
+        execSvc.execute(0, new TestRunnable(latch, 1));
+        execSvc.execute(1, new TestRunnable(latch, 2));
+        execSvc.execute(1, new TestRunnable(latch, 3));
+
+        try {
+            boolean res = waitForCondition(() -> systemView(viewName).size() == 2, 5_000);
+
+            assertTrue(res);
+
+            TabularDataSupport view = systemView(viewName);
+
+            CompositeData row0 = view.get(new Object[] {0});
+
+            assertEquals(0, row0.get("stripeIndex"));
+            assertEquals(TestRunnable.class.getSimpleName() + '1', row0.get("description"));
+            assertEquals(poolName + "-stripe-0", row0.get("threadName"));
+            assertEquals(TestRunnable.class.getName(), row0.get("taskName"));
+
+            CompositeData row1 = view.get(new Object[] {1});
+
+            assertEquals(1, row1.get("stripeIndex"));
+            assertEquals(TestRunnable.class.getSimpleName() + '3', row1.get("description"));
+            assertEquals(poolName + "-stripe-1", row1.get("threadName"));
+            assertEquals(TestRunnable.class.getName(), row1.get("taskName"));
+        }
+        finally {
+            latch.countDown();
+        }
+    }
+
+    /** */
     private void createTestHistogram(MetricRegistry mreg) {
         long[] bounds = new long[] {50, 500};
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewSelfTest.java
index 98cdbea..22bf15a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/metric/SystemViewSelfTest.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import javax.cache.Cache;
@@ -56,6 +57,7 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.client.thin.ProtocolVersion;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext;
 import org.apache.ignite.internal.processors.service.DummyService;
+import org.apache.ignite.internal.util.StripedExecutor;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteCallable;
@@ -71,6 +73,7 @@ import org.apache.ignite.spi.systemview.view.ComputeTaskView;
 import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
 import org.apache.ignite.spi.systemview.view.ScanQueryView;
 import org.apache.ignite.spi.systemview.view.ServiceView;
+import org.apache.ignite.spi.systemview.view.StripedExecutorTaskView;
 import org.apache.ignite.spi.systemview.view.SystemView;
 import org.apache.ignite.spi.systemview.view.TransactionView;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -81,6 +84,8 @@ import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
 import static org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.NODES_SYS_VIEW;
+import static org.apache.ignite.internal.managers.systemview.GridSystemViewManager.STREAM_POOL_QUEUE_VIEW;
+import static org.apache.ignite.internal.managers.systemview.GridSystemViewManager.SYS_POOL_QUEUE_VIEW;
 import static org.apache.ignite.internal.managers.systemview.ScanQuerySystemView.SCAN_QRY_SYS_VIEW;
 import static org.apache.ignite.internal.processors.cache.ClusterCachesInfo.CACHES_VIEW;
 import static org.apache.ignite.internal.processors.cache.ClusterCachesInfo.CACHE_GRPS_VIEW;
@@ -893,6 +898,66 @@ public class SystemViewSelfTest extends GridCommonAbstractTest {
     }
 
     /** */
+    @Test
+    public void testStripedExecutors() throws Exception {
+        try (IgniteEx g = startGrid(0)) {
+            checkStripeExecutorView(g.context().getStripedExecutorService(),
+                g.context().systemView().view(SYS_POOL_QUEUE_VIEW),
+                "sys");
+
+            checkStripeExecutorView(g.context().getDataStreamerExecutorService(),
+                g.context().systemView().view(STREAM_POOL_QUEUE_VIEW),
+                "data-streamer");
+        }
+    }
+
+    /**
+     * Checks striped executor system view.
+     *
+     * @param execSvc Striped executor.
+     * @param view System view.
+     * @param poolName Executor name.
+     */
+    private void checkStripeExecutorView(StripedExecutor execSvc, SystemView<StripedExecutorTaskView> view,
+        String poolName) throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+
+        execSvc.execute(0, new TestRunnable(latch, 0));
+        execSvc.execute(0, new TestRunnable(latch, 1));
+        execSvc.execute(1, new TestRunnable(latch, 2));
+        execSvc.execute(1, new TestRunnable(latch, 3));
+
+        try {
+            boolean res = waitForCondition(() -> view.size() == 2, 5_000);
+
+            assertTrue(res);
+
+            Iterator<StripedExecutorTaskView> iter = view.iterator();
+
+            assertTrue(iter.hasNext());
+
+            StripedExecutorTaskView row0 = iter.next();
+
+            assertEquals(0, row0.stripeIndex());
+            assertEquals(TestRunnable.class.getSimpleName() + '1', row0.description());
+            assertEquals(poolName + "-stripe-0", row0.threadName());
+            assertEquals(TestRunnable.class.getName(), row0.taskName());
+
+            assertTrue(iter.hasNext());
+
+            StripedExecutorTaskView row1 = iter.next();
+
+            assertEquals(1, row1.stripeIndex());
+            assertEquals(TestRunnable.class.getSimpleName() + '3', row1.description());
+            assertEquals(poolName + "-stripe-1", row1.threadName());
+            assertEquals(TestRunnable.class.getName(), row1.taskName());
+        }
+        finally {
+            latch.countDown();
+        }
+    }
+
+    /** */
     public static class TestPredicate implements IgniteBiPredicate<Integer, Integer> {
         /** {@inheritDoc} */
         @Override public boolean apply(Integer integer, Integer integer2) {
@@ -925,4 +990,34 @@ public class SystemViewSelfTest extends GridCommonAbstractTest {
             return true;
         }
     }
+
+    /** Test runnable. */
+    public static class TestRunnable implements Runnable {
+        /** */
+        private final CountDownLatch latch;
+
+        /** */
+        private final int idx;
+
+        /** */
+        public TestRunnable(CountDownLatch latch, int idx) {
+            this.latch = latch;
+            this.idx = idx;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            try {
+                latch.await(5, TimeUnit.SECONDS);
+            }
+            catch (InterruptedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return getClass().getSimpleName() + idx;
+        }
+    }
 }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/SqlViewExporterSpiTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/SqlViewExporterSpiTest.java
index bd64a7b..dc39ec2 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/SqlViewExporterSpiTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/SqlViewExporterSpiTest.java
@@ -48,8 +48,10 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.metric.AbstractExporterSpiTest;
 import org.apache.ignite.internal.metric.SystemViewSelfTest.TestPredicate;
+import org.apache.ignite.internal.metric.SystemViewSelfTest.TestRunnable;
 import org.apache.ignite.internal.metric.SystemViewSelfTest.TestTransformer;
 import org.apache.ignite.internal.processors.service.DummyService;
+import org.apache.ignite.internal.util.StripedExecutor;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.services.ServiceConfiguration;
 import org.apache.ignite.spi.metric.sql.SqlViewMetricExporterSpi;
@@ -413,7 +415,9 @@ public class SqlViewExporterSpiTest extends AbstractExporterSpiTest {
             "TABLE_COLUMNS",
             "VIEW_COLUMNS",
             "TRANSACTIONS",
-            "CONTINUOUS_QUERIES"
+            "CONTINUOUS_QUERIES",
+            "STRIPED_THREADPOOL_QUEUE",
+            "DATASTREAM_THREADPOOL_QUEUE"
         ));
 
         Set<String> actViews = new HashSet<>();
@@ -774,6 +778,63 @@ public class SqlViewExporterSpiTest extends AbstractExporterSpiTest {
         assertTrue(found1 && found2);
     }
 
+    /** */
+    @Test
+    public void testStripedExecutor() throws Exception {
+        checkStripeExecutorView(ignite0.context().getStripedExecutorService(),
+            "STRIPED_THREADPOOL_QUEUE",
+            "sys");
+    }
+
+    /** */
+    @Test
+    public void testStreamerExecutor() throws Exception {
+        checkStripeExecutorView(ignite0.context().getDataStreamerExecutorService(),
+            "DATASTREAM_THREADPOOL_QUEUE",
+            "data-streamer");
+    }
+
+    /**
+     * Checks striped executor system view.
+     *
+     * @param execSvc Striped executor.
+     * @param view System view name.
+     * @param poolName Executor name.
+     */
+    private void checkStripeExecutorView(StripedExecutor execSvc, String view, String poolName) throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+
+        execSvc.execute(0, new TestRunnable(latch, 0));
+        execSvc.execute(0, new TestRunnable(latch, 1));
+        execSvc.execute(1, new TestRunnable(latch, 2));
+        execSvc.execute(1, new TestRunnable(latch, 3));
+
+        try {
+            boolean res = waitForCondition(() -> execute(ignite0, "SELECT * FROM SYS." + view).size() == 2, 5_000);
+
+            assertTrue(res);
+
+            List<List<?>> stripedQueue = execute(ignite0, "SELECT * FROM SYS." + view);
+
+            List<?> row0 = stripedQueue.get(0);
+
+            assertEquals(0, row0.get(0));
+            assertEquals(TestRunnable.class.getSimpleName() + '1', row0.get(1));
+            assertEquals(poolName + "-stripe-0", row0.get(2));
+            assertEquals(TestRunnable.class.getName(), row0.get(3));
+
+            List<?> row1 = stripedQueue.get(1);
+
+            assertEquals(1, row1.get(0));
+            assertEquals(TestRunnable.class.getSimpleName() + '3', row1.get(1));
+            assertEquals(poolName + "-stripe-1", row1.get(2));
+            assertEquals(TestRunnable.class.getName(), row1.get(3));
+        }
+        finally {
+            latch.countDown();
+        }
+    }
+
     /**
      * Execute query on given node.
      *


Mime
View raw message