ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [08/32] incubator-ignite git commit: # Renaming
Date Fri, 05 Dec 2014 10:03:09 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerMetricsHolder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerMetricsHolder.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerMetricsHolder.java
new file mode 100644
index 0000000..eed6408
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerMetricsHolder.java
@@ -0,0 +1,416 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.streamer;
+
+import org.gridgain.grid.streamer.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jdk8.backport.*;
+
+import java.util.*;
+
+/**
+ * Metrics holder.
+ *
+ * Note that for current active stages we use maximum active stages over last second.
+ */
+public class StreamerMetricsHolder implements StreamerMetrics {
+    /** Max active stages over last second. */
+    private GridAtomicInteger stageActiveMaxLastSec = new GridAtomicInteger();
+
+    /** Last stage max update ts. */
+    private volatile long lastStageSampleTs;
+
+    /** Number of running stages. */
+    private LongAdder stageActiveCnt = new LongAdder();
+
+    /** Number of waiting stages. */
+    private LongAdder stageWaitingCnt = new LongAdder();
+
+    /** Total number of stages executed. */
+    private LongAdder stageTotalCnt = new LongAdder();
+
+    /** Max exec time. */
+    private GridAtomicLong pipelineMaxExecTime = new GridAtomicLong();
+
+    /** Min exec time. */
+    private GridAtomicLong pipelineMinExecTime = new GridAtomicLong(Long.MAX_VALUE);
+
+    /** Sum of pipeline exec times, divided by pipeline count to get the average. */
+    private LongAdder pipelineSumExecTime = new LongAdder();
+
+    /** Max exec nodes. */
+    private GridAtomicInteger pipelineMaxExecNodes = new GridAtomicInteger();
+
+    /** Min exec nodes. */
+    private GridAtomicInteger pipelineMinExecNodes = new GridAtomicInteger(Integer.MAX_VALUE);
+
+    /** Sum of pipeline exec nodes, divided by pipeline count to get the average. */
+    private LongAdder pipelineSumExecNodes = new LongAdder();
+
+    /** Total number of pipelines finished on this node. */
+    private LongAdder pipelineTotalCnt = new LongAdder();
+
+    /** Query max exec time. */
+    private GridAtomicLong qryMaxExecTime = new GridAtomicLong();
+
+    /** Query min exec time. */
+    private GridAtomicLong qryMinExecTime = new GridAtomicLong(Long.MAX_VALUE);
+
+    /** Sum of query exec times, divided by query count to get the average. */
+    private LongAdder qrySumExecTime = new LongAdder();
+
+    /** Query max exec nodes. */
+    private GridAtomicInteger qryMaxExecNodes = new GridAtomicInteger();
+
+    /** Query min exec nodes. */
+    private GridAtomicInteger qryMinExecNodes = new GridAtomicInteger(Integer.MAX_VALUE);
+
+    /** Sum of query exec nodes, divided by query count to get the average. */
+    private LongAdder qrySumExecNodes = new LongAdder();
+
+    /** Total number of queries finished on this node. */
+    private LongAdder qryTotalCnt = new LongAdder();
+
+    /** Current active sessions. */
+    private LongAdder curActiveSessions = new LongAdder();
+
+    /** Max active sessions. */
+    private GridAtomicInteger maxActiveSessions = new GridAtomicInteger();
+
+    /** Failures count. */
+    private LongAdder failuresCnt = new LongAdder();
+
+    /** Stages metrics. */
+    private final GridStreamerStageMetricsHolder[] stageMetrics;
+
+    /** Stage metrics map. */
+    private final Map<String, GridStreamerStageMetrics> stageMetricsMap;
+
+    /** Window metrics map. */
+    private final Map<String, GridStreamerWindowMetrics> windowMetricsMap;
+
+    /** Executor service capacity. */
+    private final int execSvcCap;
+
+    /**
+     * Creates a metrics holder and indexes the given stage and window metrics holders by name.
+     * <p>
+     * The {@code stageMetrics} array is kept by reference (it is not copied) and is later
+     * addressed by stage index in the stage callbacks. Both name maps are built as
+     * {@link LinkedHashMap}s, so they iterate in the order of the passed-in arrays, and are
+     * exposed only through unmodifiable wrappers.
+     *
+     * @param stageMetrics Array of stage metrics holders.
+     * @param windowMetrics Array of window metrics holders.
+     * @param execSvcCap Executor service capacity.
+     */
+    public StreamerMetricsHolder(
+        GridStreamerStageMetricsHolder[] stageMetrics,
+        GridStreamerWindowMetricsHolder[] windowMetrics,
+        int execSvcCap
+    ) {
+        this.execSvcCap = execSvcCap;
+        this.stageMetrics = stageMetrics;
+
+        Map<String, GridStreamerStageMetrics> map = new LinkedHashMap<>();
+
+        for (GridStreamerStageMetricsHolder holder : stageMetrics)
+            map.put(holder.name(), holder);
+
+        stageMetricsMap = Collections.unmodifiableMap(map);
+
+        Map<String, GridStreamerWindowMetrics> map0 = new LinkedHashMap<>();
+
+        for (GridStreamerWindowMetricsHolder holder : windowMetrics)
+            map0.put(holder.name(), holder);
+
+        windowMetricsMap = Collections.unmodifiableMap(map0);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This implementation reports the sampled value held in {@code stageActiveMaxLastSec}
+     * (maintained by {@link #sampleCurrentStages()}) rather than reading the live
+     * {@code stageActiveCnt} counter.
+     */
+    @Override public int stageActiveExecutionCount() {
+        return stageActiveMaxLastSec.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int stageWaitingExecutionCount() {
+        return stageWaitingCnt.intValue();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long stageTotalExecutionCount() {
+        return stageTotalCnt.longValue();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long pipelineMaximumExecutionTime() {
+        return pipelineMaxExecTime.get();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * {@code Long.MAX_VALUE} is the initial value of the minimum holder; when the holder
+     * still contains it, {@code 0} is reported instead.
+     */
+    @Override public long pipelineMinimumExecutionTime() {
+        long min = pipelineMinExecTime.get();
+
+        return min == Long.MAX_VALUE ? 0 : min;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Computed as the summed pipeline execution time divided (integer division) by the number
+     * of completed pipelines; {@code 0} is returned when that number is zero. The two counters
+     * are read one after the other, not as a single snapshot.
+     */
+    @Override public long pipelineAverageExecutionTime() {
+        long totalTime = pipelineSumExecTime.sum();
+
+        long execs = pipelineTotalCnt.sum();
+
+        return execs == 0 ? 0 : totalTime / execs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int pipelineMaximumExecutionNodes() {
+        return pipelineMaxExecNodes.get();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * {@code Integer.MAX_VALUE} is the initial value of the minimum holder; when the holder
+     * still contains it, {@code 0} is reported instead.
+     */
+    @Override public int pipelineMinimumExecutionNodes() {
+        int min = pipelineMinExecNodes.get();
+
+        return min == Integer.MAX_VALUE ? 0 : min;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Computed as the summed pipeline node count divided (integer division) by the number of
+     * completed pipelines and narrowed to {@code int}; {@code 0} is returned when that number
+     * is zero.
+     */
+    @Override public int pipelineAverageExecutionNodes() {
+        long totalNodes = pipelineSumExecNodes.sum();
+
+        long execs = pipelineTotalCnt.sum();
+
+        return execs == 0 ? 0 : (int)(totalNodes / execs);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long queryMaximumExecutionTime() {
+        return qryMaxExecTime.get();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * {@code Long.MAX_VALUE} is the initial value of the minimum holder; when the holder
+     * still contains it, {@code 0} is reported instead.
+     */
+    @Override public long queryMinimumExecutionTime() {
+        long min = qryMinExecTime.get();
+
+        return min == Long.MAX_VALUE ? 0 : min;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Computed as the summed query execution time divided (integer division) by the number of
+     * completed queries; {@code 0} is returned when that number is zero. The two counters are
+     * read one after the other, not as a single snapshot.
+     */
+    @Override public long queryAverageExecutionTime() {
+        long totalTime = qrySumExecTime.sum();
+
+        long execs = qryTotalCnt.sum();
+
+        return execs == 0 ? 0 : totalTime / execs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int queryMaximumExecutionNodes() {
+        return qryMaxExecNodes.get();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * {@code Integer.MAX_VALUE} is the initial value of the minimum holder; when the holder
+     * still contains it, {@code 0} is reported instead.
+     */
+    @Override public int queryMinimumExecutionNodes() {
+        int min = qryMinExecNodes.get();
+
+        return min == Integer.MAX_VALUE ? 0 : min;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Computed as the summed query node count divided (integer division) by the number of
+     * completed queries and narrowed to {@code int}; {@code 0} is returned when that number
+     * is zero.
+     */
+    @Override public int queryAverageExecutionNodes() {
+        long totalNodes = qrySumExecNodes.sum();
+
+        long execs = qryTotalCnt.sum();
+
+        return execs == 0 ? 0 : (int)(totalNodes / execs);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int currentActiveSessions() {
+        return curActiveSessions.intValue();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int maximumActiveSessions() {
+        return maxActiveSessions.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int failuresCount() {
+        return failuresCnt.intValue();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int executorServiceCapacity() {
+        return execSvcCap;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridStreamerStageMetrics stageMetrics(String stageName) {
+        GridStreamerStageMetrics metrics = stageMetricsMap.get(stageName);
+
+        if (metrics == null)
+            throw new IllegalArgumentException("Streamer stage is not configured: " + stageName);
+
+        return metrics;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<GridStreamerStageMetrics> stageMetrics() {
+        return stageMetricsMap.values();
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridStreamerWindowMetrics windowMetrics(String winName) {
+        GridStreamerWindowMetrics metrics = windowMetricsMap.get(winName);
+
+        if (metrics == null)
+            throw new IllegalArgumentException("Streamer window is not configured: " + winName);
+
+        return metrics;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<GridStreamerWindowMetrics> windowMetrics() {
+        return windowMetricsMap.values();
+    }
+
+    /**
+     * Stage scheduled callback.
+     */
+    public void onStageScheduled() {
+        stageWaitingCnt.increment();
+    }
+
+    /**
+     * Stage execution started callback.
+     * <p>
+     * Moves one stage from the waiting counter to the active counter, forwards the wait time
+     * to the metrics holder of the stage at {@code idx}, and then re-samples the active stage
+     * count via {@link #sampleCurrentStages()}. A negative {@code waitTime} is replaced
+     * with {@code 0} before it is forwarded.
+     *
+     * @param idx Stage index; used to address the stage metrics array passed to the constructor.
+     * @param waitTime Stage wait time.
+     */
+    public void onStageExecutionStarted(int idx, long waitTime) {
+        if (waitTime < 0)
+            waitTime = 0;
+
+        stageActiveCnt.increment();
+        stageWaitingCnt.decrement();
+
+        stageMetrics[idx].onExecutionStarted(waitTime);
+
+        sampleCurrentStages();
+    }
+
+    /**
+     * Stage execution finished callback.
+     * <p>
+     * Decrements the active stage counter, increments the total executed stage counter,
+     * forwards the execution time to the metrics holder of the stage at {@code idx}, and then
+     * re-samples the active stage count via {@link #sampleCurrentStages()}. A negative
+     * {@code execTime} is replaced with {@code 0} before it is forwarded.
+     *
+     * @param idx Stage index; used to address the stage metrics array passed to the constructor.
+     * @param execTime Stage execution time.
+     */
+    public void onStageExecutionFinished(int idx, long execTime) {
+        if (execTime < 0)
+            execTime = 0;
+
+        stageActiveCnt.decrement();
+
+        stageTotalCnt.increment();
+
+        stageMetrics[idx].onExecutionFinished(execTime);
+
+        sampleCurrentStages();
+    }
+
+    /**
+     * Pipeline completed callback.
+     * <p>
+     * Offers the execution time and node count to the pipeline max/min holders, adds them to
+     * the running sums used for the averages, and increments the completed pipeline counter.
+     * A negative {@code execTime} is replaced with {@code 0} first; {@code execNodes} is
+     * recorded as passed.
+     *
+     * @param execTime Pipeline execution time.
+     * @param execNodes Pipeline execution nodes.
+     */
+    public void onPipelineCompleted(long execTime, int execNodes) {
+        if (execTime < 0)
+            execTime = 0;
+
+        pipelineMaxExecTime.setIfGreater(execTime);
+        pipelineMinExecTime.setIfLess(execTime);
+        pipelineSumExecTime.add(execTime);
+
+        pipelineMaxExecNodes.setIfGreater(execNodes);
+        pipelineMinExecNodes.setIfLess(execNodes);
+        pipelineSumExecNodes.add(execNodes);
+
+        pipelineTotalCnt.increment();
+    }
+
+    /**
+     * Query completed callback.
+     * <p>
+     * Offers the execution time and node count to the query max/min holders, adds them to the
+     * running sums used for the averages, and increments the completed query counter.
+     * A negative {@code execTime} is replaced with {@code 0} first; {@code execNodes} is
+     * recorded as passed.
+     *
+     * @param execTime Query execution time.
+     * @param execNodes Query execution nodes.
+     */
+    public void onQueryCompleted(long execTime, int execNodes) {
+        if (execTime < 0)
+            execTime = 0;
+
+        qryMaxExecTime.setIfGreater(execTime);
+        qryMinExecTime.setIfLess(execTime);
+        qrySumExecTime.add(execTime);
+
+        qryMaxExecNodes.setIfGreater(execNodes);
+        qryMinExecNodes.setIfLess(execNodes);
+        qrySumExecNodes.add(execNodes);
+
+        qryTotalCnt.increment();
+    }
+
+    /**
+     * Session started callback.
+     * <p>
+     * Increments the active session counter and then offers the counter's current value to
+     * {@code maxActiveSessions} via {@code setIfGreater}.
+     * <p>
+     * NOTE(review): the increment and the subsequent read are two separate calls, so the value
+     * offered as the new maximum is whatever the counter sums to at read time - confirm this
+     * approximation is acceptable.
+     */
+    public void onSessionStarted() {
+        curActiveSessions.increment();
+
+        maxActiveSessions.setIfGreater(curActiveSessions.intValue());
+    }
+
+    /**
+     * Session finished callback.
+     */
+    public void onSessionFinished() {
+        curActiveSessions.decrement();
+    }
+
+    /**
+     * Session failed callback.
+     * <p>
+     * Decrements the active session counter (as {@link #onSessionFinished()} does) and
+     * additionally increments the failures counter reported by {@link #failuresCount()}.
+     */
+    public void onSessionFailed() {
+        curActiveSessions.decrement();
+
+        failuresCnt.increment();
+    }
+
+    /**
+     * Stage failure callback.
+     *
+     * @param idx Stage index.
+     */
+    public void onStageFailure(int idx) {
+        stageMetrics[idx].onFailure();
+    }
+
+    /**
+     * Samples current stages: refreshes the value reported by {@link #stageActiveExecutionCount()}.
+     * <p>
+     * If more than 1000 ms (per {@code U.currentTimeMillis()}) have passed since the last reset,
+     * the sampled value is overwritten with the current active stage count and the reset
+     * timestamp is updated; otherwise the current count is offered to the sampled value via
+     * {@code setIfGreater}.
+     * <p>
+     * NOTE(review): the timestamp check and the update are not performed as one atomic step -
+     * presumably an acceptable approximation for metrics; confirm.
+     */
+    public void sampleCurrentStages() {
+        long now = U.currentTimeMillis();
+
+        int cur = (int)stageActiveCnt.sum();
+
+        if (now - lastStageSampleTs > 1000) {
+            stageActiveMaxLastSec.set(cur);
+
+            lastStageSampleTs = now;
+        }
+        else
+            stageActiveMaxLastSec.setIfGreater(cur);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/task/GridStreamerBroadcastTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/task/GridStreamerBroadcastTask.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/task/GridStreamerBroadcastTask.java
index c33941b..d3df3fa 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/task/GridStreamerBroadcastTask.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/task/GridStreamerBroadcastTask.java
@@ -31,7 +31,7 @@ public class GridStreamerBroadcastTask extends GridPeerDeployAwareTaskAdapter<Vo
     private static final long serialVersionUID = 0L;
 
     /** Closure. */
-    private IgniteInClosure<GridStreamerContext> clo;
+    private IgniteInClosure<StreamerContext> clo;
 
     /** Streamer. */
     private String streamer;
@@ -40,7 +40,7 @@ public class GridStreamerBroadcastTask extends GridPeerDeployAwareTaskAdapter<Vo
      * @param clo Closure.
      * @param streamer Streamer.
      */
-    public GridStreamerBroadcastTask(IgniteInClosure<GridStreamerContext> clo, @Nullable String streamer) {
+    public GridStreamerBroadcastTask(IgniteInClosure<StreamerContext> clo, @Nullable String streamer) {
         super(U.peerDeployAware(clo));
 
         this.clo = clo;
@@ -84,7 +84,7 @@ public class GridStreamerBroadcastTask extends GridPeerDeployAwareTaskAdapter<Vo
         private Ignite g;
 
         /** Closure. */
-        private IgniteInClosure<GridStreamerContext> clo;
+        private IgniteInClosure<StreamerContext> clo;
 
         /** Streamer. */
         private String streamer;
@@ -100,7 +100,7 @@ public class GridStreamerBroadcastTask extends GridPeerDeployAwareTaskAdapter<Vo
          * @param clo Closure.
          * @param streamer Streamer.
          */
-        private StreamerBroadcastJob(IgniteInClosure<GridStreamerContext> clo, String streamer) {
+        private StreamerBroadcastJob(IgniteInClosure<StreamerContext> clo, String streamer) {
             this.clo = clo;
             this.streamer = streamer;
         }
@@ -124,7 +124,7 @@ public class GridStreamerBroadcastTask extends GridPeerDeployAwareTaskAdapter<Vo
 
         /** {@inheritDoc} */
         @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            clo = (IgniteInClosure<GridStreamerContext>)in.readObject();
+            clo = (IgniteInClosure<StreamerContext>)in.readObject();
             streamer = U.readString(in);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/task/GridStreamerQueryTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/task/GridStreamerQueryTask.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/task/GridStreamerQueryTask.java
index 63372ea..8970e51 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/task/GridStreamerQueryTask.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/task/GridStreamerQueryTask.java
@@ -31,7 +31,7 @@ public class GridStreamerQueryTask<R> extends GridPeerDeployAwareTaskAdapter<Voi
     private static final long serialVersionUID = 0L;
 
     /** Query closure. */
-    private IgniteClosure<GridStreamerContext, R> qryClos;
+    private IgniteClosure<StreamerContext, R> qryClos;
 
     /** Streamer. */
     private String streamer;
@@ -40,7 +40,7 @@ public class GridStreamerQueryTask<R> extends GridPeerDeployAwareTaskAdapter<Voi
      * @param qryClos Query closure.
      * @param streamer Streamer.
      */
-    public GridStreamerQueryTask(IgniteClosure<GridStreamerContext, R> qryClos, @Nullable String streamer) {
+    public GridStreamerQueryTask(IgniteClosure<StreamerContext, R> qryClos, @Nullable String streamer) {
         super(U.peerDeployAware(qryClos));
 
         this.qryClos = qryClos;
@@ -89,7 +89,7 @@ public class GridStreamerQueryTask<R> extends GridPeerDeployAwareTaskAdapter<Voi
         private Ignite g;
 
         /** Query closure. */
-        private IgniteClosure<GridStreamerContext, R> qryClos;
+        private IgniteClosure<StreamerContext, R> qryClos;
 
         /** Streamer. */
         private String streamer;
@@ -105,7 +105,7 @@ public class GridStreamerQueryTask<R> extends GridPeerDeployAwareTaskAdapter<Voi
          * @param qryClos Query closure.
          * @param streamer Streamer.
          */
-        private QueryJob(IgniteClosure<GridStreamerContext, R> qryClos, String streamer) {
+        private QueryJob(IgniteClosure<StreamerContext, R> qryClos, String streamer) {
             this.qryClos = qryClos;
             this.streamer = streamer;
         }
@@ -127,7 +127,7 @@ public class GridStreamerQueryTask<R> extends GridPeerDeployAwareTaskAdapter<Voi
 
         /** {@inheritDoc} */
         @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            qryClos = (IgniteClosure<GridStreamerContext, R>)in.readObject();
+            qryClos = (IgniteClosure<StreamerContext, R>)in.readObject();
             streamer = U.readString(in);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/task/GridStreamerReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/task/GridStreamerReduceTask.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/task/GridStreamerReduceTask.java
index 8919a3b..48e55ce 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/task/GridStreamerReduceTask.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/task/GridStreamerReduceTask.java
@@ -32,7 +32,7 @@ public class GridStreamerReduceTask<R1, R2> extends GridPeerDeployAwareTaskAdapt
     private static final long serialVersionUID = 0L;
 
     /** Query closure. */
-    private IgniteClosure<GridStreamerContext, R1> clos;
+    private IgniteClosure<StreamerContext, R1> clos;
 
     /** Reducer. */
     private IgniteReducer<R1, R2> rdc;
@@ -45,7 +45,7 @@ public class GridStreamerReduceTask<R1, R2> extends GridPeerDeployAwareTaskAdapt
      * @param rdc Query reducer.
      * @param streamer Streamer.
      */
-    public GridStreamerReduceTask(IgniteClosure<GridStreamerContext, R1> clos, IgniteReducer<R1, R2> rdc,
+    public GridStreamerReduceTask(IgniteClosure<StreamerContext, R1> clos, IgniteReducer<R1, R2> rdc,
         @Nullable String streamer) {
         super(U.peerDeployAware(clos));
 
@@ -93,7 +93,7 @@ public class GridStreamerReduceTask<R1, R2> extends GridPeerDeployAwareTaskAdapt
         private Ignite g;
 
         /** Query closure. */
-        private IgniteClosure<GridStreamerContext, R> qryClos;
+        private IgniteClosure<StreamerContext, R> qryClos;
 
         /** Streamer. */
         private String streamer;
@@ -109,7 +109,7 @@ public class GridStreamerReduceTask<R1, R2> extends GridPeerDeployAwareTaskAdapt
          * @param qryClos Query closure.
          * @param streamer Streamer.
          */
-        private ReduceJob(IgniteClosure<GridStreamerContext, R> qryClos, String streamer) {
+        private ReduceJob(IgniteClosure<StreamerContext, R> qryClos, String streamer) {
             this.qryClos = qryClos;
             this.streamer = streamer;
         }
@@ -131,7 +131,7 @@ public class GridStreamerReduceTask<R1, R2> extends GridPeerDeployAwareTaskAdapt
 
         /** {@inheritDoc} */
         @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            qryClos = (IgniteClosure<GridStreamerContext, R>)in.readObject();
+            qryClos = (IgniteClosure<StreamerContext, R>)in.readObject();
             streamer = U.readString(in);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/kernal/visor/node/VisorNodeDataCollectorJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/visor/node/VisorNodeDataCollectorJob.java b/modules/core/src/main/java/org/gridgain/grid/kernal/visor/node/VisorNodeDataCollectorJob.java
index a6edffc..594d75e 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/visor/node/VisorNodeDataCollectorJob.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/visor/node/VisorNodeDataCollectorJob.java
@@ -120,10 +120,10 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa
     /** Collect streamers. */
     private void streamers(VisorNodeDataCollectorJobResult res) {
         try {
-            GridStreamerConfiguration[] cfgs = g.configuration().getStreamerConfiguration();
+            StreamerConfiguration[] cfgs = g.configuration().getStreamerConfiguration();
 
             if (cfgs != null) {
-                for (GridStreamerConfiguration cfg : cfgs)
+                for (StreamerConfiguration cfg : cfgs)
                     res.streamers().add(VisorStreamer.from(g.streamer(cfg.getName())));
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerConfiguration.java b/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerConfiguration.java
index 420505f..1d09d73 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerConfiguration.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerConfiguration.java
@@ -47,7 +47,7 @@ public class VisorStreamerConfiguration implements Serializable {
      * @param scfg Streamer configuration.
      * @return Data transfer object for streamer configuration properties.
      */
-    public static VisorStreamerConfiguration from(GridStreamerConfiguration scfg) {
+    public static VisorStreamerConfiguration from(StreamerConfiguration scfg) {
         VisorStreamerConfiguration cfg = new VisorStreamerConfiguration();
 
         cfg.name(scfg.getName());
@@ -66,13 +66,13 @@ public class VisorStreamerConfiguration implements Serializable {
      * @param streamers streamer configurations.
      * @return streamer configurations properties.
      */
-    public static Iterable<VisorStreamerConfiguration> list(GridStreamerConfiguration[] streamers) {
+    public static Iterable<VisorStreamerConfiguration> list(StreamerConfiguration[] streamers) {
         if (streamers == null)
             return Collections.emptyList();
 
         final Collection<VisorStreamerConfiguration> cfgs = new ArrayList<>(streamers.length);
 
-        for (GridStreamerConfiguration streamer : streamers)
+        for (StreamerConfiguration streamer : streamers)
             cfgs.add(from(streamer));
 
         return cfgs;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerMetrics.java b/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerMetrics.java
index 4c8521e..08b6efe 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerMetrics.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerMetrics.java
@@ -16,7 +16,7 @@ import org.gridgain.grid.util.typedef.internal.*;
 import java.io.*;
 
 /**
- * Data transfer object for {@link GridStreamerMetrics}.
+ * Data transfer object for {@link org.gridgain.grid.streamer.StreamerMetrics}.
  */
 public class VisorStreamerMetrics implements Serializable {
     /** */
@@ -77,7 +77,7 @@ public class VisorStreamerMetrics implements Serializable {
     public static VisorStreamerMetrics from(IgniteStreamer streamer) {
         assert streamer != null;
 
-        GridStreamerMetrics m = streamer.metrics();
+        StreamerMetrics m = streamer.metrics();
 
         int windowSz = 0;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerConfiguration.java b/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerConfiguration.java
deleted file mode 100644
index d15e906..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerConfiguration.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.streamer;
-
-import org.gridgain.grid.util.typedef.internal.*;
-import org.gridgain.grid.util.tostring.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Streamer configuration.
- */
-public class GridStreamerConfiguration {
-    /** By default maximum number of concurrent sessions is unlimited. */
-    public static final int DFLT_MAX_CONCURRENT_SESSIONS = -1;
-
-    /** Default value for maximum failover attempts. */
-    public static final int DFLT_MAX_FAILOVER_ATTEMPTS = 3;
-
-    /** Name. */
-    private String name;
-
-    /** Window. */
-    private Collection<GridStreamerWindow> win;
-
-    /** Router. */
-    private GridStreamerEventRouter router;
-
-    /** Stages. */
-    @GridToStringInclude
-    private Collection<GridStreamerStage> stages;
-
-    /** At least once flag. */
-    private boolean atLeastOnce;
-
-    /** Maximum number of failover attempts. */
-    private int maxFailoverAttempts = DFLT_MAX_FAILOVER_ATTEMPTS;
-
-    /** Maximum number of concurrent sessions to be processed. */
-    private int maxConcurrentSessions = DFLT_MAX_CONCURRENT_SESSIONS;
-
-    /** Streamer executor service. */
-    private ExecutorService execSvc;
-
-    /** Executor service shutdown flag. */
-    private boolean execSvcShutdown;
-
-    /**
-     *
-     */
-    public GridStreamerConfiguration() {
-        // No-op.
-    }
-
-    /**
-     * @param c Configuration to copy.
-     */
-    public GridStreamerConfiguration(GridStreamerConfiguration c) {
-        atLeastOnce = c.isAtLeastOnce();
-        execSvc = c.getExecutorService();
-        execSvcShutdown = c.isExecutorServiceShutdown();
-        maxConcurrentSessions = c.getMaximumConcurrentSessions();
-        maxFailoverAttempts = c.getMaximumFailoverAttempts();
-        name = c.getName();
-        router = c.getRouter();
-        stages = c.getStages();
-        win = c.getWindows();
-    }
-
-    /**
-     * Gets streamer name. Must be unique within grid.
-     *
-     * @return Streamer name, if {@code null} then default streamer is returned.
-     */
-    @Nullable public String getName() {
-        return name;
-    }
-
-    /**
-     * Sets the name of the streamer.
-     *
-     * @param name Name.
-     */
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    /**
-     * Gets streamer event router.
-     *
-     * @return Event router, if {@code null} then events will be executed locally.
-     */
-    @SuppressWarnings("unchecked")
-    @Nullable public GridStreamerEventRouter getRouter() {
-        return router;
-    }
-
-    /**
-     * Sets router for streamer.
-     *
-     * @param router Router.
-     */
-    public void setRouter(GridStreamerEventRouter router) {
-        this.router = router;
-    }
-
-    /**
-     * Gets collection of streamer event windows. At least one window should be configured. Each window
-     * must have unique name.
-     *
-     * @return Streamer windows.
-     */
-    public Collection<GridStreamerWindow> getWindows() {
-        return win;
-    }
-
-    /**
-     * Sets collection of streamer windows.
-     *
-     * @param win Window.
-     */
-    public void setWindows(Collection<GridStreamerWindow> win) {
-        this.win = win;
-    }
-
-    /**
-     * Gets collection of streamer stages. Streamer must have at least one stage to execute. Each stage
-     * must have unique name.
-     *
-     * @return Collection of streamer stages.
-     */
-    public Collection<GridStreamerStage> getStages() {
-        return stages;
-    }
-
-    /**
-     * Sets stages.
-     *
-     * @param stages Stages.
-     */
-    public void setStages(Collection<GridStreamerStage> stages) {
-        this.stages = stages;
-    }
-
-    /**
-     * Gets flag indicating whether streamer should track event execution sessions and failover event execution
-     * if any failure detected or any node on which execution happened has left the grid before successful response
-     * is received.
-     * <p>
-     * Setting this flag to {@code true} will guarantee that all pipeline stages will be executed at least once for
-     * each group of event submitted to streamer (or failure listener will be notified if failover cannot succeed).
-     * However, it does not guarantee that each stage will be executed at most once.
-     *
-     * @return {@code True} if event should be processed at least once,
-     *      or {@code false} if failures can be safely ignored.
-     */
-    public boolean isAtLeastOnce() {
-        return atLeastOnce;
-    }
-
-    /**
-     * @param atLeastOnce {@code True} to guarantee that event will be processed at least once.
-     */
-    public void setAtLeastOnce(boolean atLeastOnce) {
-        this.atLeastOnce = atLeastOnce;
-    }
-
-    /**
-     * Gets maximum number of failover attempts to try when pipeline execution has failed. This parameter
-     * is ignored if {@link #isAtLeastOnce()} is set to {@code false}.
-     * <p>
-     * If not set, default value is
-     *
-     * @return Maximum number of failover attempts to try.
-     */
-    public int getMaximumFailoverAttempts() {
-        return maxFailoverAttempts;
-    }
-
-    /**
-     * Sets maximum number of failover attempts.
-
-     * @param maxFailoverAttempts Maximum number of failover attempts.
-     * @see #getMaximumFailoverAttempts()
-     */
-    public void setMaximumFailoverAttempts(int maxFailoverAttempts) {
-        this.maxFailoverAttempts = maxFailoverAttempts;
-    }
-
-    /**
-     * Gets maximum number of concurrent events to be processed by streamer. This property is taken into
-     * account when {@link #isAtLeastOnce()} is set to {@code true}. If not positive, number of sessions
-     * will not be limited by any value.
-     *
-     * @return Maximum number of concurrent events to be processed. If number of concurrent events is greater
-     *      then this value, caller will be blocked until enough responses are received.
-     */
-    public int getMaximumConcurrentSessions() {
-        return maxConcurrentSessions;
-    }
-
-    /**
-     * Sets maximum number of concurrent sessions.
-     *
-     * @param maxConcurrentSessions Maximum number of concurrent sessions.
-     * @see #getMaximumConcurrentSessions()
-     */
-    public void setMaximumConcurrentSessions(int maxConcurrentSessions) {
-        this.maxConcurrentSessions = maxConcurrentSessions;
-    }
-
-    /**
-     * Gets streamer executor service. Defines a thread pool in which streamer stages will be executed.
-     * <p>
-     * If not specified, thread pool executor with max pool size equal to number of cores will be created.
-     *
-     * @return Streamer executor service.
-     */
-    public ExecutorService getExecutorService() {
-        return execSvc;
-    }
-
-    /**
-     * Sets streamer executor service.
-     *
-     * @param execSvc Executor service to use.
-     * @see #getExecutorService()
-     */
-    public void setExecutorService(ExecutorService execSvc) {
-        this.execSvc = execSvc;
-    }
-
-    /**
-     * Flag indicating whether streamer executor service should be shut down on GridGain stop. If this flag
-     * is {@code true}, executor service will be shut down regardless of whether executor was specified externally
-     * or it was created by GridGain.
-     *
-     * @return {@code True} if executor service should be shut down on GridGain stop.
-     */
-    public boolean isExecutorServiceShutdown() {
-        return execSvcShutdown;
-    }
-
-    /**
-     * Sets flag indicating whether executor service should be shut down on GridGain stop.
-     *
-     * @param execSvcShutdown {@code True} if executor service should be shut down on GridGain stop.
-     * @see #isExecutorServiceShutdown()
-     */
-    public void setExecutorServiceShutdown(boolean execSvcShutdown) {
-        this.execSvcShutdown = execSvcShutdown;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridStreamerConfiguration.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerContext.java b/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerContext.java
deleted file mode 100644
index 6cdf293..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerContext.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.streamer;
-
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Streamer context. Provides access to streamer local store, configured windows and various methods
- * to run streamer queries.
- */
-public interface GridStreamerContext {
-    /**
-     * Gets instance of dynamic grid projection including all nodes on which this streamer is running.
-     *
-     * @return Projection with all nodes on which streamer is configured.
-     */
-    public ClusterGroup projection();
-
-    /**
-     * Gets streamer local space. Note that all updates to this space will be local.
-     *
-     * @return Streamer local space.
-     */
-    public <K, V> ConcurrentMap<K, V> localSpace();
-
-    /**
-     * Gets default event window, i.e. window that is on the first place in streamer configuration.
-     *
-     * @return Default window.
-     */
-    public <E> GridStreamerWindow<E> window();
-
-    /**
-     * Gets streamer event window by window name, if no window with such
-     * name was configured {@link IllegalArgumentException} will be thrown.
-     *
-     * @param winName Window name.
-     * @return Window instance.
-     */
-    public <E> GridStreamerWindow<E> window(String winName);
-
-    /**
-     * For context passed to {@link GridStreamerStage#run(GridStreamerContext, Collection)} this method will
-     * return next stage name in execution pipeline. For context obtained from streamer object, this method will
-     * return first stage name.
-     *
-     * @return Next stage name depending on invocation context.
-     */
-    public String nextStageName();
-
-    /**
-     * Queries all streamer nodes deployed within grid. Given closure will be executed on each node on which streamer
-     * is configured. Streamer context local for that node will be passed to closure during execution. All results
-     * returned by closure will be added to result collection.
-     *
-     * @param clo Function to be executed on individual nodes.
-     * @return Result received from all streamers.
-     * @throws GridException If query execution failed.
-     */
-    public <R> Collection<R> query(IgniteClosure<GridStreamerContext, R> clo) throws GridException;
-
-    /**
-     * Queries streamer nodes deployed within grid. Given closure will be executed on those of passed nodes
-     * on which streamer is configured. Streamer context local for that node will be passed to closure during
-     * execution. All results returned by closure will be added to result collection.
-     *
-     * @param clo Function to be executed on individual nodes.
-     * @param nodes Optional list of nodes to execute query on, if empty, then all nodes on
-     *      which this streamer is running will be queried.
-     * @return Result received from all streamers.
-     * @throws GridException If query execution failed.
-     */
-    public <R> Collection<R> query(IgniteClosure<GridStreamerContext, R> clo, Collection<ClusterNode> nodes)
-        throws GridException;
-
-    /**
-     * Queries all streamer nodes deployed within grid. Given closure will be executed on each streamer node
-     * in the grid. No result is collected.
-     *
-     * @param clo Function to be executed on individual nodes.
-     * @throws GridException If closure execution failed.
-     */
-    public void broadcast(IgniteInClosure<GridStreamerContext> clo) throws GridException;
-
-    /**
-     * Queries streamer nodes deployed within grid. Given closure will be executed on those of passed nodes on
-     * which streamer is configured. No result is collected.
-     *
-     * @param clo Function to be executed on individual nodes.
-     * @param nodes Optional list of nodes to execute query on, if empty, then all nodes on
-     *      which this streamer is running will be queried.
-     * @throws GridException If closure execution failed.
-     */
-    public void broadcast(IgniteInClosure<GridStreamerContext> clo, Collection<ClusterNode> nodes) throws GridException;
-
-    /**
-     * Queries all streamer nodes deployed within grid. Given closure will be executed on each streamer node in
-     * the grid. Streamer context local for that node will be passed to closure during execution. Results returned
-     * by closure will be passed to given reducer.
-     *
-     * @param clo Function to be executed on individual nodes.
-     * @param rdc Reducer to reduce results received from remote nodes.
-     * @return Reducer result.
-     * @throws GridException If query execution failed.
-     */
-    public <R1, R2> R2 reduce(IgniteClosure<GridStreamerContext, R1> clo, IgniteReducer<R1, R2> rdc) throws GridException;
-
-    /**
-     * Queries streamer nodes deployed within grid. Given closure will be executed on those of passed nodes on which
-     * streamer is configured. Streamer context local for that node will be passed to closure during execution.
-     * Results returned by closure will be passed to given reducer.
-     *
-     * @param clo Function to be executed on individual nodes.
-     * @param rdc Reducer to reduce results received from remote nodes.
-     * @param nodes Optional list of nodes to execute query on, if empty, then all nodes on
-     *      which this streamer is running will be queried.
-     * @return Reducer result.
-     * @throws GridException If query execution failed.
-     */
-    public <R1, R2> R2 reduce(IgniteClosure<GridStreamerContext, R1> clo, IgniteReducer<R1, R2> rdc,
-        Collection<ClusterNode> nodes) throws GridException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerEventRouter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerEventRouter.java
deleted file mode 100644
index 6eaf5c9..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerEventRouter.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.streamer;
-
-import org.apache.ignite.cluster.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Streamer event router. Pluggable component that determines event execution flow across the grid.
- * Each time a group of events is submitted to streamer or returned to streamer by a stage, event
- * router will be used to select execution node for next stage.
- */
-public interface GridStreamerEventRouter {
-    /**
-     * Selects a node for given event that should be processed by a stage with given name.
-     *
-     * @param ctx Streamer context.
-     * @param stageName Stage name.
-     * @param evt Event to route.
-     * @return Node to route to. If this method returns {@code null} then the whole pipeline execution
-     *      will be terminated. All running and ongoing stages for pipeline execution will be
-     *      cancelled.
-     */
-    @Nullable public <T> ClusterNode route(GridStreamerContext ctx, String stageName, T evt);
-
-    /**
-     * Selects a node for given events that should be processed by a stage with given name.
-     *
-     * @param ctx Streamer context.
-     * @param stageName Stage name to route events.
-     * @param evts Events.
-     * @return Events to node mapping. If this method returns {@code null} then the whole pipeline execution
-     *      will be terminated. All running and ongoing stages for pipeline execution will be
-     *      cancelled.
-     */
-    @Nullable public <T> Map<ClusterNode, Collection<T>> route(GridStreamerContext ctx, String stageName,
-        Collection<T> evts);
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerEventRouterAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerEventRouterAdapter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerEventRouterAdapter.java
deleted file mode 100644
index 0462980..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerEventRouterAdapter.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.streamer;
-
-import org.apache.ignite.cluster.*;
-import org.gridgain.grid.util.*;
-import org.gridgain.grid.util.typedef.*;
-
-import java.util.*;
-
-/**
- * Streamer adapter for event routers.
- */
-public abstract class GridStreamerEventRouterAdapter implements GridStreamerEventRouter {
-    /** {@inheritDoc} */
-    @Override public <T> Map<ClusterNode, Collection<T>> route(GridStreamerContext ctx, String stageName,
-        Collection<T> evts) {
-        if (evts.size() == 1) {
-            ClusterNode route = route(ctx, stageName, F.first(evts));
-
-            if (route == null)
-                return null;
-
-            return Collections.singletonMap(route, evts);
-        }
-
-        Map<ClusterNode, Collection<T>> map = new GridLeanMap<>();
-
-        for (T e : evts) {
-            ClusterNode n = route(ctx, stageName, e);
-
-            if (n == null)
-                return null;
-
-            Collection<T> mapped = map.get(n);
-
-            if (mapped == null)
-                map.put(n, mapped = new ArrayList<>());
-
-            mapped.add(e);
-        }
-
-        return map;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerFailureListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerFailureListener.java b/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerFailureListener.java
deleted file mode 100644
index a658407..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerFailureListener.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.streamer;
-
-import java.util.*;
-
-/**
- * Streamer failure listener. Asynchronous callback passed to user in case of any failure determined by streamer.
- *
- * @see org.apache.ignite.IgniteStreamer#addStreamerFailureListener(GridStreamerFailureListener)
- *
- */
-public interface GridStreamerFailureListener {
-    /**
-     * Callback invoked when unrecoverable failure is detected by streamer.
-     * <p>
-     * If {@link GridStreamerConfiguration#isAtLeastOnce()} is set to {@code false}, then this callback
-     * will be invoked on node on which failure occurred. If {@link GridStreamerConfiguration#isAtLeastOnce()}
-     * is set to {@code true}, then this callback will be invoked on node on which
-     * {@link org.apache.ignite.IgniteStreamer#addEvents(Collection)} or its variant was called. Callback will be called if maximum
-     * number of failover attempts exceeded or failover cannot be performed (for example, if router
-     * returned {@code null}).
-     *
-     * @param stageName Failed stage name.
-     * @param evts Failed set of events.
-     * @param err Error cause.
-     */
-    public void onFailure(String stageName, Collection<Object> evts, Throwable err);
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerMBean.java b/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerMBean.java
deleted file mode 100644
index e9d2515..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerMBean.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.streamer;
-
-import org.apache.ignite.mbean.*;
-import org.jetbrains.annotations.*;
-
-/**
- * Streamer MBean interface.
- */
-@IgniteMBeanDescription("MBean that provides access to streamer description and metrics.")
-public interface GridStreamerMBean {
-    /**
-     * Gets streamer name.
-     *
-     * @return Streamer name.
-     */
-    @IgniteMBeanDescription("Streamer name.")
-    @Nullable public String getName();
-
-    /**
-     * Gets {@code atLeastOnce} configuration flag.
-     *
-     * @return {@code True} if {@code atLeastOnce} is configured.
-     */
-    @IgniteMBeanDescription("True if atLeastOnce is configured.")
-    public boolean isAtLeastOnce();
-
-    /**
-     * Gets size of stage futures map. This map is maintained only when {@code atLeastOnce} configuration
-     * flag is set to true.
-     *
-     * @return Stage future map size.
-     */
-    @IgniteMBeanDescription("Stage future map size.")
-    public int getStageFutureMapSize();
-
-    /**
-     * Gets size of batch futures map.
-     *
-     * @return Batch future map size.
-     */
-    @IgniteMBeanDescription("Batch future map size.")
-    public int getBatchFutureMapSize();
-
-    /**
-     * Gets number of stages currently being executed in streamer pool.
-     *
-     * @return Number of stages. Cannot be more than pool thread count.
-     */
-    @IgniteMBeanDescription("Number of stages currently being executed in streamer pool.")
-    public int getStageActiveExecutionCount();
-
-    /**
-     * Gets number of event batches currently waiting to be executed.
-     *
-     * @return Number of event batches waiting to be processed.
-     */
-    @IgniteMBeanDescription("Number of event batches currently waiting to be executed.")
-    public int getStageWaitingExecutionCount();
-
-    /**
-     * Gets total number of stages executed since last reset.
-     *
-     * @return Total number of stages executed since last reset.
-     */
-    @IgniteMBeanDescription("Total number of stages executed since last reset.")
-    public long getStageTotalExecutionCount();
-
-    /**
-     * Gets pipeline maximum execution time, i.e. time between execution start and time when last stage in pipeline
-     * returned empty map. If pipeline execution was split to different nodes, metrics for each split will be
-     * recorded independently.
-     *
-     * @return Pipeline maximum execution time.
-     */
-    @IgniteMBeanDescription("Pipeline maximum execution time.")
-    public long getPipelineMaximumExecutionTime();
-
-    /**
-     * Gets pipeline minimum execution time, i.e. time between execution start and time when last stage in pipeline
-     * returned empty map. If pipeline execution was split to different nodes, metrics for each split will be
-     * recorded independently.
-     *
-     * @return Pipeline minimum execution time.
-     */
-    @IgniteMBeanDescription("Pipeline minimum execution time.")
-    public long getPipelineMinimumExecutionTime();
-
-    /**
-     * Gets pipeline average execution time, i.e. time between execution start and time when last stage in pipeline
-     * returned empty map. If pipeline execution was split, metrics for each split will be recorded independently.
-     *
-     * @return Pipeline average execution time.
-     */
-    @IgniteMBeanDescription("Pipeline average execution time.")
-    public long getPipelineAverageExecutionTime();
-
-    /**
-     * Gets maximum number of unique nodes participated in pipeline execution. If pipeline execution was split,
-     * metrics for each split will be recorded independently.
-     *
-     * @return Maximum number of unique nodes in pipeline execution.
-     */
-    @IgniteMBeanDescription("Maximum number of unique nodes participated in pipeline execution.")
-    public int getPipelineMaximumExecutionNodes();
-
-    /**
-     * Gets minimum number of unique nodes participated in pipeline execution. If pipeline execution was split,
-     * metrics for each split will be recorded independently.
-     *
-     * @return Minimum number of unique nodes in pipeline execution.
-     */
-    @IgniteMBeanDescription("Minimum number of unique nodes participated in pipeline execution.")
-    public int getPipelineMinimumExecutionNodes();
-
-    /**
-     * Gets average number of unique nodes participated in pipeline execution. If pipeline execution was split,
-     * metrics for each split will be recorded independently.
-     *
-     * @return Average number of unique nodes in pipeline execution.
-     */
-    @IgniteMBeanDescription("Average number of unique nodes participated in pipeline execution.")
-    public int getPipelineAverageExecutionNodes();
-
-    /**
-     * Gets number of current active sessions. Since event execution sessions are tracked only when
-     * {@code atLeastOnce} configuration property is set to {@code true}, this metric will be collected
-     * only in this case. When {@code atLeastOnce} is set to {@code false}, this metric will always be zero.
-     *
-     * @return Number of current active sessions.
-     */
-    @IgniteMBeanDescription("Number of current active sessions.")
-    public int getCurrentActiveSessions();
-
-    /**
-     * Gets maximum number of active sessions since last reset. Since event execution sessions are tracked only when
-     * {@code atLeastOnce} configuration property is set to {@code true}, this metric will be collected
-     * only in this case. When {@code atLeastOnce} is set to {@code false}, this metric will always be zero.
-     *
-     * @return Maximum active sessions since last reset.
-     */
-    @IgniteMBeanDescription("Maximum number of active sessions since last reset.")
-    public int getMaximumActiveSessions();
-
-    /**
-     * Gets number of failures since last reset. If {@code atLeastOnce} flag is set in streamer configuration,
-     * then only root node failures will be counted. Otherwise each node will count failures independently.
-     *
-     * @return Failures count.
-     */
-    @IgniteMBeanDescription("Number of failures since last reset.")
-    public int getFailuresCount();
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerMetrics.java b/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerMetrics.java
deleted file mode 100644
index b553521..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerMetrics.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.streamer;
-
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Streamer metrics.
- */
-public interface GridStreamerMetrics {
-    /**
-     * Gets number of stages currently being executed in streamer pool.
-     *
-     * @return Number of stages. Cannot be more than pool thread count.
-     */
-    public int stageActiveExecutionCount();
-
-    /**
-     * Gets number of event batches currently waiting to be executed.
-     *
-     * @return Number of event batches waiting to be processed.
-     */
-    public int stageWaitingExecutionCount();
-
-    /**
-     * Gets total number of stages executed since last reset.
-     *
-     * @return Total number of stages executed since last reset.
-     */
-    public long stageTotalExecutionCount();
-
-    /**
-     * Gets pipeline maximum execution time, i.e. time between execution start and time when last stage in pipeline
-     * returned empty map. If pipeline execution was split to different nodes, metrics for each split will be
-     * recorded independently.
-     *
-     * @return Pipeline maximum execution time.
-     */
-    public long pipelineMaximumExecutionTime();
-
-    /**
-     * Gets pipeline minimum execution time, i.e. time between execution start and time when last stage in pipeline
-     * returned empty map. If pipeline execution was split to different nodes, metrics for each split will be
-     * recorded independently.
-     *
-     * @return Pipeline minimum execution time.
-     */
-    public long pipelineMinimumExecutionTime();
-
-    /**
-     * Gets pipeline average execution time, i.e. time between execution start and time when last stage in pipeline
-     * returned empty map. If pipeline execution was split, metrics for each split will be recorded independently.
-     *
-     * @return Pipeline average execution time.
-     */
-    public long pipelineAverageExecutionTime();
-
-    /**
-     * Gets maximum number of unique nodes participated in pipeline execution. If pipeline execution was split,
-     * metrics for each split will be recorded independently.
-     *
-     * @return Maximum number of unique nodes in pipeline execution.
-     */
-    public int pipelineMaximumExecutionNodes();
-
-    /**
-     * Gets minimum number of unique nodes participated in pipeline execution. If pipeline execution was split,
-     * metrics for each split will be recorded independently.
-     *
-     * @return Minimum number of unique nodes in pipeline execution.
-     */
-    public int pipelineMinimumExecutionNodes();
-
-    /**
-     * Gets average number of unique nodes participated in pipeline execution. If pipeline execution was split,
-     * metrics for each split will be recorded independently.
-     *
-     * @return Average number of unique nodes in pipeline execution.
-     */
-    public int pipelineAverageExecutionNodes();
-
-    /**
-     * Gets query maximum execution time. If query execution was split to different nodes, metrics for each split
-     * will be recorded independently.
-     *
-     * @return Query maximum execution time.
-     */
-    public long queryMaximumExecutionTime();
-
-    /**
-     * Gets query minimum execution time. If query execution was split to different nodes, metrics for each split
-     * will be recorded independently.
-     *
-     * @return Query minimum execution time.
-     */
-    public long queryMinimumExecutionTime();
-
-    /**
-     * Gets query average execution time. If query execution was split to different nodes, metrics for each split
-     * will be recorded independently.
-     *
-     * @return Query average execution time.
-     */
-    public long queryAverageExecutionTime();
-
-    /**
-     * Gets maximum number of unique nodes participated in query execution. If query execution was split,
-     * metrics for each split will be recorded independently.
-     *
-     * @return Maximum number of unique nodes in query execution.
-     */
-    public int queryMaximumExecutionNodes();
-
-    /**
-     * Gets minimum number of unique nodes participated in query execution. If query execution was split,
-     * metrics for each split will be recorded independently.
-     *
-     * @return Minimum number of unique nodes in query execution.
-     */
-    public int queryMinimumExecutionNodes();
-
-    /**
-     * Gets average number of unique nodes participated in query execution. If query execution was split,
-     * metrics for each split will be recorded independently.
-     *
-     * @return Average number of unique nodes in query execution.
-     */
-    public int queryAverageExecutionNodes();
-
-    /**
-     * Gets number of current active sessions. Since event execution sessions are tracked only when
-     * {@code atLeastOnce} configuration property is set to {@code true}, this metric will be collected
-     * only in this case. When {@code atLeastOnce} is set to {@code false}, this metric will always be zero.
-     *
-     * @return Number of current active sessions.
-     */
-    public int currentActiveSessions();
-
-    /**
-     * Gets maximum number of active sessions since last reset. Since event execution sessions are tracked only when
-     * {@code atLeastOnce} configuration property is set to {@code true}, this metric will be collected
-     * only in this case. When {@code atLeastOnce} is set to {@code false}, this metric will always be zero.
-     *
-     * @return Maximum active sessions since last reset.
-     */
-    public int maximumActiveSessions();
-
-    /**
-     * Gets number of failures. If {@code atLeastOnce} flag is set in streamer configuration, then only root node
-     * failures will be counted. Otherwise each node will count failures independently.
-     *
-     * @return Failures count.
-     */
-    public int failuresCount();
-
-    /**
-     * Gets maximum number of threads in executor service.
-     *
-     * @return Maximum number of threads in executor service.
-     */
-    public int executorServiceCapacity();
-
-    /**
-     * Gets current stage metrics, if stage with given name is not configured
-     * then {@link IllegalArgumentException} will be thrown.
-     *
-     * @param stageName Stage name.
-     * @return Stage metrics.
-     */
-    public GridStreamerStageMetrics stageMetrics(String stageName);
-
-    /**
-     * Gets metrics for all stages. Stage metrics order is the same as stage order in configuration.
-     *
-     * @return Stage metrics collection.
-     */
-    public Collection<GridStreamerStageMetrics> stageMetrics();
-
-    /**
-     * Gets current window metrics, if window with given name is not configured
-     * then {@link IllegalArgumentException} will be thrown.
-     *
-     * @param winName Window name.
-     * @return Window metrics.
-     */
-    public GridStreamerWindowMetrics windowMetrics(String winName);
-
-    /**
-     * Gets metrics for all windows.
-     *
-     * @return Collection of window metrics.
-     */
-    public Collection<GridStreamerWindowMetrics> windowMetrics();
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStage.java b/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStage.java
index 795cab6..b994f4e 100644
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStage.java
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStage.java
@@ -18,12 +18,12 @@ import java.util.*;
  * Streamer stage is a component that determines event processing flow. User logic related to
  * any particular event processing is implemented by streamer stage. A stage takes events as
  * an input and returns groups of events mapped to different stages as an output. Events for
- * every returned stage will be passed to {@link GridStreamerEventRouter} which will determine
+ * every returned stage will be passed to {@link StreamerEventRouter} which will determine
  * on which node the stage should be executed.
  * <p>
  * Generally, event stage execution graph is fully controlled by return values of
  * this method, while node execution graph is controlled by
- * {@link GridStreamerEventRouter#route(GridStreamerContext, String, Object)} method.
+ * {@link StreamerEventRouter#route(StreamerContext, String, Object)} method.
  */
 public interface GridStreamerStage<IN> {
     /**
@@ -36,18 +36,18 @@ public interface GridStreamerStage<IN> {
     /**
      * Stage execution routine. After the passed in events are processed, stage can emit
      * another set of events to be processed. The returned events can be mapped to different
-     * stages. Events for every returned stage will be passed to {@link GridStreamerEventRouter}
+     * stages. Events for every returned stage will be passed to {@link StreamerEventRouter}
      * which will determine on which node the stage should be executed.
      * <p>
      * Generally, event stage execution graph is fully controlled by return values of
      * this method, while node execution graph is controlled by
-     * {@link GridStreamerEventRouter#route(GridStreamerContext, String, Object)} method.
+     * {@link StreamerEventRouter#route(StreamerContext, String, Object)} method.
      *
      * @param ctx Streamer context.
      * @param evts Input events.
      * @return Map of stage name to collection of events.
      * @throws GridException If failed.
      */
-    @Nullable public Map<String, Collection<?>> run(GridStreamerContext ctx, Collection<IN> evts)
+    @Nullable public Map<String, Collection<?>> run(StreamerContext ctx, Collection<IN> evts)
         throws GridException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerWindow.java b/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerWindow.java
index fe6ffde..461d618 100644
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerWindow.java
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerWindow.java
@@ -33,7 +33,7 @@ import java.util.*;
  * <li>{@link GridStreamerBoundedTimeBatchWindow}</li>
  * </ul>
  * <p>
- * Streamer window is configured via {@link GridStreamerConfiguration#getWindows()} method.
+ * Streamer window is configured via {@link StreamerConfiguration#getWindows()} method.
  */
 public interface GridStreamerWindow<E> extends Iterable<E> {
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerConfiguration.java b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerConfiguration.java
new file mode 100644
index 0000000..2c7fe45
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerConfiguration.java
@@ -0,0 +1,267 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.streamer;
+
+import org.gridgain.grid.util.typedef.internal.*;
+import org.gridgain.grid.util.tostring.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Streamer configuration.
+ */
+public class StreamerConfiguration {
+    /** By default maximum number of concurrent sessions is unlimited. */
+    public static final int DFLT_MAX_CONCURRENT_SESSIONS = -1;
+
+    /** Default value for maximum failover attempts. */
+    public static final int DFLT_MAX_FAILOVER_ATTEMPTS = 3;
+
+    /** Name. */
+    private String name;
+
+    /** Window. */
+    private Collection<GridStreamerWindow> win;
+
+    /** Router. */
+    private StreamerEventRouter router;
+
+    /** Stages. */
+    @GridToStringInclude
+    private Collection<GridStreamerStage> stages;
+
+    /** At least once flag. */
+    private boolean atLeastOnce;
+
+    /** Maximum number of failover attempts. */
+    private int maxFailoverAttempts = DFLT_MAX_FAILOVER_ATTEMPTS;
+
+    /** Maximum number of concurrent sessions to be processed. */
+    private int maxConcurrentSessions = DFLT_MAX_CONCURRENT_SESSIONS;
+
+    /** Streamer executor service. */
+    private ExecutorService execSvc;
+
+    /** Executor service shutdown flag. */
+    private boolean execSvcShutdown;
+
+    /**
+     * Creates streamer configuration with default settings.
+     */
+    public StreamerConfiguration() {
+        // No-op.
+    }
+
+    /**
+     * @param c Configuration to copy.
+     */
+    public StreamerConfiguration(StreamerConfiguration c) {
+        atLeastOnce = c.isAtLeastOnce();
+        execSvc = c.getExecutorService();
+        execSvcShutdown = c.isExecutorServiceShutdown();
+        maxConcurrentSessions = c.getMaximumConcurrentSessions();
+        maxFailoverAttempts = c.getMaximumFailoverAttempts();
+        name = c.getName();
+        router = c.getRouter();
+        stages = c.getStages();
+        win = c.getWindows();
+    }
+
+    /**
+     * Gets streamer name. Must be unique within grid.
+     *
+     * @return Streamer name, if {@code null} then default streamer is returned.
+     */
+    @Nullable public String getName() {
+        return name;
+    }
+
+    /**
+     * Sets the name of the streamer.
+     *
+     * @param name Name.
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * Gets streamer event router.
+     *
+     * @return Event router, if {@code null} then events will be executed locally.
+     */
+    @SuppressWarnings("unchecked")
+    @Nullable public StreamerEventRouter getRouter() {
+        return router;
+    }
+
+    /**
+     * Sets router for streamer.
+     *
+     * @param router Router.
+     */
+    public void setRouter(StreamerEventRouter router) {
+        this.router = router;
+    }
+
+    /**
+     * Gets collection of streamer event windows. At least one window should be configured. Each window
+     * must have unique name.
+     *
+     * @return Streamer windows.
+     */
+    public Collection<GridStreamerWindow> getWindows() {
+        return win;
+    }
+
+    /**
+     * Sets collection of streamer windows.
+     *
+     * @param win Window.
+     */
+    public void setWindows(Collection<GridStreamerWindow> win) {
+        this.win = win;
+    }
+
+    /**
+     * Gets collection of streamer stages. Streamer must have at least one stage to execute. Each stage
+     * must have unique name.
+     *
+     * @return Collection of streamer stages.
+     */
+    public Collection<GridStreamerStage> getStages() {
+        return stages;
+    }
+
+    /**
+     * Sets stages.
+     *
+     * @param stages Stages.
+     */
+    public void setStages(Collection<GridStreamerStage> stages) {
+        this.stages = stages;
+    }
+
+    /**
+     * Gets flag indicating whether streamer should track event execution sessions and failover event execution
+     * if any failure detected or any node on which execution happened has left the grid before successful response
+     * is received.
+     * <p>
+     * Setting this flag to {@code true} will guarantee that all pipeline stages will be executed at least once for
+     * each group of events submitted to streamer (or failure listener will be notified if failover cannot succeed).
+     * However, it does not guarantee that each stage will be executed at most once.
+     *
+     * @return {@code True} if event should be processed at least once,
+     *      or {@code false} if failures can be safely ignored.
+     */
+    public boolean isAtLeastOnce() {
+        return atLeastOnce;
+    }
+
+    /**
+     * @param atLeastOnce {@code True} to guarantee that event will be processed at least once.
+     */
+    public void setAtLeastOnce(boolean atLeastOnce) {
+        this.atLeastOnce = atLeastOnce;
+    }
+
+    /**
+     * Gets maximum number of failover attempts to try when pipeline execution has failed. This parameter
+     * is ignored if {@link #isAtLeastOnce()} is set to {@code false}.
+     * <p>
+     * If not set, default value is {@link #DFLT_MAX_FAILOVER_ATTEMPTS}.
+     *
+     * @return Maximum number of failover attempts to try.
+     */
+    public int getMaximumFailoverAttempts() {
+        return maxFailoverAttempts;
+    }
+
+    /**
+     * Sets maximum number of failover attempts.
+     *
+     * @param maxFailoverAttempts Maximum number of failover attempts.
+     * @see #getMaximumFailoverAttempts()
+     */
+    public void setMaximumFailoverAttempts(int maxFailoverAttempts) {
+        this.maxFailoverAttempts = maxFailoverAttempts;
+    }
+
+    /**
+     * Gets maximum number of concurrent events to be processed by streamer. This property is taken into
+     * account when {@link #isAtLeastOnce()} is set to {@code true}. If not positive, number of sessions
+     * will not be limited by any value.
+     *
+     * @return Maximum number of concurrent events to be processed. If number of concurrent events is greater
+     *      than this value, caller will be blocked until enough responses are received.
+     */
+    public int getMaximumConcurrentSessions() {
+        return maxConcurrentSessions;
+    }
+
+    /**
+     * Sets maximum number of concurrent sessions.
+     *
+     * @param maxConcurrentSessions Maximum number of concurrent sessions.
+     * @see #getMaximumConcurrentSessions()
+     */
+    public void setMaximumConcurrentSessions(int maxConcurrentSessions) {
+        this.maxConcurrentSessions = maxConcurrentSessions;
+    }
+
+    /**
+     * Gets streamer executor service. Defines a thread pool in which streamer stages will be executed.
+     * <p>
+     * If not specified, thread pool executor with max pool size equal to number of cores will be created.
+     *
+     * @return Streamer executor service.
+     */
+    public ExecutorService getExecutorService() {
+        return execSvc;
+    }
+
+    /**
+     * Sets streamer executor service.
+     *
+     * @param execSvc Executor service to use.
+     * @see #getExecutorService()
+     */
+    public void setExecutorService(ExecutorService execSvc) {
+        this.execSvc = execSvc;
+    }
+
+    /**
+     * Flag indicating whether streamer executor service should be shut down on GridGain stop. If this flag
+     * is {@code true}, executor service will be shut down regardless of whether executor was specified externally
+     * or it was created by GridGain.
+     *
+     * @return {@code True} if executor service should be shut down on GridGain stop.
+     */
+    public boolean isExecutorServiceShutdown() {
+        return execSvcShutdown;
+    }
+
+    /**
+     * Sets flag indicating whether executor service should be shut down on GridGain stop.
+     *
+     * @param execSvcShutdown {@code True} if executor service should be shut down on GridGain stop.
+     * @see #isExecutorServiceShutdown()
+     */
+    public void setExecutorServiceShutdown(boolean execSvcShutdown) {
+        this.execSvcShutdown = execSvcShutdown;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(StreamerConfiguration.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerContext.java b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerContext.java
new file mode 100644
index 0000000..f8828a3
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerContext.java
@@ -0,0 +1,134 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.streamer;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Streamer context. Provides access to streamer local store, configured windows and various methods
+ * to run streamer queries.
+ */
+public interface StreamerContext {
+    /**
+     * Gets instance of dynamic grid projection including all nodes on which this streamer is running.
+     *
+     * @return Projection with all nodes on which streamer is configured.
+     */
+    public ClusterGroup projection();
+
+    /**
+     * Gets streamer local space. Note that all updates to this space will be local.
+     *
+     * @return Streamer local space.
+     */
+    public <K, V> ConcurrentMap<K, V> localSpace();
+
+    /**
+     * Gets default event window, i.e. window that is on the first place in streamer configuration.
+     *
+     * @return Default window.
+     */
+    public <E> GridStreamerWindow<E> window();
+
+    /**
+     * Gets streamer event window by window name. If no window with such
+     * name was configured, {@link IllegalArgumentException} will be thrown.
+     *
+     * @param winName Window name.
+     * @return Window instance.
+     */
+    public <E> GridStreamerWindow<E> window(String winName);
+
+    /**
+     * For context passed to {@link GridStreamerStage#run(StreamerContext, Collection)} this method will
+     * return next stage name in execution pipeline. For context obtained from streamer object, this method will
+     * return first stage name.
+     *
+     * @return Next stage name depending on invocation context.
+     */
+    public String nextStageName();
+
+    /**
+     * Queries all streamer nodes deployed within grid. Given closure will be executed on each node on which streamer
+     * is configured. Streamer context local for that node will be passed to closure during execution. All results
+     * returned by closure will be added to result collection.
+     *
+     * @param clo Function to be executed on individual nodes.
+     * @return Result received from all streamers.
+     * @throws GridException If query execution failed.
+     */
+    public <R> Collection<R> query(IgniteClosure<StreamerContext, R> clo) throws GridException;
+
+    /**
+     * Queries streamer nodes deployed within grid. Given closure will be executed on those of passed nodes
+     * on which streamer is configured. Streamer context local for that node will be passed to closure during
+     * execution. All results returned by closure will be added to result collection.
+     *
+     * @param clo Function to be executed on individual nodes.
+     * @param nodes Optional list of nodes to execute query on, if empty, then all nodes on
+     *      which this streamer is running will be queried.
+     * @return Result received from all streamers.
+     * @throws GridException If query execution failed.
+     */
+    public <R> Collection<R> query(IgniteClosure<StreamerContext, R> clo, Collection<ClusterNode> nodes)
+        throws GridException;
+
+    /**
+     * Queries all streamer nodes deployed within grid. Given closure will be executed on each streamer node
+     * in the grid. No result is collected.
+     *
+     * @param clo Function to be executed on individual nodes.
+     * @throws GridException If closure execution failed.
+     */
+    public void broadcast(IgniteInClosure<StreamerContext> clo) throws GridException;
+
+    /**
+     * Queries streamer nodes deployed within grid. Given closure will be executed on those of passed nodes on
+     * which streamer is configured. No result is collected.
+     *
+     * @param clo Function to be executed on individual nodes.
+     * @param nodes Optional list of nodes to execute query on, if empty, then all nodes on
+     *      which this streamer is running will be queried.
+     * @throws GridException If closure execution failed.
+     */
+    public void broadcast(IgniteInClosure<StreamerContext> clo, Collection<ClusterNode> nodes) throws GridException;
+
+    /**
+     * Queries all streamer nodes deployed within grid. Given closure will be executed on each streamer node in
+     * the grid. Streamer context local for that node will be passed to closure during execution. Results returned
+     * by closure will be passed to given reducer.
+     *
+     * @param clo Function to be executed on individual nodes.
+     * @param rdc Reducer to reduce results received from remote nodes.
+     * @return Reducer result.
+     * @throws GridException If query execution failed.
+     */
+    public <R1, R2> R2 reduce(IgniteClosure<StreamerContext, R1> clo, IgniteReducer<R1, R2> rdc) throws GridException;
+
+    /**
+     * Queries streamer nodes deployed within grid. Given closure will be executed on those of passed nodes on which
+     * streamer is configured. Streamer context local for that node will be passed to closure during execution.
+     * Results returned by closure will be passed to given reducer.
+     *
+     * @param clo Function to be executed on individual nodes.
+     * @param rdc Reducer to reduce results received from remote nodes.
+     * @param nodes Optional list of nodes to execute query on, if empty, then all nodes on
+     *      which this streamer is running will be queried.
+     * @return Reducer result.
+     * @throws GridException If query execution failed.
+     */
+    public <R1, R2> R2 reduce(IgniteClosure<StreamerContext, R1> clo, IgniteReducer<R1, R2> rdc,
+        Collection<ClusterNode> nodes) throws GridException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerEventRouter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerEventRouter.java
new file mode 100644
index 0000000..ba748f6
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerEventRouter.java
@@ -0,0 +1,47 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.streamer;
+
+import org.apache.ignite.cluster.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Streamer event router. Pluggable component that determines event execution flow across the grid.
+ * Each time a group of events is submitted to streamer or returned to streamer by a stage, event
+ * router will be used to select execution node for next stage.
+ */
+public interface StreamerEventRouter {
+    /**
+     * Selects a node for given event that should be processed by a stage with given name.
+     *
+     * @param ctx Streamer context.
+     * @param stageName Stage name.
+     * @param evt Event to route.
+     * @return Node to route to. If this method returns {@code null} then the whole pipeline execution
+     *      will be terminated. All running and ongoing stages for pipeline execution will be
+     *      cancelled.
+     */
+    @Nullable public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt);
+
+    /**
+     * Selects a node for given events that should be processed by a stage with given name.
+     *
+     * @param ctx Streamer context.
+     * @param stageName Stage name to route events.
+     * @param evts Events.
+     * @return Events to node mapping. If this method returns {@code null} then the whole pipeline execution
+     *      will be terminated. All running and ongoing stages for pipeline execution will be
+     *      cancelled.
+     */
+    @Nullable public <T> Map<ClusterNode, Collection<T>> route(StreamerContext ctx, String stageName,
+        Collection<T> evts);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerEventRouterAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerEventRouterAdapter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerEventRouterAdapter.java
new file mode 100644
index 0000000..aee0eaa
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerEventRouterAdapter.java
@@ -0,0 +1,52 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.streamer;
+
+import org.apache.ignite.cluster.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.typedef.*;
+
+import java.util.*;
+
+/**
+ * Streamer adapter for event routers.
+ */
+public abstract class StreamerEventRouterAdapter implements StreamerEventRouter {
+    /** {@inheritDoc} */
+    @Override public <T> Map<ClusterNode, Collection<T>> route(StreamerContext ctx, String stageName,
+        Collection<T> evts) {
+        if (evts.size() == 1) {
+            ClusterNode route = route(ctx, stageName, F.first(evts));
+
+            if (route == null)
+                return null;
+
+            return Collections.singletonMap(route, evts);
+        }
+
+        Map<ClusterNode, Collection<T>> map = new GridLeanMap<>();
+
+        for (T e : evts) {
+            ClusterNode n = route(ctx, stageName, e);
+
+            if (n == null)
+                return null;
+
+            Collection<T> mapped = map.get(n);
+
+            if (mapped == null)
+                map.put(n, mapped = new ArrayList<>());
+
+            mapped.add(e);
+        }
+
+        return map;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerFailureListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerFailureListener.java b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerFailureListener.java
new file mode 100644
index 0000000..da34702
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerFailureListener.java
@@ -0,0 +1,36 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.streamer;
+
+import java.util.*;
+
+/**
+ * Streamer failure listener. Asynchronous callback passed to user in case of any failure determined by streamer.
+ *
+ * @see org.apache.ignite.IgniteStreamer#addStreamerFailureListener(StreamerFailureListener)
+ *
+ */
+public interface StreamerFailureListener {
+    /**
+     * Callback invoked when unrecoverable failure is detected by streamer.
+     * <p>
+     * If {@link StreamerConfiguration#isAtLeastOnce()} is set to {@code false}, then this callback
+     * will be invoked on node on which failure occurred. If {@link StreamerConfiguration#isAtLeastOnce()}
+     * is set to {@code true}, then this callback will be invoked on node on which
+     * {@link org.apache.ignite.IgniteStreamer#addEvents(Collection)} or its variant was called. Callback will be called if maximum
+     * number of failover attempts exceeded or failover cannot be performed (for example, if router
+     * returned {@code null}).
+     *
+     * @param stageName Failed stage name.
+     * @param evts Failed set of events.
+     * @param err Error cause.
+     */
+    public void onFailure(String stageName, Collection<Object> evts, Throwable err);
+}


Mime
View raw message