ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yzhda...@apache.org
Subject [6/9] incubator-ignite git commit: sp-2 streaming cleanup
Date Thu, 19 Mar 2015 16:50:30 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerWindowMetricsAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerWindowMetricsAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerWindowMetricsAdapter.java
deleted file mode 100644
index 2739ad1..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerWindowMetricsAdapter.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.streamer;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.streamer.*;
-
-/**
- * Streamer window metrics adapter.
- */
-public class StreamerWindowMetricsAdapter implements StreamerWindowMetrics {
-    /** Window name. */
-    private String name;
-
-    /** Window size. */
-    private int size;
-
-    /** Window eviction queue size. */
-    private int evictionQueueSize;
-
-    /**
-     * @param m Metrics to copy.
-     */
-    public StreamerWindowMetricsAdapter(StreamerWindowMetrics m) {
-        // Preserve alphabetic order for maintenance.
-        evictionQueueSize = m.evictionQueueSize();
-        name = m.name();
-        size = m.size();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String name() {
-        return name;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int size() {
-        return size;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int evictionQueueSize() {
-        return evictionQueueSize;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(StreamerWindowMetricsAdapter.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerWindowMetricsHolder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerWindowMetricsHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerWindowMetricsHolder.java
deleted file mode 100644
index 44e7e90..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerWindowMetricsHolder.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.streamer;
-
-import org.apache.ignite.streamer.*;
-
-/**
- * Streamer window metrics holder.
- */
-public class StreamerWindowMetricsHolder implements StreamerWindowMetrics {
-    /** Window instance. */
-    private StreamerWindow window;
-
-    /**
-     * @param window Streamer window.
-     */
-    public StreamerWindowMetricsHolder(StreamerWindow window) {
-        this.window = window;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String name() {
-        return window.name();
-    }
-
-    /** {@inheritDoc} */
-    @Override public int size() {
-        return window.size();
-    }
-
-    /** {@inheritDoc} */
-    @Override public int evictionQueueSize() {
-        return window.evictionQueueSize();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/package-info.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/package-info.java
deleted file mode 100644
index c537a3b..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * <img alt="icon" class="javadocimg" src="{@docRoot}/img/cube.gif"/>
- * TODO.
- */
-package org.apache.ignite.internal.processors.streamer;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerBroadcastTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerBroadcastTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerBroadcastTask.java
deleted file mode 100644
index 8cb7133..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerBroadcastTask.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.streamer.task;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.compute.*;
-import org.apache.ignite.internal.processors.closure.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.streamer.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Streamer broadcast task.
- */
-public class GridStreamerBroadcastTask extends GridPeerDeployAwareTaskAdapter<Void, Void> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Closure. */
-    private IgniteInClosure<StreamerContext> clo;
-
-    /** Streamer. */
-    private String streamer;
-
-    /**
-     * @param clo Closure.
-     * @param streamer Streamer.
-     */
-    public GridStreamerBroadcastTask(IgniteInClosure<StreamerContext> clo, @Nullable String streamer) {
-        super(U.peerDeployAware(clo));
-
-        this.clo = clo;
-        this.streamer = streamer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) {
-        Map<ComputeJob, ClusterNode> res = U.newHashMap(subgrid.size());
-
-        for (ClusterNode node : subgrid)
-            res.put(new StreamerBroadcastJob(clo, streamer), node);
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Void reduce(List<ComputeJobResult> results) {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
-        // No failover.
-        if (res.getException() != null)
-            throw res.getException();
-
-        return ComputeJobResultPolicy.WAIT;
-    }
-
-    /**
-     * Streamer broadcast job.
-     */
-    private static class StreamerBroadcastJob extends ComputeJobAdapter implements Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Injected grid. */
-        @IgniteInstanceResource
-        private Ignite g;
-
-        /** Closure. */
-        private IgniteInClosure<StreamerContext> clo;
-
-        /** Streamer. */
-        private String streamer;
-
-        /**
-         * Empty constructor required by {@link Externalizable}.
-         */
-        public StreamerBroadcastJob() {
-            // No-op.
-        }
-
-        /**
-         * @param clo Closure.
-         * @param streamer Streamer.
-         */
-        private StreamerBroadcastJob(IgniteInClosure<StreamerContext> clo, String streamer) {
-            this.clo = clo;
-            this.streamer = streamer;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Object execute() {
-            IgniteStreamer s = g.streamer(streamer);
-
-            assert s != null;
-
-            clo.apply(s.context());
-
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeObject(clo);
-            U.writeString(out, streamer);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            clo = (IgniteInClosure<StreamerContext>)in.readObject();
-            streamer = U.readString(in);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerQueryTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerQueryTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerQueryTask.java
deleted file mode 100644
index 38ed703..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerQueryTask.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.streamer.task;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.compute.*;
-import org.apache.ignite.internal.processors.closure.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.streamer.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Streamer query task.
- */
-public class GridStreamerQueryTask<R> extends GridPeerDeployAwareTaskAdapter<Void, Collection<R>> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Query closure. */
-    private IgniteClosure<StreamerContext, R> qryClos;
-
-    /** Streamer. */
-    private String streamer;
-
-    /**
-     * @param qryClos Query closure.
-     * @param streamer Streamer.
-     */
-    public GridStreamerQueryTask(IgniteClosure<StreamerContext, R> qryClos, @Nullable String streamer) {
-        super(U.peerDeployAware(qryClos));
-
-        this.qryClos = qryClos;
-        this.streamer = streamer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) {
-        Map<ComputeJob, ClusterNode> res = U.newHashMap(subgrid.size());
-
-        for (ClusterNode node : subgrid)
-            res.put(new QueryJob<>(qryClos, streamer), node);
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<R> reduce(List<ComputeJobResult> results) {
-        Collection<R> res = new ArrayList<>(results.size());
-
-        for (ComputeJobResult jobRes : results)
-            res.add(jobRes.<R>getData());
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
-        // No failover for this task.
-        if (res.getException() != null)
-            throw res.getException();
-
-        return ComputeJobResultPolicy.WAIT;
-    }
-
-    /**
-     * Query job.
-     */
-    private static class QueryJob<R> extends ComputeJobAdapter implements Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Injected grid. */
-        @IgniteInstanceResource
-        private Ignite g;
-
-        /** Query closure. */
-        private IgniteClosure<StreamerContext, R> qryClos;
-
-        /** Streamer. */
-        private String streamer;
-
-        /**
-         * Empty constructor required by {@link Externalizable}.
-         */
-        public QueryJob() {
-            // No-op.
-        }
-
-        /**
-         * @param qryClos Query closure.
-         * @param streamer Streamer.
-         */
-        private QueryJob(IgniteClosure<StreamerContext, R> qryClos, String streamer) {
-            this.qryClos = qryClos;
-            this.streamer = streamer;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Object execute() {
-            IgniteStreamer s = g.streamer(streamer);
-
-            assert s != null;
-
-            return qryClos.apply(s.context());
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeObject(qryClos);
-            U.writeString(out, streamer);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            qryClos = (IgniteClosure<StreamerContext, R>)in.readObject();
-            streamer = U.readString(in);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerReduceTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerReduceTask.java
deleted file mode 100644
index e9bd436..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerReduceTask.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.streamer.task;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.compute.*;
-import org.apache.ignite.internal.processors.closure.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.streamer.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Streamer query task.
- */
-@ComputeTaskNoResultCache
-public class GridStreamerReduceTask<R1, R2> extends GridPeerDeployAwareTaskAdapter<Void, R2> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Query closure. */
-    private IgniteClosure<StreamerContext, R1> clos;
-
-    /** Reducer. */
-    private IgniteReducer<R1, R2> rdc;
-
-    /** Streamer. */
-    private String streamer;
-
-    /**
-     * @param clos Query closure.
-     * @param rdc Query reducer.
-     * @param streamer Streamer.
-     */
-    public GridStreamerReduceTask(IgniteClosure<StreamerContext, R1> clos, IgniteReducer<R1, R2> rdc,
-        @Nullable String streamer) {
-        super(U.peerDeployAware(clos));
-
-        this.clos = clos;
-        this.rdc = rdc;
-        this.streamer = streamer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) {
-        Map<ComputeJob, ClusterNode> res = U.newHashMap(subgrid.size());
-
-        for (ClusterNode node : subgrid)
-            res.put(new ReduceJob<>(clos, streamer), node);
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public R2 reduce(List<ComputeJobResult> results) {
-        return rdc.reduce();
-    }
-
-    /** {@inheritDoc} */
-    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
-        // No failover for this task.
-        if (res.getException() != null)
-            throw res.getException();
-
-        rdc.collect(res.<R1>getData());
-
-        return ComputeJobResultPolicy.WAIT;
-    }
-
-    /**
-     * Query job.
-     */
-    private static class ReduceJob<R> extends ComputeJobAdapter implements Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Injected grid. */
-        @IgniteInstanceResource
-        private Ignite g;
-
-        /** Query closure. */
-        private IgniteClosure<StreamerContext, R> qryClos;
-
-        /** Streamer. */
-        private String streamer;
-
-        /**
-         * Empty constructor required by {@link Externalizable}.
-         */
-        public ReduceJob() {
-            // No-op.
-        }
-
-        /**
-         * @param qryClos Query closure.
-         * @param streamer Streamer.
-         */
-        private ReduceJob(IgniteClosure<StreamerContext, R> qryClos, String streamer) {
-            this.qryClos = qryClos;
-            this.streamer = streamer;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Object execute() {
-            IgniteStreamer s = g.streamer(streamer);
-
-            assert s != null;
-
-            return qryClos.apply(s.context());
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeObject(qryClos);
-            U.writeString(out, streamer);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            qryClos = (IgniteClosure<StreamerContext, R>)in.readObject();
-            streamer = U.readString(in);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 7804c9d..f095438 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -30,7 +30,6 @@ import org.apache.ignite.internal.managers.deployment.*;
 import org.apache.ignite.internal.mxbean.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.processors.streamer.*;
 import org.apache.ignite.internal.transactions.*;
 import org.apache.ignite.internal.util.io.*;
 import org.apache.ignite.internal.util.ipc.shmem.*;
@@ -7056,28 +7055,6 @@ public abstract class IgniteUtils {
     }
 
     /**
-     * Checks if given node has specified streamer started.
-     *
-     * @param n Node to check.
-     * @param streamerName Streamer name to check.
-     * @return {@code True} if given node has specified streamer started.
-     */
-    public static boolean hasStreamer(ClusterNode n, @Nullable String streamerName) {
-        assert n != null;
-
-        GridStreamerAttributes[] attrs = n.attribute(ATTR_STREAMER);
-
-        if (attrs != null) {
-            for (GridStreamerAttributes attr : attrs) {
-                if (F.eq(streamerName, attr.name()))
-                    return true;
-            }
-        }
-
-        return false;
-    }
-
-    /**
      * Gets cache mode or a cache on given node or {@code null} if cache is not
      * present on given node.
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorGridConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorGridConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorGridConfiguration.java
index 9a7458e..f6243c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorGridConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorGridConfiguration.java
@@ -22,7 +22,6 @@ import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.visor.cache.*;
-import org.apache.ignite.internal.visor.streamer.*;
 
 import java.io.*;
 import java.util.*;
@@ -75,9 +74,6 @@ public class VisorGridConfiguration implements Serializable {
     /** Igfss. */
     private Iterable<VisorIgfsConfiguration> igfss;
 
-    /** Streamers. */
-    private Iterable<VisorStreamerConfiguration> streamers;
-
     /** Environment. */
     private Map<String, String> env;
 
@@ -112,7 +108,6 @@ public class VisorGridConfiguration implements Serializable {
         userAttrs = c.getUserAttributes();
         caches = VisorCacheConfiguration.list(ignite, c.getCacheConfiguration());
         igfss = VisorIgfsConfiguration.list(c.getFileSystemConfiguration());
-        streamers = VisorStreamerConfiguration.list(c.getStreamerConfiguration());
         env = new HashMap<>(System.getenv());
         sysProps = IgniteSystemProperties.snapshot();
         atomic = VisorAtomicConfiguration.from(c.getAtomicConfiguration());
@@ -213,13 +208,6 @@ public class VisorGridConfiguration implements Serializable {
     }
 
     /**
-     * @return Streamers.
-     */
-    public Iterable<VisorStreamerConfiguration> streamers() {
-        return streamers;
-    }
-
-    /**
      * @return Environment.
      */
     public Map<String, String> env() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
index 6dc27f1..a01bb1f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
@@ -27,8 +27,6 @@ import org.apache.ignite.internal.visor.*;
 import org.apache.ignite.internal.visor.cache.*;
 import org.apache.ignite.internal.visor.compute.*;
 import org.apache.ignite.internal.visor.igfs.*;
-import org.apache.ignite.internal.visor.streamer.*;
-import org.apache.ignite.streamer.*;
 
 import java.util.*;
 import java.util.concurrent.*;
@@ -176,34 +174,6 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa
         }
     }
 
-    /**
-     * Collect streamers.
-     *
-     * @param res Job result.
-     */
-    protected void streamers(VisorNodeDataCollectorJobResult res) {
-        try {
-            StreamerConfiguration[] cfgs = ignite.configuration().getStreamerConfiguration();
-
-            if (cfgs != null) {
-                for (StreamerConfiguration cfg : cfgs) {
-                    long start0 = U.currentTimeMillis();
-
-                    try {
-                        res.streamers().add(VisorStreamer.from(ignite.streamer(cfg.getName())));
-                    }
-                    finally {
-                        if (debug)
-                            log(ignite.log(), "Collected streamer: " + cfg.getName(), getClass(), start0);
-                    }
-                }
-            }
-        }
-        catch (Throwable streamersEx) {
-            res.streamersEx(streamersEx);
-        }
-    }
-
     /** {@inheritDoc} */
     @Override protected VisorNodeDataCollectorJobResult run(VisorNodeDataCollectorTaskArg arg) {
         return run(new VisorNodeDataCollectorJobResult(), arg);
@@ -239,11 +209,6 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa
         if (debug)
             start0 = log(ignite.log(), "Collected igfs", getClass(), start0);
 
-        streamers(res);
-
-        if (debug)
-            log(ignite.log(), "Collected streamers", getClass(), start0);
-
         res.errorCount(ignite.context().exceptionRegistry().errorCount());
 
         return res;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJobResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJobResult.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJobResult.java
index d711e06..b1bc44e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJobResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJobResult.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.visor.node;
 import org.apache.ignite.internal.visor.cache.*;
 import org.apache.ignite.internal.visor.event.*;
 import org.apache.ignite.internal.visor.igfs.*;
-import org.apache.ignite.internal.visor.streamer.*;
 
 import java.io.*;
 import java.util.*;
@@ -62,9 +61,6 @@ public class VisorNodeDataCollectorJobResult implements Serializable {
     /** Exception while collecting node IGFSs. */
     private Throwable igfssEx;
 
-    /** Node streamers. */
-    private final Collection<VisorStreamer> streamers = new ArrayList<>();
-
     /** Exception while collecting node streamers. */
     private Throwable streamersEx;
 
@@ -184,13 +180,6 @@ public class VisorNodeDataCollectorJobResult implements Serializable {
     }
 
     /**
-     * @return Collection of streamers metrics.
-     */
-    public Collection<VisorStreamer> streamers() {
-        return streamers;
-    }
-
-    /**
      * @return Exception caught during collecting streamers metrics.
      */
     public Throwable streamersEx() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTask.java
index 26f5be3..723b1a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTask.java
@@ -123,9 +123,6 @@ public class VisorNodeDataCollectorTask extends VisorMultiNodeTask<VisorNodeData
         if (jobRes.cachesEx() != null)
             taskRes.cachesEx().put(nid, jobRes.cachesEx());
 
-        if (!jobRes.streamers().isEmpty())
-            taskRes.streamers().put(nid, jobRes.streamers());
-
         if (jobRes.streamersEx() != null)
             taskRes.streamersEx().put(nid, jobRes.streamersEx());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java
index 7826a9c..bacfbc4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.visor.node;
 import org.apache.ignite.internal.visor.cache.*;
 import org.apache.ignite.internal.visor.event.*;
 import org.apache.ignite.internal.visor.igfs.*;
-import org.apache.ignite.internal.visor.streamer.*;
 
 import java.io.*;
 import java.util.*;
@@ -68,9 +67,6 @@ public class VisorNodeDataCollectorTaskResult implements Serializable {
     /** Exceptions caught during collecting IGFS from nodes. */
     private final Map<UUID, Throwable> igfssEx = new HashMap<>();
 
-    /** All streamers collected from nodes. */
-    private final Map<UUID, Collection<VisorStreamer>> streamers = new HashMap<>();
-
     /** Exceptions caught during collecting streamers from nodes. */
     private final Map<UUID, Throwable> streamersEx = new HashMap<>();
 
@@ -90,7 +86,6 @@ public class VisorNodeDataCollectorTaskResult implements Serializable {
                 igfss.isEmpty() &&
                 igfsEndpoints.isEmpty() &&
                 igfssEx.isEmpty() &&
-                streamers.isEmpty() &&
                 streamersEx.isEmpty();
     }
 
@@ -172,13 +167,6 @@ public class VisorNodeDataCollectorTaskResult implements Serializable {
     }
 
     /**
-     * @return All streamers collected from nodes.
-     */
-    public Map<UUID, Collection<VisorStreamer>> streamers() {
-        return streamers;
-    }
-
-    /**
      * @return Exceptions caught during collecting streamers from nodes.
      */
     public Map<UUID, Throwable> streamersEx() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamer.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamer.java
deleted file mode 100644
index bd878a8..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamer.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.visor.streamer;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Data transfer object for {@link org.apache.ignite.IgniteStreamer}.
- */
-public class VisorStreamer implements Serializable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Streamer name. */
-    private String name;
-
-    /** Metrics. */
-    private VisorStreamerMetrics metrics;
-
-    /** Stages. */
-    private Collection<VisorStreamerStageMetrics> stages;
-
-    /**
-     * @param s Streamer.
-     * @return Data transfer object for given streamer.
-     */
-    public static VisorStreamer from(IgniteStreamer s) {
-        assert s != null;
-
-        VisorStreamer streamer = new VisorStreamer();
-
-        streamer.name(s.name());
-        streamer.metrics(VisorStreamerMetrics.from(s));
-        streamer.stages(VisorStreamerStageMetrics.stages(s));
-
-        return streamer;
-    }
-
-    /**
-     * @return Streamer name.
-     */
-    public String name() {
-        return name;
-    }
-
-    /**
-     * @param name New streamer name.
-     */
-    public void name(String name) {
-        this.name = name;
-    }
-
-    /**
-     * @return Metrics.
-     */
-    public VisorStreamerMetrics metrics() {
-        return metrics;
-    }
-
-    /**
-     * @param metrics New metrics.
-     */
-    public void metrics(VisorStreamerMetrics metrics) {
-        this.metrics = metrics;
-    }
-
-    /**
-     * @return Stages.
-     */
-    public Collection<VisorStreamerStageMetrics> stages() {
-        return stages;
-    }
-
-    /**
-     * @param stages New stages.
-     */
-    public void stages(Collection<VisorStreamerStageMetrics> stages) {
-        this.stages = stages;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(VisorStreamer.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerConfiguration.java
deleted file mode 100644
index ab236a2..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerConfiguration.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.visor.streamer;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.streamer.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-import static org.apache.ignite.internal.visor.util.VisorTaskUtils.*;
-
-/**
- * Data transfer object for streamer configuration properties.
- */
-public class VisorStreamerConfiguration implements Serializable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Streamer name. */
-    private String name;
-
-    /** Events router. */
-    private String router;
-
-    /** Flag indicating whether event should be processed at least once. */
-    private boolean atLeastOnce;
-
-    /** Maximum number of failover attempts to try. */
-    private int maxFailoverAttempts;
-
-    /** Maximum number of concurrent events to be processed. */
-    private int maxConcurrentSessions;
-
-    /** Streamer thread pool size. */
-    private int poolSize;
-
-    /**
-     * @param scfg Streamer configuration.
-     * @return Data transfer object for streamer configuration properties.
-     */
-    public static VisorStreamerConfiguration from(StreamerConfiguration scfg) {
-        VisorStreamerConfiguration cfg = new VisorStreamerConfiguration();
-
-        cfg.name(scfg.getName());
-        cfg.router(compactClass(scfg.getRouter()));
-        cfg.atLeastOnce(scfg.isAtLeastOnce());
-        cfg.maximumFailoverAttempts(scfg.getMaximumFailoverAttempts());
-        cfg.maximumConcurrentSessions(scfg.getMaximumConcurrentSessions());
-        cfg.threadPoolSize(scfg.getThreadPoolSize());
-
-        return cfg;
-    }
-
-    /**
-     * Construct data transfer object for streamer configurations properties.
-     *
-     * @param streamers streamer configurations.
-     * @return streamer configurations properties.
-     */
-    public static Iterable<VisorStreamerConfiguration> list(StreamerConfiguration[] streamers) {
-        if (streamers == null)
-            return Collections.emptyList();
-
-        final Collection<VisorStreamerConfiguration> cfgs = new ArrayList<>(streamers.length);
-
-        for (StreamerConfiguration streamer : streamers)
-            cfgs.add(from(streamer));
-
-        return cfgs;
-    }
-
-    /**
-     * @return Streamer name.
-     */
-    @Nullable public String name() {
-        return name;
-    }
-
-    /**
-     * @param name New streamer name.
-     */
-    public void name(@Nullable String name) {
-        this.name = name;
-    }
-
-    /**
-     * @return Events router.
-     */
-    @Nullable public String router() {
-        return router;
-    }
-
-    /**
-     * @param router New events router.
-     */
-    public void router(@Nullable String router) {
-        this.router = router;
-    }
-
-    /**
-     * @return Flag indicating whether event should be processed at least once.
-     */
-    public boolean atLeastOnce() {
-        return atLeastOnce;
-    }
-
-    /**
-     * @param atLeastOnce New flag indicating whether event should be processed at least once.
-     */
-    public void atLeastOnce(boolean atLeastOnce) {
-        this.atLeastOnce = atLeastOnce;
-    }
-
-    /**
-     * @return Maximum number of failover attempts to try.
-     */
-    public int maximumFailoverAttempts() {
-        return maxFailoverAttempts;
-    }
-
-    /**
-     * @param maxFailoverAttempts New maximum number of failover attempts to try.
-     */
-    public void maximumFailoverAttempts(int maxFailoverAttempts) {
-        this.maxFailoverAttempts = maxFailoverAttempts;
-    }
-
-    /**
-     * @return Maximum number of concurrent events to be processed.
-     */
-    public int maximumConcurrentSessions() {
-        return maxConcurrentSessions;
-    }
-
-    /**
-     * @param maxConcurrentSessions New maximum number of concurrent events to be processed.
-     */
-    public void maximumConcurrentSessions(int maxConcurrentSessions) {
-        this.maxConcurrentSessions = maxConcurrentSessions;
-    }
-
-    /**
-     * @return Streamer thread pool size.
-     */
-    public int threadPoolSize() {
-        return poolSize;
-    }
-
-    /**
-     * @param poolSize New streamer thread pool size.
-     */
-    public void threadPoolSize(int poolSize) {
-        this.poolSize = poolSize;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(VisorStreamerConfiguration.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerMetrics.java
deleted file mode 100644
index bdf6e0b..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerMetrics.java
+++ /dev/null
@@ -1,350 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.visor.streamer;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.streamer.*;
-
-import java.io.*;
-
-/**
- * Data transfer object for {@link org.apache.ignite.streamer.StreamerMetrics}.
- */
-public class VisorStreamerMetrics implements Serializable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Active stages. */
-    private int active;
-
-    /** Waiting stages. */
-    private int waiting;
-
-    /** Stages execution capacity. */
-    private int cap;
-
-    /** Pipeline minimum execution time. */
-    private long pipelineMinExecTm;
-
-    /** Pipeline average execution time. */
-    private long pipelineAvgExecTm;
-
-    /** Pipeline maximum execution time. */
-    private long pipelineMaxExecTm;
-
-    /** Minimum number of unique nodes in pipeline execution. */
-    private int pipelineMinExecNodes;
-
-    /** Average number of unique nodes in pipeline execution. */
-    private int pipelineAvgExecNodes;
-
-    /** Maximum number of unique nodes in pipeline execution. */
-    private int pipelineMaxExecNodes;
-
-    /** Query minimum execution time. */
-    private long qryMinExecTm;
-
-    /** Query average execution time. */
-    private long qryAvgExecTm;
-
-    /** Query maximum execution time. */
-    private long qryMaxExecTm;
-
-    /** Minimum number of unique nodes in query execution. */
-    private int qryMinExecNodes;
-
-    /** Average number of unique nodes in query execution. */
-    private int qryAvgExecNodes;
-
-    /** Maximum number of unique nodes in query execution. */
-    private int qryMaxExecNodes;
-
-    /** Current window size. */
-    private int windowSize;
-
-    /**
-     * @param streamer Source streamer.
-     * @return Data transfer streamer for given streamer.
-     */
-    public static VisorStreamerMetrics from(IgniteStreamer streamer) {
-        assert streamer != null;
-
-        StreamerMetrics m = streamer.metrics();
-
-        int windowSz = 0;
-
-        for (StreamerWindowMetrics wm : m.windowMetrics())
-            windowSz += wm.size();
-
-        VisorStreamerMetrics metrics = new VisorStreamerMetrics();
-
-        metrics.active(m.stageActiveExecutionCount());
-        metrics.waiting(m.stageWaitingExecutionCount());
-        metrics.capacity(m.executorServiceCapacity());
-
-        metrics.pipelineMinExecutionTime(m.pipelineMinimumExecutionTime());
-        metrics.pipelineAvgExecutionTime(m.pipelineAverageExecutionTime());
-        metrics.pipelineMaxExecutionTime(m.pipelineMaximumExecutionTime());
-
-        metrics.pipelineMinExecutionNodes(m.pipelineMinimumExecutionNodes());
-        metrics.pipelineAvgExecutionNodes(m.pipelineAverageExecutionNodes());
-        metrics.pipelineMaxExecutionNodes(m.pipelineMaximumExecutionNodes());
-
-        metrics.queryMinExecutionTime(m.queryMinimumExecutionTime());
-        metrics.queryAvgExecutionTime(m.queryAverageExecutionTime());
-        metrics.queryMaxExecutionTime(m.queryMaximumExecutionTime());
-
-        metrics.queryMinExecutionNodes(m.queryMinimumExecutionNodes());
-        metrics.queryAvgExecutionNodes(m.queryAverageExecutionNodes());
-        metrics.queryMaxExecutionNodes(m.queryMaximumExecutionNodes());
-
-        metrics.windowSize(windowSz);
-
-        return metrics;
-    }
-
-    /**
-     * @return Active stages.
-     */
-    public int active() {
-        return active;
-    }
-
-    /**
-     * @param active New active stages.
-     */
-    public void active(int active) {
-        this.active = active;
-    }
-
-    /**
-     * @return Waiting stages.
-     */
-    public int waiting() {
-        return waiting;
-    }
-
-    /**
-     * @param waiting New waiting stages.
-     */
-    public void waiting(int waiting) {
-        this.waiting = waiting;
-    }
-
-    /**
-     * @return Stages execution capacity.
-     */
-    public int capacity() {
-        return cap;
-    }
-
-    /**
-     * @param cap New stages execution capacity.
-     */
-    public void capacity(int cap) {
-        this.cap = cap;
-    }
-
-    /**
-     * @return Pipeline minimum execution time.
-     */
-    public long pipelineMinExecutionTime() {
-        return pipelineMinExecTm;
-    }
-
-    /**
-     * @param pipelineMinExecTm New pipeline minimum execution time.
-     */
-    public void pipelineMinExecutionTime(long pipelineMinExecTm) {
-        this.pipelineMinExecTm = pipelineMinExecTm;
-    }
-
-    /**
-     * @return Pipeline average execution time.
-     */
-    public long pipelineAvgExecutionTime() {
-        return pipelineAvgExecTm;
-    }
-
-    /**
-     * @param pipelineAvgExecTm New pipeline average execution time.
-     */
-    public void pipelineAvgExecutionTime(long pipelineAvgExecTm) {
-        this.pipelineAvgExecTm = pipelineAvgExecTm;
-    }
-
-    /**
-     * @return Pipeline maximum execution time.
-     */
-    public long pipelineMaxExecutionTime() {
-        return pipelineMaxExecTm;
-    }
-
-    /**
-     * @param pipelineMaxExecTm New pipeline maximum execution time.
-     */
-    public void pipelineMaxExecutionTime(long pipelineMaxExecTm) {
-        this.pipelineMaxExecTm = pipelineMaxExecTm;
-    }
-
-    /**
-     * @return Minimum number of unique nodes in pipeline execution.
-     */
-    public int pipelineMinExecutionNodes() {
-        return pipelineMinExecNodes;
-    }
-
-    /**
-     * @param pipelineMinExecNodes New minimum number of unique nodes in pipeline execution.
-     */
-    public void pipelineMinExecutionNodes(int pipelineMinExecNodes) {
-        this.pipelineMinExecNodes = pipelineMinExecNodes;
-    }
-
-    /**
-     * @return Average number of unique nodes in pipeline execution.
-     */
-    public int pipelineAvgExecutionNodes() {
-        return pipelineAvgExecNodes;
-    }
-
-    /**
-     * @param pipelineAvgExecNodes New average number of unique nodes in pipeline execution.
-     */
-    public void pipelineAvgExecutionNodes(int pipelineAvgExecNodes) {
-        this.pipelineAvgExecNodes = pipelineAvgExecNodes;
-    }
-
-    /**
-     * @return Maximum number of unique nodes in pipeline execution.
-     */
-    public int pipelineMaxExecutionNodes() {
-        return pipelineMaxExecNodes;
-    }
-
-    /**
-     * @param pipelineMaxExecNodes New maximum number of unique nodes in pipeline execution.
-     */
-    public void pipelineMaxExecutionNodes(int pipelineMaxExecNodes) {
-        this.pipelineMaxExecNodes = pipelineMaxExecNodes;
-    }
-
-    /**
-     * @return Query minimum execution time.
-     */
-    public long queryMinExecutionTime() {
-        return qryMinExecTm;
-    }
-
-    /**
-     * @param qryMinExecTime New query minimum execution time.
-     */
-    public void queryMinExecutionTime(long qryMinExecTime) {
-        qryMinExecTm = qryMinExecTime;
-    }
-
-    /**
-     * @return Query average execution time.
-     */
-    public long queryAvgExecutionTime() {
-        return qryAvgExecTm;
-    }
-
-    /**
-     * @param qryAvgExecTime New query average execution time.
-     */
-    public void queryAvgExecutionTime(long qryAvgExecTime) {
-        qryAvgExecTm = qryAvgExecTime;
-    }
-
-    /**
-     * @return Query maximum execution time.
-     */
-    public long queryMaxExecutionTime() {
-        return qryMaxExecTm;
-    }
-
-    /**
-     * @param qryMaxExecTime New query maximum execution time.
-     */
-    public void queryMaxExecutionTime(long qryMaxExecTime) {
-        qryMaxExecTm = qryMaxExecTime;
-    }
-
-    /**
-     * @return Minimum number of unique nodes in query execution.
-     */
-    public int queryMinExecutionNodes() {
-        return qryMinExecNodes;
-    }
-
-    /**
-     * @param qryMinExecNodes New minimum number of unique nodes in query execution.
-     */
-    public void queryMinExecutionNodes(int qryMinExecNodes) {
-        this.qryMinExecNodes = qryMinExecNodes;
-    }
-
-    /**
-     * @return Average number of unique nodes in query execution.
-     */
-    public int queryAvgExecutionNodes() {
-        return qryAvgExecNodes;
-    }
-
-    /**
-     * @param qryAvgExecNodes New average number of unique nodes in query execution.
-     */
-    public void queryAvgExecutionNodes(int qryAvgExecNodes) {
-        this.qryAvgExecNodes = qryAvgExecNodes;
-    }
-
-    /**
-     * @return Maximum number of unique nodes in query execution.
-     */
-    public int queryMaxExecutionNodes() {
-        return qryMaxExecNodes;
-    }
-
-    /**
-     * @param qryMaxExecNodes New maximum number of unique nodes in query execution.
-     */
-    public void queryMaxExecutionNodes(int qryMaxExecNodes) {
-        this.qryMaxExecNodes = qryMaxExecNodes;
-    }
-
-    /**
-     * @return Current window size.
-     */
-    public int windowSize() {
-        return windowSize;
-    }
-
-    /**
-     * @param windowSize New current window size.
-     */
-    public void windowSize(int windowSize) {
-        this.windowSize = windowSize;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(VisorStreamerMetrics.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerMetricsResetTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerMetricsResetTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerMetricsResetTask.java
deleted file mode 100644
index c0ee2b1..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerMetricsResetTask.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.visor.streamer;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.task.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.visor.*;
-
-import static org.apache.ignite.internal.visor.util.VisorTaskUtils.*;
-
-/**
- * Task for reset metrics for specified streamer.
- */
-@GridInternal
-public class VisorStreamerMetricsResetTask extends VisorOneNodeTask<String, Void> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** {@inheritDoc} */
-    @Override protected VisorStreamerMetricsResetJob job(String arg) {
-        return new VisorStreamerMetricsResetJob(arg, debug);
-    }
-
-    /**
-     * Job that reset streamer metrics.
-     */
-    private static class VisorStreamerMetricsResetJob extends VisorJob<String, Void> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**
-         * @param arg Streamer name.
-         * @param debug Debug flag.
-         */
-        private VisorStreamerMetricsResetJob(String arg, boolean debug) {
-            super(arg, debug);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected Void run(String streamerName) {
-            try {
-                IgniteStreamer streamer = ignite.streamer(streamerName);
-
-                streamer.resetMetrics();
-
-                return null;
-            }
-            catch (IllegalArgumentException iae) {
-                throw new IgniteException("Failed to reset metrics for streamer: " + escapeName(streamerName) +
-                    " on node: " + ignite.localNode().id(), iae);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(VisorStreamerMetricsResetJob.class, this);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerResetTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerResetTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerResetTask.java
deleted file mode 100644
index 60ce94d..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerResetTask.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.visor.streamer;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.task.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.visor.*;
-
-import static org.apache.ignite.internal.visor.util.VisorTaskUtils.*;
-
-/**
- * Task for reset specified streamer.
- */
-@GridInternal
-public class VisorStreamerResetTask extends VisorOneNodeTask<String, Void> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** {@inheritDoc} */
-    @Override protected VisorStreamerResetJob job(String arg) {
-        return new VisorStreamerResetJob(arg, debug);
-    }
-
-    /**
-     * Job that reset streamer.
-     */
-    private static class VisorStreamerResetJob extends VisorJob<String, Void> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**
-         * @param arg Streamer name.
-         * @param debug Debug flag.
-         */
-        private VisorStreamerResetJob(String arg, boolean debug) {
-            super(arg, debug);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected Void run(String streamerName) {
-            try {
-                IgniteStreamer streamer = ignite.streamer(streamerName);
-
-                streamer.reset();
-
-                return null;
-            }
-            catch (IllegalArgumentException iae) {
-                throw new IgniteException("Failed to reset streamer: " + escapeName(streamerName)
-                    + " on node: " + ignite.localNode().id(), iae);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(VisorStreamerResetJob.class, this);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerStageMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerStageMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerStageMetrics.java
deleted file mode 100644
index 2de597f..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerStageMetrics.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.visor.streamer;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.streamer.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Data transfer object for {@link org.apache.ignite.streamer.StreamerStageMetrics}.
- */
-public class VisorStreamerStageMetrics implements Serializable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Stage name. */
-    private String name;
-
-    /** Minimum execution time. */
-    private long minExecTm;
-
-    /** Average execution time. */
-    private long avgExecTm;
-
-    /** Maximum execution time. */
-    private long maxExecTm;
-
-    /** Minimum waiting time. */
-    private long minWaitingTm;
-
-    /** Average waiting time. */
-    private long avgWaitingTm;
-
-    /** Maximum waiting time. */
-    private long maxWaitingTm;
-
-    /** Executed count. */
-    private long executed;
-
-    /** Failures count. */
-    private int failures;
-
-    /** If executing. */
-    private boolean executing;
-
-    /** Throughput. */
-    private long throughput = -1;
-
-    /** Failures frequency. */
-    private int failuresFreq = -1;
-
-    /** Create data transfer object for given metrics. */
-    public static VisorStreamerStageMetrics from(StreamerStageMetrics m) {
-        assert m != null;
-
-        VisorStreamerStageMetrics metrics = new VisorStreamerStageMetrics();
-
-        metrics.name(m.name());
-
-        metrics.minExecutionTime(m.minimumExecutionTime());
-        metrics.avgExecutionTime(m.averageExecutionTime());
-        metrics.maxExecutionTime(m.maximumExecutionTime());
-
-        metrics.minWaitingTime(m.minimumWaitingTime());
-        metrics.avgWaitingTime(m.averageWaitingTime());
-        metrics.maxWaitingTime(m.maximumWaitingTime());
-
-        metrics.executed(m.totalExecutionCount());
-        metrics.failures(m.failuresCount());
-        metrics.executing(m.executing());
-
-        return metrics;
-    }
-
-    /** Create data transfer objects for all stages. */
-    public static Collection<VisorStreamerStageMetrics> stages(IgniteStreamer streamer) {
-        assert streamer != null;
-
-        Collection<VisorStreamerStageMetrics> res = new ArrayList<>();
-
-        for (StreamerStageMetrics m : streamer.metrics().stageMetrics())
-            res.add(from(m));
-
-        return res;
-    }
-
-    /**
-     * @return Stage name.
-     */
-    public String name() {
-        return name;
-    }
-
-    /**
-     * @param name New stage name.
-     */
-    public void name(String name) {
-        this.name = name;
-    }
-
-    /**
-     * @return Minimum execution time.
-     */
-    public long minExecutionTime() {
-        return minExecTm;
-    }
-
-    /**
-     * @param minExecTm New minimum execution time.
-     */
-    public void minExecutionTime(long minExecTm) {
-        this.minExecTm = minExecTm;
-    }
-
-    /**
-     * @return Average execution time.
-     */
-    public long avgExecutionTime() {
-        return avgExecTm;
-    }
-
-    /**
-     * @param avgExecTm New average execution time.
-     */
-    public void avgExecutionTime(long avgExecTm) {
-        this.avgExecTm = avgExecTm;
-    }
-
-    /**
-     * @return Maximum execution time.
-     */
-    public long maxExecutionTime() {
-        return maxExecTm;
-    }
-
-    /**
-     * @param maxExecTm New maximum execution time.
-     */
-    public void maxExecutionTime(long maxExecTm) {
-        this.maxExecTm = maxExecTm;
-    }
-
-    /**
-     * @return Minimum waiting time.
-     */
-    public long minWaitingTime() {
-        return minWaitingTm;
-    }
-
-    /**
-     * @param minWaitingTm New minimum waiting time.
-     */
-    public void minWaitingTime(long minWaitingTm) {
-        this.minWaitingTm = minWaitingTm;
-    }
-
-    /**
-     * @return Average waiting time.
-     */
-    public long avgWaitingTime() {
-        return avgWaitingTm;
-    }
-
-    /**
-     * @param avgWaitingTm New average waiting time.
-     */
-    public void avgWaitingTime(long avgWaitingTm) {
-        this.avgWaitingTm = avgWaitingTm;
-    }
-
-    /**
-     * @return Maximum waiting time.
-     */
-    public long maxWaitingTime() {
-        return maxWaitingTm;
-    }
-
-    /**
-     * @param maxWaitingTm New maximum waiting time.
-     */
-    public void maxWaitingTime(long maxWaitingTm) {
-        this.maxWaitingTm = maxWaitingTm;
-    }
-
-    /**
-     * @return Executed count.
-     */
-    public long executed() {
-        return executed;
-    }
-
-    /**
-     * @param executed New executed count.
-     */
-    public void executed(long executed) {
-        this.executed = executed;
-    }
-
-    /**
-     * @return Failures count.
-     */
-    public int failures() {
-        return failures;
-    }
-
-    /**
-     * @param failures New failures count.
-     */
-    public void failures(int failures) {
-        this.failures = failures;
-    }
-
-    /**
-     * @return If executing.
-     */
-    public boolean executing() {
-        return executing;
-    }
-
-    /**
-     * @param executing New if executing.
-     */
-    public void executing(boolean executing) {
-        this.executing = executing;
-    }
-
-    /**
-     * @return Throughput.
-     */
-    public long throughput() {
-        return throughput;
-    }
-
-    /**
-     * @param throughput New throughput.
-     */
-    public void throughput(long throughput) {
-        this.throughput = throughput;
-    }
-
-    /**
-     * @return Failures frequency.
-     */
-    public int failuresFrequency() {
-        return failuresFreq;
-    }
-
-    /**
-     * @param failuresFreq New failures frequency.
-     */
-    public void failuresFrequency(int failuresFreq) {
-        this.failuresFreq = failuresFreq;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(VisorStreamerStageMetrics.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/StreamerConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/StreamerConfiguration.java b/modules/core/src/main/java/org/apache/ignite/streamer/StreamerConfiguration.java
deleted file mode 100644
index d0e895f..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/StreamerConfiguration.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streamer;
-
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Streamer configuration.
- */
-public class StreamerConfiguration {
-    /** By default maximum number of concurrent sessions is unlimited. */
-    public static final int DFLT_MAX_CONCURRENT_SESSIONS = -1;
-
-    /** Default value for maximum failover attempts. */
-    public static final int DFLT_MAX_FAILOVER_ATTEMPTS = 3;
-
-    /** Name. */
-    private String name;
-
-    /** Window. */
-    private Collection<StreamerWindow> win;
-
-    /** Router. */
-    private StreamerEventRouter router;
-
-    /** Stages. */
-    @GridToStringInclude
-    private Collection<StreamerStage> stages;
-
-    /** At least once flag. */
-    private boolean atLeastOnce;
-
-    /** Maximum number of failover attempts. */
-    private int maxFailoverAttempts = DFLT_MAX_FAILOVER_ATTEMPTS;
-
-    /** Maximum number of concurrent sessions to be processed. */
-    private int maxConcurrentSessions = DFLT_MAX_CONCURRENT_SESSIONS;
-
-    /** Streamer thread pool size. */
-    private int poolSize = Runtime.getRuntime().availableProcessors();
-
-    /**
-     *
-     */
-    public StreamerConfiguration() {
-        // No-op.
-    }
-
-    /**
-     * @param c Configuration to copy.
-     */
-    public StreamerConfiguration(StreamerConfiguration c) {
-        atLeastOnce = c.isAtLeastOnce();
-        poolSize = c.getThreadPoolSize();
-        maxConcurrentSessions = c.getMaximumConcurrentSessions();
-        maxFailoverAttempts = c.getMaximumFailoverAttempts();
-        name = c.getName();
-        router = c.getRouter();
-        stages = c.getStages();
-        win = c.getWindows();
-    }
-
-    /**
-     * Gets streamer name. Must be unique within grid.
-     *
-     * @return Streamer name, if {@code null} then default streamer is returned.
-     */
-    @Nullable public String getName() {
-        return name;
-    }
-
-    /**
-     * Sets the name of the streamer.
-     *
-     * @param name Name.
-     */
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    /**
-     * Gets streamer event router.
-     *
-     * @return Event router, if {@code null} then events will be executed locally.
-     */
-    @SuppressWarnings("unchecked")
-    @Nullable public StreamerEventRouter getRouter() {
-        return router;
-    }
-
-    /**
-     * Sets router for streamer.
-     *
-     * @param router Router.
-     */
-    public void setRouter(StreamerEventRouter router) {
-        this.router = router;
-    }
-
-    /**
-     * Gets collection of streamer event windows. At least one window should be configured. Each window
-     * must have unique name.
-     *
-     * @return Streamer windows.
-     */
-    public Collection<StreamerWindow> getWindows() {
-        return win;
-    }
-
-    /**
-     * Sets collection of streamer windows.
-     *
-     * @param win Window.
-     */
-    public void setWindows(Collection<StreamerWindow> win) {
-        this.win = win;
-    }
-
-    /**
-     * Gets collection of streamer stages. Streamer must have at least one stage to execute. Each stage
-     * must have unique name.
-     *
-     * @return Collection of streamer stages.
-     */
-    public Collection<StreamerStage> getStages() {
-        return stages;
-    }
-
-    /**
-     * Sets stages.
-     *
-     * @param stages Stages.
-     */
-    public void setStages(Collection<StreamerStage> stages) {
-        this.stages = stages;
-    }
-
-    /**
-     * Gets flag indicating whether streamer should track event execution sessions and failover event execution
-     * if any failure detected or any node on which execution happened has left the grid before successful response
-     * is received.
-     * <p>
-     * Setting this flag to {@code true} will guarantee that all pipeline stages will be executed at least once for
-     * each group of event submitted to streamer (or failure listener will be notified if failover cannot succeed).
-     * However, it does not guarantee that each stage will be executed at most once.
-     *
-     * @return {@code True} if event should be processed at least once,
-     *      or {@code false} if failures can be safely ignored.
-     */
-    public boolean isAtLeastOnce() {
-        return atLeastOnce;
-    }
-
-    /**
-     * @param atLeastOnce {@code True} to guarantee that event will be processed at least once.
-     */
-    public void setAtLeastOnce(boolean atLeastOnce) {
-        this.atLeastOnce = atLeastOnce;
-    }
-
-    /**
-     * Gets maximum number of failover attempts to try when pipeline execution has failed. This parameter
-     * is ignored if {@link #isAtLeastOnce()} is set to {@code false}.
-     * <p>
-     * If not set, default value is
-     *
-     * @return Maximum number of failover attempts to try.
-     */
-    public int getMaximumFailoverAttempts() {
-        return maxFailoverAttempts;
-    }
-
-    /**
-     * Sets maximum number of failover attempts.
-
-     * @param maxFailoverAttempts Maximum number of failover attempts.
-     * @see #getMaximumFailoverAttempts()
-     */
-    public void setMaximumFailoverAttempts(int maxFailoverAttempts) {
-        this.maxFailoverAttempts = maxFailoverAttempts;
-    }
-
-    /**
-     * Gets maximum number of concurrent events to be processed by streamer. This property is taken into
-     * account when {@link #isAtLeastOnce()} is set to {@code true}. If not positive, number of sessions
-     * will not be limited by any value.
-     *
-     * @return Maximum number of concurrent events to be processed. If number of concurrent events is greater
-     *      then this value, caller will be blocked until enough responses are received.
-     */
-    public int getMaximumConcurrentSessions() {
-        return maxConcurrentSessions;
-    }
-
-    /**
-     * Sets maximum number of concurrent sessions.
-     *
-     * @param maxConcurrentSessions Maximum number of concurrent sessions.
-     * @see #getMaximumConcurrentSessions()
-     */
-    public void setMaximumConcurrentSessions(int maxConcurrentSessions) {
-        this.maxConcurrentSessions = maxConcurrentSessions;
-    }
-
-    /**
-     * Gets streamer pool size. Defines a thread pool size in which streamer stages will be executed.
-     * <p>
-     * If not specified, thread pool executor with max pool size equal to number of cores will be created.
-     *
-     * @return Streamer thread pool size.
-     */
-    public int getThreadPoolSize() {
-        return poolSize;
-    }
-
-    /**
-     * Sets streamer pool size.
-     *
-     * @param poolSize Pool size.
-     * @see #getThreadPoolSize()
-     */
-    public void setThreadPoolSize(int poolSize) {
-        this.poolSize = poolSize;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(StreamerConfiguration.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/StreamerContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/StreamerContext.java b/modules/core/src/main/java/org/apache/ignite/streamer/StreamerContext.java
deleted file mode 100644
index 7ce4cce..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/StreamerContext.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streamer;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.lang.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Streamer context. Provides access to streamer local store, configured windows and various methods
- * to run streamer queries.
- */
-public interface StreamerContext {
-    /**
-     * Gets instance of dynamic grid projection including all nodes on which this streamer is running.
-     *
-     * @return Projection with all nodes on which streamer is configured.
-     */
-    public ClusterGroup projection();
-
-    /**
-     * Gets streamer local space. Note that all updates to this space will be local.
-     *
-     * @return Streamer local space.
-     */
-    public <K, V> ConcurrentMap<K, V> localSpace();
-
-    /**
-     * Gets default event window, i.e. window that is on the first place in streamer configuration.
-     *
-     * @return Default window.
-     */
-    public <E> StreamerWindow<E> window();
-
-    /**
-     * Gets streamer event window by window name, if no window with such
-     * name was configured {@link IllegalArgumentException} will be thrown.
-     *
-     * @param winName Window name.
-     * @return Window instance.
-     */
-    public <E> StreamerWindow<E> window(String winName);
-
-    /**
-     * For context passed to {@link StreamerStage#run(StreamerContext, Collection)} this method will
-     * return next stage name in execution pipeline. For context obtained from streamer object, this method will
-     * return first stage name.
-     *
-     * @return Next stage name depending on invocation context.
-     */
-    public String nextStageName();
-
-    /**
-     * Queries all streamer nodes deployed within grid. Given closure will be executed on each node on which streamer
-     * is configured. Streamer context local for that node will be passed to closure during execution. All results
-     * returned by closure will be added to result collection.
-     *
-     * @param clo Function to be executed on individual nodes.
-     * @return Result received from all streamers.
-     * @throws IgniteException If query execution failed.
-     */
-    public <R> Collection<R> query(IgniteClosure<StreamerContext, R> clo) throws IgniteException;
-
-    /**
-     * Queries streamer nodes deployed within grid. Given closure will be executed on those of passed nodes
-     * on which streamer is configured. Streamer context local for that node will be passed to closure during
-     * execution. All results returned by closure will be added to result collection.
-     *
-     * @param clo Function to be executed on individual nodes.
-     * @param nodes Optional list of nodes to execute query on, if empty, then all nodes on
-     *      which this streamer is running will be queried.
-     * @return Result received from all streamers.
-     * @throws IgniteException If query execution failed.
-     */
-    public <R> Collection<R> query(IgniteClosure<StreamerContext, R> clo, Collection<ClusterNode> nodes)
-        throws IgniteException;
-
-    /**
-     * Queries all streamer nodes deployed within grid. Given closure will be executed on each streamer node
-     * in the grid. No result is collected.
-     *
-     * @param clo Function to be executed on individual nodes.
-     * @throws IgniteException If closure execution failed.
-     */
-    public void broadcast(IgniteInClosure<StreamerContext> clo) throws IgniteException;
-
-    /**
-     * Queries streamer nodes deployed within grid. Given closure will be executed on those of passed nodes on
-     * which streamer is configured. No result is collected.
-     *
-     * @param clo Function to be executed on individual nodes.
-     * @param nodes Optional list of nodes to execute query on, if empty, then all nodes on
-     *      which this streamer is running will be queried.
-     * @throws IgniteException If closure execution failed.
-     */
-    public void broadcast(IgniteInClosure<StreamerContext> clo, Collection<ClusterNode> nodes) throws IgniteException;
-
-    /**
-     * Queries all streamer nodes deployed within grid. Given closure will be executed on each streamer node in
-     * the grid. Streamer context local for that node will be passed to closure during execution. Results returned
-     * by closure will be passed to given reducer.
-     *
-     * @param clo Function to be executed on individual nodes.
-     * @param rdc Reducer to reduce results received from remote nodes.
-     * @return Reducer result.
-     * @throws IgniteException If query execution failed.
-     */
-    public <R1, R2> R2 reduce(IgniteClosure<StreamerContext, R1> clo, IgniteReducer<R1, R2> rdc) throws IgniteException;
-
-    /**
-     * Queries streamer nodes deployed within grid. Given closure will be executed on those of passed nodes on which
-     * streamer is configured. Streamer context local for that node will be passed to closure during execution.
-     * Results returned by closure will be passed to given reducer.
-     *
-     * @param clo Function to be executed on individual nodes.
-     * @param rdc Reducer to reduce results received from remote nodes.
-     * @param nodes Optional list of nodes to execute query on, if empty, then all nodes on
-     *      which this streamer is running will be queried.
-     * @return Reducer result.
-     * @throws IgniteException If query execution failed.
-     */
-    public <R1, R2> R2 reduce(IgniteClosure<StreamerContext, R1> clo, IgniteReducer<R1, R2> rdc,
-        Collection<ClusterNode> nodes) throws IgniteException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/StreamerEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/StreamerEventRouter.java b/modules/core/src/main/java/org/apache/ignite/streamer/StreamerEventRouter.java
deleted file mode 100644
index 1994a6a..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/StreamerEventRouter.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streamer;
-
-import org.apache.ignite.cluster.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Streamer event router. Pluggable component that determines event execution flow across the grid.
- * Each time a group of events is submitted to streamer or returned to streamer by a stage, event
- * router will be used to select execution node for next stage.
- */
-public interface StreamerEventRouter {
-    /**
-     * Selects a node for given event that should be processed by a stage with given name.
-     *
-     * @param ctx Streamer context.
-     * @param stageName Stage name.
-     * @param evt Event to route.
-     * @return Node to route to. If this method returns {@code null} then the whole pipeline execution
-     *      will be terminated. All running and ongoing stages for pipeline execution will be
-     *      cancelled.
-     */
-    @Nullable public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt);
-
-    /**
-     * Selects a node for given events that should be processed by a stage with given name.
-     *
-     * @param ctx Streamer context.
-     * @param stageName Stage name to route events.
-     * @param evts Events.
-     * @return Events to node mapping. If this method returns {@code null} then the whole pipeline execution
-     *      will be terminated. All running and ongoing stages for pipeline execution will be
-     *      cancelled.
-     */
-    @Nullable public <T> Map<ClusterNode, Collection<T>> route(StreamerContext ctx, String stageName,
-        Collection<T> evts);
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/StreamerEventRouterAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/StreamerEventRouterAdapter.java b/modules/core/src/main/java/org/apache/ignite/streamer/StreamerEventRouterAdapter.java
deleted file mode 100644
index 6ab4eda..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streamer/StreamerEventRouterAdapter.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streamer;
-
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.typedef.*;
-
-import java.util.*;
-
-/**
- * Streamer adapter for event routers.
- */
-public abstract class StreamerEventRouterAdapter implements StreamerEventRouter {
-    /** {@inheritDoc} */
-    @Override public <T> Map<ClusterNode, Collection<T>> route(StreamerContext ctx, String stageName,
-        Collection<T> evts) {
-        if (evts.size() == 1) {
-            ClusterNode route = route(ctx, stageName, F.first(evts));
-
-            if (route == null)
-                return null;
-
-            return Collections.singletonMap(route, evts);
-        }
-
-        Map<ClusterNode, Collection<T>> map = new GridLeanMap<>();
-
-        for (T e : evts) {
-            ClusterNode n = route(ctx, stageName, e);
-
-            if (n == null)
-                return null;
-
-            Collection<T> mapped = map.get(n);
-
-            if (mapped == null)
-                map.put(n, mapped = new ArrayList<>());
-
-            mapped.add(e);
-        }
-
-        return map;
-    }
-}


Mime
View raw message