Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C9658200D49 for ; Fri, 24 Nov 2017 10:56:15 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id C7BED160BF2; Fri, 24 Nov 2017 09:56:15 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 96DDB160BEE for ; Fri, 24 Nov 2017 10:56:14 +0100 (CET) Received: (qmail 64775 invoked by uid 500); 24 Nov 2017 09:56:13 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 64766 invoked by uid 99); 24 Nov 2017 09:56:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Nov 2017 09:56:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AFADDDFD78; Fri, 24 Nov 2017 09:56:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Message-Id: <0048865c8bb84de584e0c4cc67375800@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: zk Date: Fri, 24 Nov 2017 09:56:13 +0000 (UTC) archived-at: Fri, 24 Nov 2017 09:56:16 -0000 Repository: ignite Updated Branches: refs/heads/ignite-zk 9dbcdd1c3 -> 33e84515e zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/33e84515 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/33e84515 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/33e84515 Branch: refs/heads/ignite-zk Commit: 33e84515e11e72bec9f52e483dfd15b6792d907f Parents: 9dbcdd1 Author: sboikov Authored: Fri Nov 24 12:56:06 2017 +0300 Committer: sboikov Committed: Fri Nov 24 12:56:06 2017 +0300 ---------------------------------------------------------------------- .../continuous/ContinuousRoutineInfo.java | 75 ++++++++ .../ContinuousRoutinesCommonDiscoveryData.java | 36 ++++ .../continuous/ContinuousRoutinesInfo.java | 69 +++++++ ...tinuousRoutinesJoiningNodeDiscoveryData.java | 36 ++++ .../continuous/GridContinuousProcessor.java | 178 ++++++++++++++++++- 5 files changed, 388 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/33e84515/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineInfo.java new file mode 100644 index 0000000..fb56505 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutineInfo.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import java.io.Serializable; +import java.util.UUID; + +/** + * + */ +class ContinuousRoutineInfo implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + final UUID srcNodeId; + + /** */ + final UUID routineId; + + /** */ + final byte[] hnd; + + /** */ + final byte[] nodeFilter; + + /** */ + final int bufSize; + + /** */ + final long interval; + + /** */ + final boolean autoUnsubscribe; + + /** + * @param hnd + * @param nodeFilter + * @param bufSize + * @param interval + * @param autoUnsubscribe + */ + ContinuousRoutineInfo( + UUID srcNodeId, + UUID routineId, + byte[] hnd, + byte[] nodeFilter, + int bufSize, + long interval, + boolean autoUnsubscribe) + { + this.srcNodeId = srcNodeId; + this.routineId = routineId; + this.hnd = hnd; + this.nodeFilter = nodeFilter; + this.bufSize = bufSize; + this.interval = interval; + this.autoUnsubscribe = autoUnsubscribe; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/33e84515/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesCommonDiscoveryData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesCommonDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesCommonDiscoveryData.java new file mode 100644 index 0000000..6fd62ba --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesCommonDiscoveryData.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import java.io.Serializable; +import java.util.List; + +/** + * + */ +public class ContinuousRoutinesCommonDiscoveryData implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + final List startedRoutines; + + ContinuousRoutinesCommonDiscoveryData(List startedRoutines) { + this.startedRoutines = startedRoutines; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/33e84515/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java new file mode 100644 index 0000000..0d9ee44 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.spi.discovery.DiscoveryDataBag; + +import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC; + +/** + * + */ +class ContinuousRoutinesInfo { + /** */ + private Map startedRoutines = new HashMap<>(); + + void collectGridNodeData(DiscoveryDataBag dataBag) { + if (!dataBag.commonDataCollectedFor(CONTINUOUS_PROC.ordinal())) + dataBag.addGridCommonData(CONTINUOUS_PROC.ordinal(), + new ContinuousRoutinesCommonDiscoveryData(new ArrayList<>(startedRoutines.values()))); + } + + void collectJoiningNodeData(DiscoveryDataBag dataBag) { + dataBag.addJoiningNodeData(CONTINUOUS_PROC.ordinal(), + new ContinuousRoutinesJoiningNodeDiscoveryData(new ArrayList<>(startedRoutines.values()))); + } + + void addRoutineInfo(ContinuousRoutineInfo info) { + startedRoutines.put(info.routineId, info); + } + + boolean routineExists(UUID routineId) { + return startedRoutines.containsKey(routineId); + } + + void removeRoutine(UUID routineId) { + startedRoutines.remove(routineId); + } + + void removeNodeRoutines(UUID nodeId) { + for (Iterator> it = startedRoutines.entrySet().iterator(); it.hasNext();) { + Map.Entry e = it.next(); + + ContinuousRoutineInfo info = e.getValue(); + + if (info.autoUnsubscribe && info.srcNodeId.equals(nodeId)) + it.remove(); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/33e84515/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesJoiningNodeDiscoveryData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesJoiningNodeDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesJoiningNodeDiscoveryData.java new file mode 100644 index 0000000..d41e671 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesJoiningNodeDiscoveryData.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import java.io.Serializable; +import java.util.List; + +/** + * + */ +public class ContinuousRoutinesJoiningNodeDiscoveryData implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + final List startedRoutines; + + ContinuousRoutinesJoiningNodeDiscoveryData(List startedRoutines) { + this.startedRoutines = startedRoutines; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/33e84515/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index fa52be2..2650659 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -82,6 +82,7 @@ import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -146,6 +147,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** Query sequence number for message topic. */ private final AtomicLong seq = new AtomicLong(); + /** */ + private ContinuousRoutinesInfo routinesInfo; + + /** */ + private int discoProtoVer; + /** * @param ctx Kernal context. */ @@ -158,6 +165,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter { if (ctx.config().isDaemon()) return; + discoProtoVer = ctx.config().getDiscoverySpi() instanceof TcpDiscoverySpi ? 1 : 2; + + if (discoProtoVer == 2) + routinesInfo = new ContinuousRoutinesInfo(); + retryDelay = ctx.config().getNetworkSendRetryDelay(); retryCnt = ctx.config().getNetworkSendRetryCount(); @@ -176,6 +188,24 @@ public class GridContinuousProcessor extends GridProcessorAdapter { @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, StartRoutineDiscoveryMessage msg) { + // TODO ZK + if (discoProtoVer == 2) { + StartRequestData reqData = msg.startRequestData(); + + try { + routinesInfo.addRoutineInfo(createRoutineInfo(snd.id(), + msg.routineId(), + reqData.handler(), + reqData.projectionPredicate(), + reqData.bufferSize(), + reqData.interval(), + reqData.autoUnsubscribe())); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to register continuous handler information: " + e); + } + } + if (!snd.id().equals(ctx.localNodeId()) && !ctx.isStopping()) processStartRequest(snd, msg); } @@ -227,6 +257,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter { @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, StopRoutineDiscoveryMessage msg) { + if (discoProtoVer == 2) + routinesInfo.removeRoutine(msg.routineId); + if (!snd.id().equals(ctx.localNodeId())) { UUID routineId = msg.routineId(); @@ -371,6 +404,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) { + if (discoProtoVer == 2) { + routinesInfo.collectJoiningNodeData(dataBag); + + return; + } + Serializable data = getDiscoveryData(dataBag.joiningNodeId()); if (data != null) @@ -379,6 +418,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { + if (discoProtoVer == 2) { + routinesInfo.collectGridNodeData(dataBag); + + return; + } + Serializable data = getDiscoveryData(dataBag.joiningNodeId()); if (data != null) @@ -465,17 +510,101 @@ public class GridContinuousProcessor extends GridProcessorAdapter { ']'); } - if (data.hasJoiningNodeData()) - onDiscoDataReceived((DiscoveryData) data.joiningNodeData()); + if (discoProtoVer == 2) { + if (data.hasJoiningNodeData()) { + ContinuousRoutinesJoiningNodeDiscoveryData nodeData = (ContinuousRoutinesJoiningNodeDiscoveryData) + data.joiningNodeData(); + + for (ContinuousRoutineInfo routineInfo : nodeData.startedRoutines) { + routinesInfo.addRoutineInfo(routineInfo); + + startRoutine(routineInfo); + } + } + } + else { + if (data.hasJoiningNodeData()) + onDiscoDataReceived((DiscoveryData) data.joiningNodeData()); + } } /** {@inheritDoc} */ @Override public void onGridDataReceived(GridDiscoveryData data) { - Map nodeSpecData = data.nodeSpecificData(); + if (discoProtoVer == 2) { + if (data.commonData() != null) { + ContinuousRoutinesCommonDiscoveryData commonData = + (ContinuousRoutinesCommonDiscoveryData)data.commonData(); + + for (ContinuousRoutineInfo routineInfo : commonData.startedRoutines) { + if (routinesInfo.routineExists(routineInfo.routineId)) + continue; + + routinesInfo.addRoutineInfo(routineInfo); + + startRoutine(routineInfo); + } + } + } + else { + Map nodeSpecData = data.nodeSpecificData(); - if (nodeSpecData != null) { - for (Map.Entry e : nodeSpecData.entrySet()) - onDiscoDataReceived((DiscoveryData) e.getValue()); + if (nodeSpecData != null) { + for (Map.Entry e : nodeSpecData.entrySet()) + onDiscoDataReceived((DiscoveryData) e.getValue()); + } + } + } + + /** + * @param routineInfo Routine info. + */ + private void startRoutine(ContinuousRoutineInfo routineInfo) { + IgnitePredicate nodeFilter = null; + + try { + if (routineInfo.nodeFilter != null) { + nodeFilter = U.unmarshal(marsh, routineInfo.nodeFilter, U.resolveClassLoader(ctx.config())); + + ctx.resource().injectGeneric(nodeFilter); + } + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to unmarshal continuous routine filter, ignore routine [routineId=" + routineInfo.routineId + + ", srcNodeId=" + routineInfo.srcNodeId + ']', + e); + + return; + } + + if (nodeFilter == null || nodeFilter.apply(ctx.discovery().localNode())) { + GridContinuousHandler hnd; + + try { + hnd = U.unmarshal(marsh, routineInfo.hnd, U.resolveClassLoader(ctx.config())); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to unmarshal continuous routine handler, ignore routine [" + + "routineId=" + routineInfo.routineId + + ", srcNodeId=" + routineInfo.srcNodeId + ']', + e); + + return; + } + + try { + registerHandler(routineInfo.srcNodeId, + routineInfo.routineId, + hnd, + routineInfo.bufSize, + routineInfo.interval, + routineInfo.autoUnsubscribe, false); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to register continuous routine handler, ignore routine [" + + "routineId=" + routineInfo.routineId + + ", srcNodeId=" + routineInfo.srcNodeId + ']', + e); + } } } @@ -620,6 +749,17 @@ public class GridContinuousProcessor extends GridProcessorAdapter { LocalRoutineInfo routineInfo = new LocalRoutineInfo(prjPred, hnd, 1, 0, true); + if (discoProtoVer == 2) { + routinesInfo.addRoutineInfo(createRoutineInfo( + ctx.localNodeId(), + routineId, + hnd, + prjPred, + routineInfo.bufSize, + routineInfo.interval, + routineInfo.autoUnsubscribe)); + } + locInfos.put(routineId, routineInfo); registerMessageListener(hnd); @@ -627,6 +767,29 @@ public class GridContinuousProcessor extends GridProcessorAdapter { return routineId; } + private ContinuousRoutineInfo createRoutineInfo( + UUID srcNodeId, + UUID routineId, + GridContinuousHandler hnd, + @Nullable IgnitePredicate nodeFilter, + int bufSize, + long interval, + boolean autoUnsubscribe) + throws IgniteCheckedException { + byte[] hndBytes = marsh.marshal(hnd); + + byte[] filterBytes = nodeFilter != null ? marsh.marshal(nodeFilter) : null; + + return new ContinuousRoutineInfo( + srcNodeId, + routineId, + hndBytes, + filterBytes, + bufSize, + interval, + autoUnsubscribe); + } + /** * @param hnd Handler. * @param bufSize Buffer size. @@ -1399,6 +1562,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter { UUID nodeId = ((DiscoveryEvent)evt).eventNode().id(); + if (discoProtoVer == 2) + routinesInfo.removeNodeRoutines(nodeId); + clientInfos.remove(nodeId); // Unregister handlers created by left node.