ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: zk
Date Fri, 24 Nov 2017 09:56:13 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 9dbcdd1c3 -> 33e84515e


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/33e84515
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/33e84515
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/33e84515

Branch: refs/heads/ignite-zk
Commit: 33e84515e11e72bec9f52e483dfd15b6792d907f
Parents: 9dbcdd1
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Nov 24 12:56:06 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Nov 24 12:56:06 2017 +0300

----------------------------------------------------------------------
 .../continuous/ContinuousRoutineInfo.java       |  75 ++++++++
 .../ContinuousRoutinesCommonDiscoveryData.java  |  36 ++++
 .../continuous/ContinuousRoutinesInfo.java      |  69 +++++++
 ...tinuousRoutinesJoiningNodeDiscoveryData.java |  36 ++++
 .../continuous/GridContinuousProcessor.java     | 178 ++++++++++++++++++-
 5 files changed, 388 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/33e84515/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineInfo.java
new file mode 100644
index 0000000..fb56505
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineInfo.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+/**
+ * Serializable descriptor of a started continuous routine; handler and node filter are kept as marshalled bytes.
+ */
+class ContinuousRoutineInfo implements Serializable {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** ID of the node the routine originates from. */
+    final UUID srcNodeId;
+
+    /** Routine ID. */
+    final UUID routineId;
+
+    /** Marshalled continuous handler. */
+    final byte[] hnd;
+
+    /** Marshalled node filter, {@code null} if the routine has no filter. */
+    final byte[] nodeFilter;
+
+    /** Buffer size. */
+    final int bufSize;
+
+    /** Interval. */
+    final long interval;
+
+    /** Auto-unsubscribe flag. */
+    final boolean autoUnsubscribe;
+
+    /**
+     * @param hnd Marshalled continuous handler.
+     * @param nodeFilter Marshalled node filter, may be {@code null}.
+     * @param bufSize Buffer size.
+     * @param interval Interval.
+     * @param autoUnsubscribe Auto-unsubscribe flag.
+     */
+    ContinuousRoutineInfo(
+        UUID srcNodeId,
+        UUID routineId,
+        byte[] hnd,
+        byte[] nodeFilter,
+        int bufSize,
+        long interval,
+        boolean autoUnsubscribe)
+    {
+        this.srcNodeId = srcNodeId;
+        this.routineId = routineId;
+        this.hnd = hnd;
+        this.nodeFilter = nodeFilter;
+        this.bufSize = bufSize;
+        this.interval = interval;
+        this.autoUnsubscribe = autoUnsubscribe;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/33e84515/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesCommonDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesCommonDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesCommonDiscoveryData.java
new file mode 100644
index 0000000..6fd62ba
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesCommonDiscoveryData.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Discovery data listing started continuous routines; added to the discovery data bag as grid common data.
+ */
+public class ContinuousRoutinesCommonDiscoveryData implements Serializable {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Descriptors of started routines. */
+    final List<ContinuousRoutineInfo> startedRoutines;
+
+    ContinuousRoutinesCommonDiscoveryData(List<ContinuousRoutineInfo> startedRoutines) {
+        this.startedRoutines = startedRoutines;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/33e84515/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java
new file mode 100644
index 0000000..0d9ee44
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+
+import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC;
+
+/**
+ * Registry of started continuous routines keyed by routine ID; copies of it are put into the discovery data bag.
+ */
+class ContinuousRoutinesInfo {
+    /** Started routines by routine ID. */
+    private Map<UUID, ContinuousRoutineInfo> startedRoutines = new HashMap<>();
+
+    void collectGridNodeData(DiscoveryDataBag dataBag) {
+        if (!dataBag.commonDataCollectedFor(CONTINUOUS_PROC.ordinal())) // Add common data only if not collected yet.
+            dataBag.addGridCommonData(CONTINUOUS_PROC.ordinal(),
+                new ContinuousRoutinesCommonDiscoveryData(new ArrayList<>(startedRoutines.values())));
+    }
+
+    void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+        dataBag.addJoiningNodeData(CONTINUOUS_PROC.ordinal(),
+            new ContinuousRoutinesJoiningNodeDiscoveryData(new ArrayList<>(startedRoutines.values())));
+    }
+
+    void addRoutineInfo(ContinuousRoutineInfo info) {
+        startedRoutines.put(info.routineId, info);
+    }
+
+    boolean routineExists(UUID routineId) {
+        return startedRoutines.containsKey(routineId);
+    }
+
+    void removeRoutine(UUID routineId) {
+        startedRoutines.remove(routineId);
+    }
+
+    void removeNodeRoutines(UUID nodeId) {
+        for (Iterator<Map.Entry<UUID, ContinuousRoutineInfo>> it = startedRoutines.entrySet().iterator(); it.hasNext();) {
+            Map.Entry<UUID, ContinuousRoutineInfo> e = it.next();
+
+            ContinuousRoutineInfo info = e.getValue();
+
+            if (info.autoUnsubscribe && info.srcNodeId.equals(nodeId)) // Routines without auto-unsubscribe are kept.
+                it.remove();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/33e84515/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesJoiningNodeDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesJoiningNodeDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesJoiningNodeDiscoveryData.java
new file mode 100644
index 0000000..d41e671
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesJoiningNodeDiscoveryData.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Discovery data listing started continuous routines; added to the discovery data bag as joining node data.
+ */
+public class ContinuousRoutinesJoiningNodeDiscoveryData implements Serializable {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Descriptors of started routines. */
+    final List<ContinuousRoutineInfo> startedRoutines;
+
+    ContinuousRoutinesJoiningNodeDiscoveryData(List<ContinuousRoutineInfo> startedRoutines) {
+        this.startedRoutines = startedRoutines;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/33e84515/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index fa52be2..2650659 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -82,6 +82,7 @@ import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
@@ -146,6 +147,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     /** Query sequence number for message topic. */
     private final AtomicLong seq = new AtomicLong();
 
+    /** Started routines registry; assigned only when {@link #discoProtoVer} is 2. */
+    private ContinuousRoutinesInfo routinesInfo;
+
+    /** Discovery protocol version: set to 1 if the configured discovery SPI is a TcpDiscoverySpi, to 2 otherwise. */
+    private int discoProtoVer;
+
     /**
      * @param ctx Kernal context.
      */
@@ -158,6 +165,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         if (ctx.config().isDaemon())
             return;
 
+        discoProtoVer = ctx.config().getDiscoverySpi() instanceof TcpDiscoverySpi ? 1 : 2;
+
+        if (discoProtoVer == 2)
+            routinesInfo = new ContinuousRoutinesInfo();
+
         retryDelay = ctx.config().getNetworkSendRetryDelay();
         retryCnt = ctx.config().getNetworkSendRetryCount();
 
@@ -176,6 +188,24 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 @Override public void onCustomEvent(AffinityTopologyVersion topVer,
                     ClusterNode snd,
                     StartRoutineDiscoveryMessage msg) {
+                    // TODO ZK
+                    if (discoProtoVer == 2) { // Protocol version 2: record the started routine in routinesInfo.
+                        StartRequestData reqData = msg.startRequestData();
+
+                        try {
+                            routinesInfo.addRoutineInfo(createRoutineInfo(snd.id(),
+                                msg.routineId(),
+                                reqData.handler(),
+                                reqData.projectionPredicate(),
+                                reqData.bufferSize(),
+                                reqData.interval(),
+                                reqData.autoUnsubscribe()));
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to register continuous handler information: " + e);
+                        }
+                    }
+
                     if (!snd.id().equals(ctx.localNodeId()) && !ctx.isStopping())
                         processStartRequest(snd, msg);
                 }
@@ -227,6 +257,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 @Override public void onCustomEvent(AffinityTopologyVersion topVer,
                     ClusterNode snd,
                     StopRoutineDiscoveryMessage msg) {
+                    if (discoProtoVer == 2)
+                        routinesInfo.removeRoutine(msg.routineId);
+
                     if (!snd.id().equals(ctx.localNodeId())) {
                         UUID routineId = msg.routineId();
 
@@ -371,6 +404,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+        if (discoProtoVer == 2) {
+            routinesInfo.collectJoiningNodeData(dataBag);
+
+            return;
+        }
+
         Serializable data = getDiscoveryData(dataBag.joiningNodeId());
 
         if (data != null)
@@ -379,6 +418,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+        if (discoProtoVer == 2) {
+            routinesInfo.collectGridNodeData(dataBag);
+
+            return;
+        }
+
         Serializable data = getDiscoveryData(dataBag.joiningNodeId());
 
         if (data != null)
@@ -465,17 +510,101 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                     ']');
         }
 
-        if (data.hasJoiningNodeData())
-            onDiscoDataReceived((DiscoveryData) data.joiningNodeData());
+        if (discoProtoVer == 2) {
+            if (data.hasJoiningNodeData())    {
+                ContinuousRoutinesJoiningNodeDiscoveryData nodeData = (ContinuousRoutinesJoiningNodeDiscoveryData)
+                    data.joiningNodeData();
+
+                for (ContinuousRoutineInfo routineInfo : nodeData.startedRoutines) {
+                    routinesInfo.addRoutineInfo(routineInfo);
+
+                    startRoutine(routineInfo);
+                }
+            }
+        }
+        else {
+            if (data.hasJoiningNodeData())
+                onDiscoDataReceived((DiscoveryData) data.joiningNodeData());
+        }
     }
 
     /** {@inheritDoc} */
     @Override public void onGridDataReceived(GridDiscoveryData data) {
-        Map<UUID, Serializable> nodeSpecData = data.nodeSpecificData();
+        if (discoProtoVer == 2) {
+            if (data.commonData() != null) {
+                ContinuousRoutinesCommonDiscoveryData commonData =
+                    (ContinuousRoutinesCommonDiscoveryData)data.commonData();
+
+                for (ContinuousRoutineInfo routineInfo : commonData.startedRoutines) {
+                    if (routinesInfo.routineExists(routineInfo.routineId))
+                        continue; // Already tracked locally: neither re-added nor started again.
+
+                    routinesInfo.addRoutineInfo(routineInfo);
+
+                    startRoutine(routineInfo);
+                }
+            }
+        }
+        else { // Other protocol versions keep the previous node-specific data handling.
+            Map<UUID, Serializable> nodeSpecData = data.nodeSpecificData();
 
-        if (nodeSpecData != null) {
-            for (Map.Entry<UUID, Serializable> e : nodeSpecData.entrySet())
-                onDiscoDataReceived((DiscoveryData) e.getValue());
+            if (nodeSpecData != null) {
+                for (Map.Entry<UUID, Serializable> e : nodeSpecData.entrySet())
+                    onDiscoDataReceived((DiscoveryData) e.getValue());
+            }
+        }
+    }
+
+    /**
+     * @param routineInfo Routine info carrying the marshalled node filter and handler to register locally.
+     */
+    private void startRoutine(ContinuousRoutineInfo routineInfo) {
+        IgnitePredicate<ClusterNode> nodeFilter = null;
+
+        try {
+            if (routineInfo.nodeFilter != null) {
+                nodeFilter = U.unmarshal(marsh, routineInfo.nodeFilter, U.resolveClassLoader(ctx.config()));
+
+                ctx.resource().injectGeneric(nodeFilter);
+            }
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to unmarshal continuous routine filter, ignore routine [routineId=" + routineInfo.routineId +
+                    ", srcNodeId=" + routineInfo.srcNodeId + ']',
+                e);
+
+            return;
+        }
+
+        if (nodeFilter == null || nodeFilter.apply(ctx.discovery().localNode())) { // No filter, or it accepts the local node.
+            GridContinuousHandler hnd;
+
+            try {
+                hnd = U.unmarshal(marsh, routineInfo.hnd, U.resolveClassLoader(ctx.config()));
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to unmarshal continuous routine handler, ignore routine [" +
+                        "routineId=" + routineInfo.routineId +
+                        ", srcNodeId=" + routineInfo.srcNodeId + ']',
+                    e);
+
+                return;
+            }
+
+            try {
+                registerHandler(routineInfo.srcNodeId,
+                    routineInfo.routineId,
+                    hnd,
+                    routineInfo.bufSize,
+                    routineInfo.interval,
+                    routineInfo.autoUnsubscribe, false);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to register continuous routine handler, ignore routine [" +
+                        "routineId=" + routineInfo.routineId +
+                        ", srcNodeId=" + routineInfo.srcNodeId + ']',
+                    e);
+            }
          }
      }
 
@@ -620,6 +749,17 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         LocalRoutineInfo routineInfo = new LocalRoutineInfo(prjPred, hnd, 1, 0, true);
 
+        if (discoProtoVer == 2) {
+            routinesInfo.addRoutineInfo(createRoutineInfo(
+                ctx.localNodeId(),
+                routineId,
+                hnd,
+                prjPred,
+                routineInfo.bufSize,
+                routineInfo.interval,
+                routineInfo.autoUnsubscribe));
+        }
+
         locInfos.put(routineId, routineInfo);
 
         registerMessageListener(hnd);
@@ -627,6 +767,29 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         return routineId;
     }
 
+    private ContinuousRoutineInfo createRoutineInfo( // Builds the serializable descriptor of a routine.
+        UUID srcNodeId,
+        UUID routineId,
+        GridContinuousHandler hnd,
+        @Nullable IgnitePredicate<ClusterNode> nodeFilter,
+        int bufSize,
+        long interval,
+        boolean autoUnsubscribe)
+        throws IgniteCheckedException {
+        byte[] hndBytes = marsh.marshal(hnd); // Handler is stored in marshalled form.
+
+        byte[] filterBytes = nodeFilter != null ? marsh.marshal(nodeFilter) : null; // Absent filter stays null.
+
+        return new ContinuousRoutineInfo(
+            srcNodeId,
+            routineId,
+            hndBytes,
+            filterBytes,
+            bufSize,
+            interval,
+            autoUnsubscribe);
+    }
+
     /**
      * @param hnd Handler.
      * @param bufSize Buffer size.
@@ -1399,6 +1562,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
             UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
 
+            if (discoProtoVer == 2)
+                routinesInfo.removeNodeRoutines(nodeId);
+
             clientInfos.remove(nodeId);
 
             // Unregister handlers created by left node.


Mime
View raw message