ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [08/14] incubator-ignite git commit: sp-2 streaming cleanup
Date Fri, 20 Mar 2015 09:48:19 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
index 178c604..4cbd11a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
@@ -528,11 +528,6 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
     }
 
     /** {@inheritDoc} */
-    @Override public final ClusterGroup forStreamer(@Nullable String streamerName, @Nullable String... streamerNames) {
-        return forPredicate(new StreamersFilter(streamerName, streamerNames));
-    }
-
-    /** {@inheritDoc} */
     @Override public ClusterGroup forCacheNodes(@Nullable String cacheName,
         Set<CacheDistributionMode> distributionModes) {
         return forPredicate(new CachesFilter(cacheName, distributionModes));
@@ -700,41 +695,6 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
 
     /**
      */
-    private static class StreamersFilter implements IgnitePredicate<ClusterNode> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Streamer name. */
-        private final String streamerName;
-
-        /** Streamer names. */
-        private final String[] streamerNames;
-
-        /**
-         * @param streamerName Streamer name.
-         * @param streamerNames Streamer names.
-         */
-        private StreamersFilter(@Nullable String streamerName, @Nullable String[] streamerNames) {
-            this.streamerName = streamerName;
-            this.streamerNames = streamerNames;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean apply(ClusterNode n) {
-            if (!U.hasStreamer(n, streamerName))
-                 return false;
-
-            if (!F.isEmpty(streamerNames))
-                for (String sn : streamerNames)
-                    if (!U.hasStreamer(n, sn))
-                        return false;
-
-            return true;
-        }
-    }
-
-    /**
-     */
     private static class AttributeFilter implements IgnitePredicate<ClusterNode> {
         /** */
         private static final long serialVersionUID = 0L;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
index 960bacd..75386e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
@@ -213,11 +213,6 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster>
     }
 
     /** {@inheritDoc} */
-    @Override public ClusterGroup forStreamer(String streamerName, @Nullable String... streamerNames) {
-        return cluster.forStreamer(streamerName, streamerNames);
-    }
-
-    /** {@inheritDoc} */
     @Override public ClusterGroup forRemotes() {
         return cluster.forRemotes();
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index d69a809..f54d85f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -38,7 +38,6 @@ import org.apache.ignite.internal.processors.datastreamer.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
 import org.apache.ignite.internal.processors.rest.handlers.task.*;
-import org.apache.ignite.internal.processors.streamer.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
@@ -445,21 +444,6 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
-            case 79:
-                msg = new GridStreamerCancelRequest();
-
-                break;
-
-            case 80:
-                msg = new GridStreamerExecutionRequest();
-
-                break;
-
-            case 81:
-                msg = new GridStreamerResponse();
-
-                break;
-
             case 82:
                 msg = new JobStealingRequest();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerLoaderStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerLoaderStore.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerLoaderStore.java
index e81cf38..52c8b0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerLoaderStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerLoaderStore.java
@@ -494,7 +494,6 @@ public class GridDeploymentPerLoaderStore extends GridDeploymentStoreAdapter {
                 ClassLoader ldr = classLoader();
 
                 ctx.cache().onUndeployed(ldr);
-                ctx.stream().onUndeployed(ldr);
 
                 // Clear optimized marshaller's cache.
                 if (ctx.config().getMarshaller() instanceof OptimizedMarshaller)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java
index 1cfe4b8..b9c9522 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java
@@ -1257,7 +1257,6 @@ public class GridDeploymentPerVersionStore extends GridDeploymentStoreAdapter {
                 ClassLoader ldr = classLoader();
 
                 ctx.cache().onUndeployed(ldr);
-                ctx.stream().onUndeployed(ldr);
 
                 // Clear optimized marshaller's cache.
                 if (ctx.config().getMarshaller() instanceof OptimizedMarshaller)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamProcessor.java
deleted file mode 100644
index fb6cb85..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamProcessor.java
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.streamer;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.streamer.*;
-import org.apache.ignite.streamer.index.*;
-import org.apache.ignite.streamer.window.*;
-import org.jetbrains.annotations.*;
-
-import javax.management.*;
-import java.util.*;
-
-import static org.apache.ignite.IgniteSystemProperties.*;
-import static org.apache.ignite.internal.IgniteNodeAttributes.*;
-
-/**
- *
- */
-public class GridStreamProcessor extends GridProcessorAdapter {
-    /** Streamers map. */
-    private Map<String, IgniteStreamerImpl> map;
-
-    /** Registered MBeans */
-    private Collection<ObjectName> mBeans;
-
-    /** MBean server. */
-    private final MBeanServer mBeanSrv;
-
-    /**
-     * @param ctx Kernal context.
-     */
-    public GridStreamProcessor(GridKernalContext ctx) {
-        super(ctx);
-
-        mBeanSrv = ctx.config().getMBeanServer();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStart() throws IgniteCheckedException {
-        if (ctx.config().isDaemon())
-            return;
-
-        super.onKernalStart();
-
-        if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
-            for (ClusterNode n : ctx.discovery().remoteNodes())
-                checkStreamer(n);
-        }
-
-        for (IgniteStreamerImpl s : map.values()) {
-            try {
-                mBeans.add(U.registerMBean(mBeanSrv, ctx.gridName(), U.maskName(s.name()), "Streamer",
-                    new StreamerMBeanAdapter(s), StreamerMBean.class));
-
-                if (log.isDebugEnabled())
-                    log.debug("Registered MBean for streamer: " + s.name());
-            }
-            catch (JMException e) {
-                U.error(log, "Failed to register streamer MBean: " + s.name(), e);
-            }
-
-            // Add mbeans for stages.
-            for (StreamerStage stage : s.configuration().getStages()) {
-                try {
-                    mBeans.add(U.registerMBean(mBeanSrv, ctx.gridName(), U.maskName(s.name()), "Stage-" + stage.name(),
-                        new StreamerStageMBeanAdapter(stage.name(), stage.getClass().getName(), s),
-                        StreamerStageMBean.class));
-
-                    if (log.isDebugEnabled())
-                        log.debug("Registered MBean for streamer stage [streamer=" + s.name() +
-                            ", stage=" + stage.name() + ']');
-                }
-                catch (JMException e) {
-                    U.error(log, "Failed to register streamer stage MBean [streamer=" + s.name() +
-                        ", stage=" + stage.name() + ']', e);
-                }
-            }
-
-            // Add mbeans for windows.
-            for (StreamerWindow win : s.configuration().getWindows()) {
-                try {
-                    if (hasInterface(win.getClass(), StreamerWindowMBean.class)) {
-                        mBeans.add(U.registerMBean(mBeanSrv, ctx.gridName(), U.maskName(s.name()),
-                            "Window-" + win.name(),
-                            (StreamerWindowMBean)win,
-                            StreamerWindowMBean.class));
-
-                        if (log.isDebugEnabled())
-                            log.debug("Registered MBean for streamer window [streamer=" + s.name() +
-                                ", window=" + win.name() + ']');
-                    }
-                }
-                catch (JMException e) {
-                    U.error(log, "Failed to register streamer window MBean [streamer=" + s.name() +
-                        ", window=" + win.name() + ']', e);
-                }
-
-                if (win instanceof StreamerWindowAdapter) {
-                    StreamerIndexProvider[] idxs = ((StreamerWindowAdapter)win).indexProviders();
-
-                    if (idxs != null && idxs.length > 0) {
-                        for (StreamerIndexProvider idx : idxs) {
-                            try {
-                                mBeans.add(U.registerMBean(mBeanSrv, ctx.gridName(), U.maskName(s.name()),
-                                    "Window-" + win.name() + "-index-" + idx.name(), idx,
-                                    StreamerIndexProviderMBean.class));
-
-                                if (log.isDebugEnabled())
-                                    log.debug("Registered MBean for streamer window index [streamer=" + s.name() +
-                                        ", window=" + win.name() + ", index=" + idx.name() + ']');
-                            }
-                            catch (JMException e) {
-                                U.error(log, "Failed to register streamer index MBean [streamer=" + s.name() +
-                                    ", window=" + win.name() + ", index=" + idx.name() + ']', e);
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Check configuration identity on local and remote nodes.
-     *
-     * @param rmtNode Remote node to check.
-     * @throws IgniteCheckedException If configuration mismatch detected.
-     */
-    private void checkStreamer(ClusterNode rmtNode) throws IgniteCheckedException {
-        GridStreamerAttributes[] rmtAttrs = rmtNode.attribute(ATTR_STREAMER);
-        GridStreamerAttributes[] locAttrs = ctx.discovery().localNode().attribute(ATTR_STREAMER);
-
-        // If local or remote streamer is not configured, nothing to validate.
-        if (F.isEmpty(locAttrs) || F.isEmpty(rmtAttrs))
-            return;
-
-        for (GridStreamerAttributes rmtAttr : rmtAttrs) {
-            for (GridStreamerAttributes locAttr : locAttrs) {
-                if (!F.eq(rmtAttr.name(), locAttr.name()))
-                    continue;
-
-                if (rmtAttr.atLeastOnce() != locAttr.atLeastOnce())
-                    throw new IgniteCheckedException("Streamer atLeastOnce configuration flag mismatch (fix atLeastOnce flag " +
-                        "in streamer configuration or set " +
-                        "-D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system " +
-                        "property) [streamer=" + locAttr.name() +
-                        ", locAtLeastOnce=" + locAttr.atLeastOnce() +
-                        ", rmtAtLeastOnce=" + rmtAttr.atLeastOnce() +
-                        ", rmtNodeId=" + rmtNode.id() + ']');
-
-                if (!rmtAttr.stages().equals(locAttr.stages()))
-                    throw new IgniteCheckedException("Streamer stages configuration mismatch (fix streamer stages " +
-                        "configuration or set " +
-                        "-D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system " +
-                        "property) [streamer=" + locAttr.name() +
-                        ", locStages=" + locAttr.stages() +
-                        ", rmtStages=" + rmtAttr.stages() +
-                        ", rmtNodeId=" + rmtNode.id() + ']');
-
-                if (rmtAttr.atLeastOnce()) {
-                    if (rmtAttr.maxFailoverAttempts() != locAttr.maxFailoverAttempts())
-                        U.warn(log, "Streamer maxFailoverAttempts configuration property differs on local and remote " +
-                            "nodes (ignore this message if it is done on purpose) [streamer=" + locAttr.name() +
-                            ", locMaxFailoverAttempts=" + locAttr.maxFailoverAttempts() +
-                            ", rmtMaxFailoverAttempts=" + rmtAttr.maxFailoverAttempts() +
-                            ", rmtNodeId=" + rmtNode.id() + ']');
-
-                    if (rmtAttr.maxConcurrentSessions() != locAttr.maxConcurrentSessions())
-                        U.warn(log, "Streamer maxConcurrentSessions configuration property differs on local and " +
-                            "remote nodes (ignore this message if it is done on purpose) [streamer=" + locAttr.name() +
-                            ", locMaxConcurrentSessions=" + locAttr.maxConcurrentSessions() +
-                            ", rmtMaxConcurrentSessions=" + rmtAttr.maxConcurrentSessions() +
-                            ", rmtNodeId=" + rmtNode.id() + ']');
-                }
-            }
-        }
-    }
-
-    /**
-     * Traverses class hierarchy and checks if class implements given interface.
-     *
-     * @param cls Class to check.
-     * @param iface Interface to search for.
-     * @return {@code True} if at least one parent implements given interface.
-     */
-    private boolean hasInterface(Class<?> cls, Class<?> iface) {
-        while (cls != null) {
-            Class<?>[] interfaces = cls.getInterfaces();
-
-            for (Class<?> iface0 : interfaces) {
-                if (iface0.equals(iface))
-                    return true;
-            }
-
-            cls = cls.getSuperclass();
-        }
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void start() throws IgniteCheckedException {
-        if (ctx.config().isDaemon())
-            return;
-
-        super.start();
-
-        StreamerConfiguration[] cfg = ctx.config().getStreamerConfiguration();
-
-        if (F.isEmpty(cfg)) {
-            map = Collections.emptyMap();
-
-            return;
-        }
-        else {
-            int len = cfg.length;
-
-            map = new HashMap<>(len, 1.0f);
-
-            mBeans = new ArrayList<>(len);
-        }
-
-        for (StreamerConfiguration c : cfg) {
-            IgniteStreamerImpl s = new IgniteStreamerImpl(ctx, c);
-
-            s.start();
-
-            IgniteStreamerImpl old = map.put(c.getName(), s);
-
-            if (old != null) {
-                old.stop(true);
-
-                throw new IgniteCheckedException("Duplicate streamer name found (check configuration and " +
-                    "assign unique name to each streamer): " + c.getName());
-            }
-        }
-
-        if (F.isEmpty(cfg))
-            return;
-
-        GridStreamerAttributes[] arr = new GridStreamerAttributes[cfg.length];
-
-        int i = 0;
-
-        for (StreamerConfiguration c : cfg)
-            arr[i++] = new GridStreamerAttributes(c);
-
-        ctx.addNodeAttribute(ATTR_STREAMER, arr);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStop(boolean cancel) {
-        if (ctx.config().isDaemon())
-            return;
-
-        super.onKernalStop(cancel);
-
-        if (!F.isEmpty(mBeans)) {
-            for (ObjectName name : mBeans) {
-                try {
-                    mBeanSrv.unregisterMBean(name);
-                }
-                catch (JMException e) {
-                    U.error(log, "Failed to unregister streamer MBean.", e);
-                }
-            }
-        }
-
-        for (IgniteStreamerImpl streamer : map.values())
-            streamer.onKernalStop(cancel);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void stop(boolean cancel) throws IgniteCheckedException {
-        if (ctx.config().isDaemon())
-            return;
-
-        super.stop(cancel);
-
-        for (IgniteStreamerImpl s : map.values())
-            s.stop(cancel);
-    }
-
-    /**
-     * @return Default no-name streamer.
-     */
-    public IgniteStreamer streamer() {
-        return streamer(null);
-    }
-
-    /**
-     * @param name Streamer name.
-     * @return Streamer for given name.
-     */
-    public IgniteStreamer streamer(@Nullable String name) {
-        IgniteStreamer streamer = map.get(name);
-
-        if (streamer == null)
-            throw new IllegalArgumentException("Streamer is not configured: " + name);
-
-        return streamer;
-    }
-
-    /**
-     * @return All configured streamers.
-     */
-    public Collection<IgniteStreamer> streamers() {
-        Collection<IgniteStreamerImpl> streamers = map.values();
-
-        Collection<IgniteStreamer> res = new ArrayList<>(streamers.size());
-
-        streamers.addAll(map.values());
-
-        return res;
-    }
-
-    /**
-     * Callback for undeployed class loaders.
-     *
-     * @param ldr Class loader.
-     */
-    public void onUndeployed(ClassLoader ldr) {
-        for (IgniteStreamerEx streamer : map.values())
-            streamer.onUndeploy(ldr);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerAttributes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerAttributes.java
deleted file mode 100644
index e4c3094..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerAttributes.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.streamer;
-
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.streamer.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- *
- */
-public class GridStreamerAttributes implements Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private String name;
-
-    /** Stages. */
-    private Collection<IgniteBiTuple<String, String>> stages;
-
-    /** At least once flag. */
-    private boolean atLeastOnce;
-
-    /** Max failover attempts. */
-    private int maxFailoverAttempts;
-
-    /** Max concurrent sessions. */
-    private int maxConcurrentSes;
-
-    /**
-     * Empty constructor required by {@link Externalizable}.
-     */
-    public GridStreamerAttributes() {
-        // No-op.
-    }
-
-    /**
-     * @param cfg Streamer configuration.
-     */
-    public GridStreamerAttributes(StreamerConfiguration cfg) {
-        atLeastOnce = cfg.isAtLeastOnce();
-        maxConcurrentSes = cfg.getMaximumConcurrentSessions();
-        maxFailoverAttempts = cfg.getMaximumFailoverAttempts();
-        name = cfg.getName();
-
-        stages = new LinkedList<>();
-
-        if (!F.isEmpty(cfg.getStages())) {
-            for (StreamerStage stage : cfg.getStages())
-                stages.add(F.t(stage.name(), stage.getClass().getName()));
-        }
-    }
-
-    /**
-     * @return Name.
-     */
-    @Nullable public String name() {
-        return name;
-    }
-
-    /**
-     * @return Streamer stages.
-     */
-    public Collection<IgniteBiTuple<String, String>> stages() {
-        return stages;
-    }
-
-    /**
-     * @return At least once flag.
-     */
-    public boolean atLeastOnce() {
-        return atLeastOnce;
-    }
-
-    /**
-     * @return Max failover attempts.
-     */
-    public int maxFailoverAttempts() {
-        return maxFailoverAttempts;
-    }
-
-    /**
-     * @return Max concurrent sessions.
-     */
-    public int maxConcurrentSessions() {
-        return maxConcurrentSes;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeBoolean(atLeastOnce);
-        out.writeInt(maxConcurrentSes);
-        out.writeInt(maxFailoverAttempts);
-        U.writeString(out, name);
-        U.writeCollection(out, stages);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        atLeastOnce = in.readBoolean();
-        maxConcurrentSes = in.readInt();
-        maxFailoverAttempts = in.readInt();
-        name = U.readString(in);
-        stages = U.readCollection(in);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java
deleted file mode 100644
index c7669b7..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.streamer;
-
-import org.apache.ignite.lang.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-
-import java.io.*;
-import java.nio.*;
-
-/**
- * Streamer cancel request.
- */
-public class GridStreamerCancelRequest implements Message {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Cancelled future ID. */
-    private IgniteUuid cancelledFutId;
-
-    /**
-     * Empty constructor required by {@link Externalizable}.
-     */
-    public GridStreamerCancelRequest() {
-        // No-op.
-    }
-
-    /**
-     * @param cancelledFutId Cancelled future ID.
-     */
-    public GridStreamerCancelRequest(IgniteUuid cancelledFutId) {
-        this.cancelledFutId = cancelledFutId;
-    }
-
-    /**
-     * @return Cancelled future ID.
-     */
-    public IgniteUuid cancelledFutureId() {
-        return cancelledFutId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeIgniteUuid("cancelledFutId", cancelledFutId))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        switch (reader.state()) {
-            case 0:
-                cancelledFutId = reader.readIgniteUuid("cancelledFutId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 79;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 1;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextDelegate.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextDelegate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextDelegate.java
deleted file mode 100644
index 4421ad6..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextDelegate.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.streamer;
-
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.streamer.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Context delegate allowing to override next stage name.
- */
-public class GridStreamerContextDelegate implements StreamerContext {
-    /** Context delegate. */
-    private StreamerContext delegate;
-
-    /** Next stage name. */
-    private String nextStageName;
-
-    /**
-     * @param delegate Delegate object.
-     * @param nextStageName Next stage name.
-     */
-    public GridStreamerContextDelegate(StreamerContext delegate, @Nullable String nextStageName) {
-        this.delegate = delegate;
-        this.nextStageName = nextStageName;
-    }
-
-    /** {@inheritDoc} */
-    @Override public ClusterGroup projection() {
-        return delegate.projection();
-    }
-
-    /** {@inheritDoc} */
-    @Override public <K, V> ConcurrentMap<K, V> localSpace() {
-        return delegate.localSpace();
-    }
-
-    /** {@inheritDoc} */
-    @Override public <E> StreamerWindow<E> window() {
-        return delegate.window();
-    }
-
-    /** {@inheritDoc} */
-    @Override public <E> StreamerWindow<E> window(String winName) {
-        return delegate.window(winName);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String nextStageName() {
-        return nextStageName;
-    }
-
-    /** {@inheritDoc} */
-    @Override public <R> Collection<R> query(IgniteClosure<StreamerContext, R> clo) {
-        return delegate.query(clo);
-    }
-
-    /** {@inheritDoc} */
-    @Override public <R> Collection<R> query(IgniteClosure<StreamerContext, R> clo, Collection<ClusterNode> nodes) {
-        return delegate.query(clo, nodes);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void broadcast(IgniteInClosure<StreamerContext> clo) {
-        delegate.broadcast(clo);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void broadcast(IgniteInClosure<StreamerContext> clo, Collection<ClusterNode> nodes) {
-        delegate.broadcast(clo, nodes);
-    }
-
-    /** {@inheritDoc} */
-    @Override public <R1, R2> R2 reduce(IgniteClosure<StreamerContext, R1> clo, IgniteReducer<R1, R2> rdc) {
-        return delegate.reduce(clo, rdc);
-    }
-
-    /** {@inheritDoc} */
-    @Override public <R1, R2> R2 reduce(IgniteClosure<StreamerContext, R1> clo, IgniteReducer<R1, R2> rdc,
-        Collection<ClusterNode> nodes) {
-        return delegate.reduce(clo, rdc, nodes);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextImpl.java
deleted file mode 100644
index c2a992d..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextImpl.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.streamer;
-
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.streamer.task.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.streamer.*;
-import org.jdk8.backport.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Streamer context implementation.
- */
-public class GridStreamerContextImpl implements StreamerContext {
-    /** Kernal context. */
-    private GridKernalContext ctx;
-
-    /** Local space. */
-    private final ConcurrentMap<Object, Object> locSpace = new ConcurrentHashMap8<>();
-
-    /** Streamer projection. */
-    private AtomicReference<ClusterGroup> streamPrj = new AtomicReference<>();
-
-    /** Streamer. */
-    private IgniteStreamerEx streamer;
-
-    /** Next stage name. */
-    private String nextStageName;
-
-    /**
-     * @param ctx Kernal context.
-     * @param cfg Streamer configuration.
-     * @param streamer Streamer impl.
-     */
-    public GridStreamerContextImpl(GridKernalContext ctx, StreamerConfiguration cfg, IgniteStreamerEx streamer) {
-        assert ctx != null;
-        assert cfg != null;
-        assert streamer != null;
-
-        this.ctx = ctx;
-        this.streamer = streamer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public ClusterGroup projection() {
-        ctx.gateway().readLock();
-
-        try {
-            return projection0();
-        }
-        finally {
-            ctx.gateway().readUnlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public <K, V> ConcurrentMap<K, V> localSpace() {
-        return (ConcurrentMap<K, V>)locSpace;
-    }
-
-    /** {@inheritDoc} */
-    @Override public <E> StreamerWindow<E> window() {
-        return streamer.window();
-    }
-
-    /** {@inheritDoc} */
-    @Override public <E> StreamerWindow<E> window(String winName) {
-        StreamerWindow<E> window = streamer.window(winName);
-
-        if (window == null)
-            throw new IllegalArgumentException("Streamer window is not configured: " + winName);
-
-        return window;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String nextStageName() {
-        return nextStageName;
-    }
-
-    /**
-     * Sets next stage name for main context.
-     *
-     * @param nextStageName Next stage name.
-     */
-    public void nextStageName(String nextStageName) {
-        this.nextStageName = nextStageName;
-    }
-
-    /** {@inheritDoc} */
-    @Override public <R> Collection<R> query(IgniteClosure<StreamerContext, R> clo) {
-        return query(clo, Collections.<ClusterNode>emptyList());
-    }
-
-    /** {@inheritDoc} */
-    @Override public <R> Collection<R> query(IgniteClosure<StreamerContext, R> clo, Collection<ClusterNode> nodes) {
-        ctx.gateway().readLock();
-
-        try {
-            ClusterGroup prj = projection0();
-
-            if (!F.isEmpty(nodes))
-                prj = prj.forNodes(nodes);
-
-            long startTime = U.currentTimeMillis();
-
-            Collection<R> res = ctx.grid().compute(prj).execute(new GridStreamerQueryTask<>(clo, streamer.name()), null);
-
-            streamer.onQueryCompleted(U.currentTimeMillis() - startTime, prj.nodes().size());
-
-            return res;
-        }
-        finally {
-            ctx.gateway().readUnlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void broadcast(IgniteInClosure<StreamerContext> clo) {
-        broadcast(clo, Collections.<ClusterNode>emptyList());
-    }
-
-    /** {@inheritDoc} */
-    @Override public void broadcast(IgniteInClosure<StreamerContext> clo, Collection<ClusterNode> nodes) {
-        ctx.gateway().readLock();
-
-        try {
-            ClusterGroup prj = projection0();
-
-            if (!F.isEmpty(nodes))
-                prj = prj.forNodes(nodes);
-
-            ctx.grid().compute(prj).execute(new GridStreamerBroadcastTask(clo, streamer.name()), null);
-        }
-        finally {
-            ctx.gateway().readUnlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public <R1, R2> R2 reduce(IgniteClosure<StreamerContext, R1> clo, IgniteReducer<R1, R2> rdc) {
-        return reduce(clo, rdc, Collections.<ClusterNode>emptyList());
-    }
-
-    /** {@inheritDoc} */
-    @Override public <R1, R2> R2 reduce(IgniteClosure<StreamerContext, R1> clo, IgniteReducer<R1, R2> rdc,
-        Collection<ClusterNode> nodes) {
-        ctx.gateway().readLock();
-
-        try {
-            ClusterGroup prj = projection0();
-
-            if (!F.isEmpty(nodes))
-                prj = prj.forNodes(nodes);
-
-            return ctx.grid().compute(prj).execute(new GridStreamerReduceTask<>(clo, rdc, streamer.name()), null);
-        }
-        finally {
-            ctx.gateway().readUnlock();
-        }
-    }
-
-    /**
-     * @return Streamer projection without grabbing read lock.
-     */
-    private ClusterGroup projection0() {
-        ClusterGroup prj = streamPrj.get();
-
-        if (prj == null) {
-            prj = ctx.grid().cluster().forStreamer(streamer.name());
-
-            streamPrj.compareAndSet(null, prj);
-
-            prj = streamPrj.get();
-        }
-
-        return prj;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionBatch.java
deleted file mode 100644
index 055ed8f..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionBatch.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.streamer;
-
-import org.apache.ignite.internal.managers.deployment.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Streamer execution batch which should be processed on one node.
- */
-public class GridStreamerExecutionBatch implements Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Execution ID (ID of root future). */
-    private IgniteUuid execId;
-
-    /** Execution start timestamp. */
-    private long execStartTs;
-
-    /** Originating future ID. */
-    private IgniteUuid futId;
-
-    /** Nodes participated in this execution. */
-    @GridToStringInclude
-    private Collection<UUID> execNodeIds;
-
-    /** Stage name. */
-    private String stageName;
-
-    /** Events to process. */
-    private Collection<Object> evts;
-
-    /** Deployment. This is transient field. */
-    private transient GridDeployment dep;
-
-    /**
-     * Empty constructor required by {@code Externalizable}.
-     */
-    public GridStreamerExecutionBatch() {
-        // No-op.
-    }
-
-    /**
-     * Execution batch.
-     *
-     * @param execId Execution ID.
-     * @param futId Future ID.
-     * @param execStartTs Execution start timestamp.
-     * @param execNodeIds Nodes participated in this execution.
-     * @param stageName Stage name to execute.
-     * @param evts Events to process.
-     */
-    public GridStreamerExecutionBatch(
-        IgniteUuid execId,
-        long execStartTs,
-        IgniteUuid futId,
-        Collection<UUID> execNodeIds,
-        String stageName,
-        Collection<Object> evts
-    ) {
-        assert stageName != null;
-
-        this.execId = execId;
-        this.futId = futId;
-        this.execStartTs = execStartTs;
-        this.execNodeIds = execNodeIds;
-        this.stageName = stageName;
-        this.evts = evts;
-    }
-
-    /**
-     * Sets deployment. Deployment is not marshalled.
-     *
-     * @param dep Deployment for batch.
-     */
-    public void deployment(GridDeployment dep) {
-        this.dep = dep;
-    }
-
-    /**
-     * @return Batch deployment, if any.
-     */
-    @Nullable GridDeployment deployment() {
-        return dep;
-    }
-
-    /**
-     * @return Execution ID.
-     */
-    public IgniteUuid executionId() {
-        return execId;
-    }
-
-    /**
-     * @return Execution start timestamp.
-     */
-    public long executionStartTimeStamp() {
-        return execStartTs;
-    }
-
-    /**
-     * @return Batch future ID.
-     */
-    public IgniteUuid futureId() {
-        return futId;
-    }
-
-    /**
-     * @return Execution node IDs.
-     */
-    public Collection<UUID> executionNodeIds() {
-        return execNodeIds;
-    }
-
-    /**
-     * @return Stage name.
-     */
-    public String stageName() {
-        return stageName;
-    }
-
-    /**
-     * @return Events collection.
-     */
-    public Collection<Object> events() {
-        return evts;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        U.writeGridUuid(out, execId);
-        out.writeLong(execStartTs);
-        U.writeGridUuid(out, futId);
-        U.writeUuids(out, execNodeIds);
-        out.writeUTF(stageName);
-        U.writeCollection(out, evts);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        execId = U.readGridUuid(in);
-        execStartTs = in.readLong();
-        futId = U.readGridUuid(in);
-        execNodeIds = U.readUuids(in);
-        stageName = in.readUTF();
-        evts = U.readCollection(in);
-    }
-
-    /** {@inheritDoc} */
-    public String toString() {
-        return S.toString(GridStreamerExecutionBatch.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java
deleted file mode 100644
index fc86c3b..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.streamer;
-
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-import org.jetbrains.annotations.*;
-
-import java.nio.*;
-import java.util.*;
-
-/**
- *
- */
-public class GridStreamerExecutionRequest implements Message {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Force local deployment flag. */
-    private boolean forceLocDep;
-
-    /** Serialized batch in case if P2P class loading is enabled. */
-    @GridToStringExclude
-    private byte[] batchBytes;
-
-    /** Deployment mode. */
-    private DeploymentMode depMode;
-
-    /** Deployment sample class name. */
-    private String sampleClsName;
-
-    /** Deployment user version. */
-    private String userVer;
-
-    /** Node class loader participants. */
-    @GridToStringInclude
-    @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class)
-    private Map<UUID, IgniteUuid> ldrParticipants;
-
-    /** Class loader ID. */
-    private IgniteUuid clsLdrId;
-
-    /**
-     *
-     */
-    public GridStreamerExecutionRequest() {
-        // No-op.
-    }
-
-    /**
-     * @param forceLocDep Force local deployment flag.
-     * @param batchBytes Batch serialized bytes.
-     * @param depMode Deployment mode.
-     * @param sampleClsName Sample class name.
-     * @param userVer User version.
-     * @param ldrParticipants Loader participants.
-     * @param clsLdrId Class loader ID.
-     */
-    public GridStreamerExecutionRequest(
-        boolean forceLocDep,
-        byte[] batchBytes,
-        @Nullable DeploymentMode depMode,
-        @Nullable String sampleClsName,
-        @Nullable String userVer,
-        @Nullable Map<UUID, IgniteUuid> ldrParticipants,
-        @Nullable IgniteUuid clsLdrId
-    ) {
-        assert batchBytes != null;
-
-        this.forceLocDep = forceLocDep;
-        this.batchBytes = batchBytes;
-        this.depMode = depMode;
-        this.sampleClsName = sampleClsName;
-        this.userVer = userVer;
-        this.ldrParticipants = ldrParticipants;
-        this.clsLdrId = clsLdrId;
-    }
-
-    /**
-     * @return Force local deployment flag.
-     */
-    public boolean forceLocalDeployment() {
-        return forceLocDep;
-    }
-
-    /**
-     * @return Deployment mode.
-     */
-    public DeploymentMode deploymentMode() {
-        return depMode;
-    }
-
-    /**
-     * @return Deployment sample class name.
-     */
-    public String sampleClassName() {
-        return sampleClsName;
-    }
-
-    /**
-     * @return Deployment user version.
-     */
-    public String userVersion() {
-        return userVer;
-    }
-
-    /**
-     * @return Node class loader participants.
-     */
-    public Map<UUID, IgniteUuid> loaderParticipants() {
-        return ldrParticipants;
-    }
-
-    /**
-     * @return Class loader ID.
-     */
-    public IgniteUuid classLoaderId() {
-        return clsLdrId;
-    }
-
-    /**
-     * @return Serialized batch in case if P2P class loading is enabled.
-     */
-    public byte[] batchBytes() {
-        return batchBytes;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridStreamerExecutionRequest.class, this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeByteArray("batchBytes", batchBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeIgniteUuid("clsLdrId", clsLdrId))
-                    return false;
-
-                writer.incrementState();
-
-            case 2:
-                if (!writer.writeByte("depMode", depMode != null ? (byte)depMode.ordinal() : -1))
-                    return false;
-
-                writer.incrementState();
-
-            case 3:
-                if (!writer.writeBoolean("forceLocDep", forceLocDep))
-                    return false;
-
-                writer.incrementState();
-
-            case 4:
-                if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID))
-                    return false;
-
-                writer.incrementState();
-
-            case 5:
-                if (!writer.writeString("sampleClsName", sampleClsName))
-                    return false;
-
-                writer.incrementState();
-
-            case 6:
-                if (!writer.writeString("userVer", userVer))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        switch (reader.state()) {
-            case 0:
-                batchBytes = reader.readByteArray("batchBytes");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                clsLdrId = reader.readIgniteUuid("clsLdrId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 2:
-                byte depModeOrd;
-
-                depModeOrd = reader.readByte("depMode");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                depMode = DeploymentMode.fromOrdinal(depModeOrd);
-
-                reader.incrementState();
-
-            case 3:
-                forceLocDep = reader.readBoolean("forceLocDep");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 4:
-                ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 5:
-                sampleClsName = reader.readString("sampleClsName");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 6:
-                userVer = reader.readString("userVer");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 80;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 7;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java
deleted file mode 100644
index 36f8822..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.streamer;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-import org.jetbrains.annotations.*;
-
-import java.nio.*;
-
-/**
- *
- */
-public class GridStreamerResponse implements Message {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private IgniteUuid futId;
-
-    /** */
-    private byte[] errBytes;
-
-    /**
-     *
-     */
-    public GridStreamerResponse() {
-        // No-op.
-    }
-
-    /**
-     * @param futId Future ID.
-     * @param errBytes Serialized error, if any.
-     */
-    public GridStreamerResponse(IgniteUuid futId, @Nullable byte[] errBytes) {
-        assert futId != null;
-
-        this.futId = futId;
-        this.errBytes = errBytes;
-    }
-
-    /**
-     * @return Future ID.
-     */
-    public IgniteUuid futureId() {
-        return futId;
-    }
-
-    /**
-     * @return Serialized error.
-     */
-    public byte[] errorBytes() {
-        return errBytes;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridStreamerResponse.class, this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeByteArray("errBytes", errBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeIgniteUuid("futId", futId))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        switch (reader.state()) {
-            case 0:
-                errBytes = reader.readByteArray("errBytes");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                futId = reader.readIgniteUuid("futId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 81;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 2;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerRouteFailedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerRouteFailedException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerRouteFailedException.java
deleted file mode 100644
index f05557f..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerRouteFailedException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.streamer;
-
-import org.apache.ignite.*;
-
-/**
- * Exception thrown when router did not return route map. In this case pipeline execution is stopped
- * and corresponding callback is executed on originating node.
- */
-public class GridStreamerRouteFailedException extends IgniteCheckedException {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /**
-     * @param msg Error message.
-     */
-    public GridStreamerRouteFailedException(String msg) {
-        super(msg);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java
deleted file mode 100644
index d11ad39..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.streamer;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.streamer.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Streamer execution future.
- */
-public class GridStreamerStageExecutionFuture extends GridFutureAdapter<Object> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Logger. */
-    private IgniteLogger log;
-
-    /** Execution ID. */
-    private final IgniteUuid execId;
-
-    /** Execution start timestamp. */
-    private final long execStartTs;
-
-    /** Future ID. */
-    private final IgniteUuid futId;
-
-    /** Parent future ID. By the contract, global ID is sender node ID. */
-    private final IgniteUuid parentFutId;
-
-    /** Stage name. */
-    private final String stageName;
-
-    /** Events. */
-    private final Collection<Object> evts;
-
-    /** Failover attempts count. */
-    private int failoverAttemptCnt;
-
-    /** Child executions. */
-    private final ConcurrentMap<UUID, GridStreamerExecutionBatch> childExecs = new ConcurrentHashMap<>();
-
-    /** Nodes on which this pipeline is known to be executed. */
-    private final Set<UUID> execNodeIds = new GridConcurrentHashSet<>();
-
-    /** Streamer context. */
-    @GridToStringExclude
-    private final IgniteStreamerEx streamer;
-
-    /** Metrics holder. */
-    @GridToStringExclude
-    private StreamerMetricsHolder metricsHolder;
-
-    /**
-    * @param streamer Streamer extended context.
-    * @param execId Execution ID. If parent future ID is {@code null} then this is a root future
-    *       and execution ID must be {@code null}.
-    * @param failoverAttemptCnt Number of attempts this set of events was tried to failover.
-    * @param execStartTs Execution start timestamp.
-    * @param parentFutId Parent future ID.
-    * @param prevExecNodes Node IDs on which pipeline was already executed.
-    * @param stageName Stage name to run.
-    * @param evts Events to process.
-    */
-    public GridStreamerStageExecutionFuture(
-        IgniteStreamerEx streamer,
-        @Nullable IgniteUuid execId,
-        int failoverAttemptCnt,
-        long execStartTs,
-        @Nullable IgniteUuid parentFutId,
-        @Nullable Collection<UUID> prevExecNodes,
-        String stageName,
-        Collection<?> evts
-    ) {
-        assert streamer != null;
-        assert stageName != null;
-        assert evts != null;
-        assert !evts.isEmpty();
-        assert (execId == null && parentFutId == null) || (execId != null && parentFutId != null);
-
-        this.streamer = streamer;
-        futId = IgniteUuid.fromUuid(streamer.kernalContext().localNodeId());
-        this.parentFutId = parentFutId;
-
-        this.execId = parentFutId == null ? futId : execId;
-        this.failoverAttemptCnt = failoverAttemptCnt;
-        this.execStartTs = execStartTs;
-
-        this.stageName = stageName;
-        this.evts = (Collection<Object>)evts;
-
-        if (prevExecNodes != null)
-            execNodeIds.addAll(prevExecNodes);
-
-        log = streamer.kernalContext().log(GridStreamerStageExecutionFuture.class);
-    }
-
-    /**
-     * @return Future ID.
-     */
-    public IgniteUuid id() {
-        return futId;
-    }
-
-    /**
-     * Sets metrics holder to update counters when future completes. Used to avoid unnecessary listener creation.
-     *
-     * @param metricsHolder Metrics holder.
-     */
-    public void metrics(StreamerMetricsHolder metricsHolder) {
-        assert metricsHolder != null;
-        assert rootExecution();
-
-        this.metricsHolder = metricsHolder;
-    }
-
-    /**
-     * @return Failover attempt count.
-     */
-    public int failoverAttemptCount() {
-        return failoverAttemptCnt;
-    }
-
-    /**
-     * @return Stage name.
-     */
-    public String stageName() {
-        return stageName;
-    }
-
-    /**
-     * @return Events collection.
-     */
-    public Collection<Object> events() {
-        return evts;
-    }
-
-    /**
-     * Sends execution requests to remote nodes or schedules local execution if events were mapped locally.
-     */
-    public void map() {
-        try {
-            // This will be a no-op when atLeastOnce is false, so this future will be discarded right
-            // after map() is executed.
-            streamer.onFutureMapped(this);
-
-            StreamerEventRouter evtRouter = streamer.eventRouter();
-
-            Map<ClusterNode, Collection<Object>> routeMap = evtRouter.route(streamer.context(), stageName, evts);
-
-            if (log.isDebugEnabled())
-                log.debug("Mapped stage to nodes [futId=" + futId + ", stageName=" + stageName +
-                    ", nodeIds=" + (routeMap != null ? U.nodeIds(routeMap.keySet()) : null) + ']');
-
-            if (F.isEmpty(routeMap)) {
-                U.error(log, "Failed to route events to nodes (will fail pipeline execution) " +
-                    "[streamer=" + streamer.name() + ", stageName=" + stageName + ", evts=" + evts + ']');
-
-                UUID locNodeId = streamer.kernalContext().localNodeId();
-
-                onFailed(locNodeId, new GridStreamerRouteFailedException("Failed to route " +
-                    "events to nodes (router returned null or empty route map) [locNodeId=" + locNodeId + ", " +
-                    "stageName=" + stageName + ']'));
-            }
-            else {
-                execNodeIds.addAll(U.nodeIds(routeMap.keySet()));
-
-                for (Map.Entry<ClusterNode, Collection<Object>> entry : routeMap.entrySet()) {
-                    ClusterNode node = entry.getKey();
-
-                    childExecs.put(node.id(), new GridStreamerExecutionBatch(
-                        execId,
-                        execStartTs,
-                        futId,
-                        execNodeIds,
-                        stageName,
-                        entry.getValue()));
-                }
-
-                // Send execution requests to nodes.
-                streamer.scheduleExecutions(this, childExecs);
-            }
-        }
-        catch (IgniteCheckedException e) {
-            onFailed(streamer.kernalContext().localNodeId(), e);
-        }
-    }
-
-    /**
-     * @return {@code True} if this future is a root execution future (i.e. initiated by streamer's addEvent call).
-     */
-    public boolean rootExecution() {
-        return parentFutId == null;
-    }
-
-    /**
-     * If not root future, will return parent node ID (may be local node ID).
-     *
-     * @return Sender node ID.
-     */
-    @Nullable public UUID senderNodeId() {
-        return parentFutId == null ? null : parentFutId.globalId();
-    }
-
-    /**
-     * If not root future, will return parent future ID.
-     *
-     * @return Parent future ID.
-     */
-    public IgniteUuid parentFutureId() {
-        return parentFutId;
-    }
-
-    /**
-     * @return Map of child executions.
-     */
-    public Map<UUID, GridStreamerExecutionBatch> childExecutions() {
-        return Collections.unmodifiableMap(childExecs);
-    }
-
-    /**
-     * @return Execution node IDs.
-     */
-    public Collection<UUID> executionNodeIds() {
-        return execNodeIds;
-    }
-
-    /**
-     * Callback invoked when child node reports execution is completed (successfully or not).
-     *
-     * @param childNodeId Child node ID.
-     * @param err Exception if execution failed.
-     */
-    public void onExecutionCompleted(UUID childNodeId, @Nullable Throwable err) {
-        if (log.isDebugEnabled())
-            log.debug("Completed child execution for node [fut=" + this +
-                ", childNodeId=" + childNodeId + ", err=" + err + ']');
-
-        if (err != null)
-            onFailed(childNodeId, err);
-        else {
-            childExecs.remove(childNodeId);
-
-            if (childExecs.isEmpty())
-                onDone();
-        }
-    }
-
-    /**
-     * Callback invoked when node leaves the grid. If left node is known to participate in
-     * pipeline execution, cancel all locally running stages.
-     *
-     * @param leftNodeId Node ID that has left the grid.
-     */
-    public void onNodeLeft(UUID leftNodeId) {
-        if (execNodeIds.contains(leftNodeId))
-            onFailed(leftNodeId, new ClusterTopologyCheckedException("Failed to wait for streamer pipeline future completion " +
-                "(execution node has left the grid). All running stages will be cancelled " +
-                "[fut=" + this + ", leftNodeId=" + leftNodeId + ']'));
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
-        if (super.onDone(res, err)) {
-            if (log.isDebugEnabled())
-                log.debug("Completed stage execution future [fut=" + this + ", err=" + err + ']');
-
-            if (rootExecution() && metricsHolder != null) {
-                if (err != null)
-                    metricsHolder.onSessionFinished();
-                else
-                    metricsHolder.onSessionFailed();
-            }
-
-            streamer.onFutureCompleted(this);
-
-            return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * Failed callback.
-     *
-     * @param failedNodeId Failed node ID.
-     * @param err Error reason.
-     */
-    private void onFailed(UUID failedNodeId, Throwable err) {
-        if (log.isDebugEnabled())
-            log.debug("Pipeline execution failed on node [fut=" + this + ", failedNodeId=" + failedNodeId +
-                ", err=" + err + ']');
-
-        onDone(err);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean cancel() throws IgniteCheckedException {
-        if (!onCancelled())
-            return false;
-
-        if (log.isDebugEnabled())
-            log.debug("Cancelling streamer execution future: " + this);
-
-        streamer.onFutureCompleted(this);
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    public String toString() {
-        return S.toString(GridStreamerStageExecutionFuture.class, this, "childNodes", childExecs.keySet());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerWindowIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerWindowIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerWindowIterator.java
deleted file mode 100644
index d9cd56f..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerWindowIterator.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.streamer;
-
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Helper iterator extension which prevents regular element remove and adds removex() method tracking which element
- * was actually removed.
- */
-public abstract class GridStreamerWindowIterator<T> implements Iterator<T> {
-    /** {@inheritDoc} */
-    @Override public void remove() {
-        throw new UnsupportedOperationException();
-    }
-
-    /**
-     * Remove element from the underlying collection and return removed element.
-     *
-     * @return Removed element or {@code null} in case no deletion occurred.
-     */
-    @Nullable public abstract T removex();
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerEx.java
deleted file mode 100644
index 0158fab..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerEx.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.streamer;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.streamer.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Extended streamer context with methods intended for internal use.
- */
-public interface IgniteStreamerEx extends IgniteStreamer {
-    /**
-     * @return Kernal context.
-     */
-    public GridKernalContext kernalContext();
-
-    /**
-     * Gets streamer default window (the first one in configuration list).
-     *
-     * @return Streamer window.
-     */
-    public <E> StreamerWindow<E> window();
-
-    /**
-     * Gets streamer window by window name.
-     *
-     * @param windowName Window name.
-     * @return Streamer window.
-     */
-    @Nullable public <E> StreamerWindow<E> window(String windowName);
-
-    /**
-     * Called before execution requests are sent to remote nodes or scheduled for local execution.
-     *
-     * @param fut Future.
-     */
-    public void onFutureMapped(GridStreamerStageExecutionFuture fut);
-
-    /**
-     * Called when future is completed and parent should be notified, if any.
-     *
-     * @param fut Future.
-     */
-    public void onFutureCompleted(GridStreamerStageExecutionFuture fut);
-
-    /**
-     * @return Streamer event router.
-     */
-    public StreamerEventRouter eventRouter();
-
-    /**
-     * Schedules batch executions either on local or on remote nodes.
-     *
-     * @param fut Future.
-     * @param execs Executions grouped by node ID.
-     * @throws IgniteCheckedException If failed.
-     */
-    public void scheduleExecutions(GridStreamerStageExecutionFuture fut, Map<UUID, GridStreamerExecutionBatch> execs)
-        throws IgniteCheckedException;
-
-    /**
-     * Callback for undeployed class loaders. All deployed events will be removed from window and local storage.
-     *
-     * @param undeployedLdr Undeployed class loader.
-     */
-    public void onUndeploy(ClassLoader undeployedLdr);
-
-    /**
-     * Callback executed when streamer query completes.
-     *
-     * @param time Consumed time.
-     * @param nodes Participating nodes count.
-     */
-    public void onQueryCompleted(long time, int nodes);
-}


Mime
View raw message