zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b37c35f1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b37c35f1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b37c35f1
Branch: refs/heads/ignite-zk-alpha
Commit: b37c35f118f7838ab64ac7fb7d926d25b92fcab7
Parents: 7f05c09
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Dec 8 14:53:34 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Dec 8 14:53:34 2017 +0300
----------------------------------------------------------------------
.../continuous/ContinuousRoutinesInfo.java | 71 +++++++++++---------
1 file changed, 39 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b37c35f1/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java
index e46887b..8061b55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java
@@ -32,35 +32,41 @@ import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType
*/
class ContinuousRoutinesInfo {
/** */
- private Map<UUID, ContinuousRoutineInfo> startedRoutines = new HashMap<>();
+ private final Map<UUID, ContinuousRoutineInfo> startedRoutines = new HashMap<>();
/**
* @param dataBag Discovery data bag.
*/
void collectGridNodeData(DiscoveryDataBag dataBag) {
- if (!dataBag.commonDataCollectedFor(CONTINUOUS_PROC.ordinal()))
- dataBag.addGridCommonData(CONTINUOUS_PROC.ordinal(),
- new ContinuousRoutinesCommonDiscoveryData(new ArrayList<>(startedRoutines.values())));
+ synchronized (startedRoutines) {
+ if (!dataBag.commonDataCollectedFor(CONTINUOUS_PROC.ordinal()))
+ dataBag.addGridCommonData(CONTINUOUS_PROC.ordinal(),
+ new ContinuousRoutinesCommonDiscoveryData(new ArrayList<>(startedRoutines.values())));
+ }
}
/**
* @param dataBag Discovery data bag.
*/
void collectJoiningNodeData(DiscoveryDataBag dataBag) {
- for (ContinuousRoutineInfo info : startedRoutines.values()) {
- if (info.disconnected)
- info.sourceNodeId(dataBag.joiningNodeId());
+ synchronized (startedRoutines) {
+ for (ContinuousRoutineInfo info : startedRoutines.values()) {
+ if (info.disconnected)
+ info.sourceNodeId(dataBag.joiningNodeId());
+ }
+
+ dataBag.addJoiningNodeData(CONTINUOUS_PROC.ordinal(),
+ new ContinuousRoutinesJoiningNodeDiscoveryData(new ArrayList<>(startedRoutines.values())));
}
-
- dataBag.addJoiningNodeData(CONTINUOUS_PROC.ordinal(),
- new ContinuousRoutinesJoiningNodeDiscoveryData(new ArrayList<>(startedRoutines.values())));
}
/**
* @param info Routine info.
*/
void addRoutineInfo(ContinuousRoutineInfo info) {
- startedRoutines.put(info.routineId, info);
+ synchronized (startedRoutines) {
+ startedRoutines.put(info.routineId, info);
+ }
}
/**
@@ -68,29 +74,35 @@ class ContinuousRoutinesInfo {
* @return {@code True} if routine exists.
*/
boolean routineExists(UUID routineId) {
- return startedRoutines.containsKey(routineId);
+ synchronized (startedRoutines) {
+ return startedRoutines.containsKey(routineId);
+ }
}
/**
* @param routineId Routine ID.
*/
void removeRoutine(UUID routineId) {
- startedRoutines.remove(routineId);
+ synchronized (startedRoutines) {
+ startedRoutines.remove(routineId);
+ }
}
/**
* @param locRoutines Routines IDs which can survive reconnect.
*/
void onClientDisconnected(Collection<UUID> locRoutines) {
- for (Iterator<Map.Entry<UUID, ContinuousRoutineInfo>> it = startedRoutines.entrySet().iterator(); it.hasNext();) {
- Map.Entry<UUID, ContinuousRoutineInfo> e = it.next();
+ synchronized (startedRoutines) {
+ for (Iterator<Map.Entry<UUID, ContinuousRoutineInfo>> it = startedRoutines.entrySet().iterator(); it.hasNext();) {
+ Map.Entry<UUID, ContinuousRoutineInfo> e = it.next();
- ContinuousRoutineInfo info = e.getValue();
+ ContinuousRoutineInfo info = e.getValue();
- if (!locRoutines.contains(info.routineId))
- it.remove();
- else
- info.onDisconnected();
+ if (!locRoutines.contains(info.routineId))
+ it.remove();
+ else
+ info.onDisconnected();
+ }
}
}
@@ -100,20 +112,15 @@ class ContinuousRoutinesInfo {
* @param nodeId Node ID.
*/
void onNodeFail(UUID nodeId) {
- for (Iterator<Map.Entry<UUID, ContinuousRoutineInfo>> it = startedRoutines.entrySet().iterator(); it.hasNext();) {
- Map.Entry<UUID, ContinuousRoutineInfo> e = it.next();
+ synchronized (startedRoutines) {
+ for (Iterator<Map.Entry<UUID, ContinuousRoutineInfo>> it = startedRoutines.entrySet().iterator(); it.hasNext();) {
+ Map.Entry<UUID, ContinuousRoutineInfo> e = it.next();
- ContinuousRoutineInfo info = e.getValue();
+ ContinuousRoutineInfo info = e.getValue();
- if (info.autoUnsubscribe && info.srcNodeId.equals(nodeId))
- it.remove();
+ if (info.autoUnsubscribe && info.srcNodeId.equals(nodeId))
+ it.remove();
+ }
}
}
-
- /**
- *
- */
- void clear() {
- startedRoutines.clear();
- }
}
|