ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [4/9] ignite git commit: zk
Date Fri, 08 Dec 2017 13:43:07 GMT
zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b37c35f1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b37c35f1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b37c35f1

Branch: refs/heads/ignite-zk-alpha
Commit: b37c35f118f7838ab64ac7fb7d926d25b92fcab7
Parents: 7f05c09
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Dec 8 14:53:34 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Dec 8 14:53:34 2017 +0300

----------------------------------------------------------------------
 .../continuous/ContinuousRoutinesInfo.java      | 71 +++++++++++---------
 1 file changed, 39 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b37c35f1/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java
index e46887b..8061b55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java
@@ -32,35 +32,41 @@ import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType
  */
 class ContinuousRoutinesInfo {
     /** */
-    private Map<UUID, ContinuousRoutineInfo> startedRoutines = new HashMap<>();
+    private final Map<UUID, ContinuousRoutineInfo> startedRoutines = new HashMap<>();
 
     /**
      * @param dataBag Discovery data bag.
      */
     void collectGridNodeData(DiscoveryDataBag dataBag) {
-        if (!dataBag.commonDataCollectedFor(CONTINUOUS_PROC.ordinal()))
-            dataBag.addGridCommonData(CONTINUOUS_PROC.ordinal(),
-                new ContinuousRoutinesCommonDiscoveryData(new ArrayList<>(startedRoutines.values())));
+        synchronized (startedRoutines) {
+            if (!dataBag.commonDataCollectedFor(CONTINUOUS_PROC.ordinal()))
+                dataBag.addGridCommonData(CONTINUOUS_PROC.ordinal(),
+                    new ContinuousRoutinesCommonDiscoveryData(new ArrayList<>(startedRoutines.values())));
+        }
     }
 
     /**
      * @param dataBag Discovery data bag.
      */
     void collectJoiningNodeData(DiscoveryDataBag dataBag) {
-        for (ContinuousRoutineInfo info : startedRoutines.values()) {
-            if (info.disconnected)
-                info.sourceNodeId(dataBag.joiningNodeId());
+        synchronized (startedRoutines) {
+            for (ContinuousRoutineInfo info : startedRoutines.values()) {
+                if (info.disconnected)
+                    info.sourceNodeId(dataBag.joiningNodeId());
+            }
+
+            dataBag.addJoiningNodeData(CONTINUOUS_PROC.ordinal(),
+                new ContinuousRoutinesJoiningNodeDiscoveryData(new ArrayList<>(startedRoutines.values())));
         }
-
-        dataBag.addJoiningNodeData(CONTINUOUS_PROC.ordinal(),
-            new ContinuousRoutinesJoiningNodeDiscoveryData(new ArrayList<>(startedRoutines.values())));
     }
 
     /**
      * @param info Routine info.
      */
     void addRoutineInfo(ContinuousRoutineInfo info) {
-        startedRoutines.put(info.routineId, info);
+        synchronized (startedRoutines) {
+            startedRoutines.put(info.routineId, info);
+        }
     }
 
     /**
@@ -68,29 +74,35 @@ class ContinuousRoutinesInfo {
      * @return {@code True} if routine exists.
      */
     boolean routineExists(UUID routineId) {
-        return startedRoutines.containsKey(routineId);
+        synchronized (startedRoutines) {
+            return startedRoutines.containsKey(routineId);
+        }
     }
 
     /**
      * @param routineId Routine ID.
      */
     void removeRoutine(UUID routineId) {
-        startedRoutines.remove(routineId);
+        synchronized (startedRoutines) {
+            startedRoutines.remove(routineId);
+        }
     }
 
     /**
      * @param locRoutines Routines IDs which can survive reconnect.
      */
     void onClientDisconnected(Collection<UUID> locRoutines) {
-        for (Iterator<Map.Entry<UUID, ContinuousRoutineInfo>> it = startedRoutines.entrySet().iterator();
it.hasNext();) {
-            Map.Entry<UUID, ContinuousRoutineInfo> e = it.next();
+        synchronized (startedRoutines) {
+            for (Iterator<Map.Entry<UUID, ContinuousRoutineInfo>> it = startedRoutines.entrySet().iterator();
it.hasNext();) {
+                Map.Entry<UUID, ContinuousRoutineInfo> e = it.next();
 
-            ContinuousRoutineInfo info = e.getValue();
+                ContinuousRoutineInfo info = e.getValue();
 
-            if (!locRoutines.contains(info.routineId))
-                it.remove();
-            else
-                info.onDisconnected();
+                if (!locRoutines.contains(info.routineId))
+                    it.remove();
+                else
+                    info.onDisconnected();
+            }
         }
     }
 
@@ -100,20 +112,15 @@ class ContinuousRoutinesInfo {
      * @param nodeId Node ID.
      */
     void onNodeFail(UUID nodeId) {
-        for (Iterator<Map.Entry<UUID, ContinuousRoutineInfo>> it = startedRoutines.entrySet().iterator();
it.hasNext();) {
-            Map.Entry<UUID, ContinuousRoutineInfo> e = it.next();
+        synchronized (startedRoutines) {
+            for (Iterator<Map.Entry<UUID, ContinuousRoutineInfo>> it = startedRoutines.entrySet().iterator();
it.hasNext();) {
+                Map.Entry<UUID, ContinuousRoutineInfo> e = it.next();
 
-            ContinuousRoutineInfo info = e.getValue();
+                ContinuousRoutineInfo info = e.getValue();
 
-            if (info.autoUnsubscribe && info.srcNodeId.equals(nodeId))
-                it.remove();
+                if (info.autoUnsubscribe && info.srcNodeId.equals(nodeId))
+                    it.remove();
+            }
         }
     }
-
-    /**
-     *
-     */
-    void clear() {
-        startedRoutines.clear();
-    }
 }


Mime
View raw message