ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [10/13] ignite git commit: zk
Date Wed, 29 Nov 2017 08:43:37 GMT
zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8ee69f50
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8ee69f50
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8ee69f50

Branch: refs/heads/ignite-zk
Commit: 8ee69f5017912caba0ef0b249a39fc525d6b7a65
Parents: 287b717
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Nov 29 11:21:19 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Nov 29 11:21:19 2017 +0300

----------------------------------------------------------------------
 .../continuous/ContinuousRoutinesInfo.java      |  30 +++-
 .../continuous/GridContinuousProcessor.java     | 152 +++++++++++++----
 .../continuous/StartRequestDataV2.java          | 164 +++++++++++++++++++
 .../StartRoutineDiscoveryMessageV2.java         |  77 +++++++++
 .../discovery/zk/internal/ZkIgnitePaths.java    |  10 +-
 .../IgniteCacheEntryListenerAtomicTest.java     |   2 +-
 .../zk/internal/ZookeeperClientTest.java        |  35 ++++
 .../ZookeeperDiscoverySpiBasicTest.java         |  22 +++
 8 files changed, 454 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8ee69f50/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java
index 0d9ee44..8977b15 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/ContinuousRoutinesInfo.java
@@ -33,30 +33,51 @@ class ContinuousRoutinesInfo {
     /** */
     private Map<UUID, ContinuousRoutineInfo> startedRoutines = new HashMap<>();
 
+    /**
+     * @param dataBag Discovery data bag.
+     */
     void collectGridNodeData(DiscoveryDataBag dataBag) {
         if (!dataBag.commonDataCollectedFor(CONTINUOUS_PROC.ordinal()))
             dataBag.addGridCommonData(CONTINUOUS_PROC.ordinal(),
                 new ContinuousRoutinesCommonDiscoveryData(new ArrayList<>(startedRoutines.values())));
     }
 
+    /**
+     * @param dataBag Discovery data bag.
+     */
     void collectJoiningNodeData(DiscoveryDataBag dataBag) {
         dataBag.addJoiningNodeData(CONTINUOUS_PROC.ordinal(),
             new ContinuousRoutinesJoiningNodeDiscoveryData(new ArrayList<>(startedRoutines.values())));
     }
 
+    /**
+     * @param info Routine info.
+     */
     void addRoutineInfo(ContinuousRoutineInfo info) {
         startedRoutines.put(info.routineId, info);
     }
 
+    /**
+     * @param routineId Routine ID.
+     * @return {@code True} if routine exists.
+     */
     boolean routineExists(UUID routineId) {
         return startedRoutines.containsKey(routineId);
     }
 
+    /**
+     * @param routineId Routine ID.
+     */
     void removeRoutine(UUID routineId) {
         startedRoutines.remove(routineId);
     }
 
-    void removeNodeRoutines(UUID nodeId) {
+    /**
+     * Removes all routines with autoUnsubscribe=false started by given node.
+     *
+     * @param nodeId Node ID.
+     */
+    void onNodeFail(UUID nodeId) {
         for (Iterator<Map.Entry<UUID, ContinuousRoutineInfo>> it = startedRoutines.entrySet().iterator(); it.hasNext();) {
             Map.Entry<UUID, ContinuousRoutineInfo> e = it.next();
 
@@ -66,4 +87,11 @@ class ContinuousRoutinesInfo {
                 it.remove();
         }
     }
+
+    /**
+     *
+     */
+    void clear() {
+        startedRoutines.clear();
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ee69f50/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 2650659..bd9818a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -188,29 +188,27 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 @Override public void onCustomEvent(AffinityTopologyVersion topVer,
                     ClusterNode snd,
                     StartRoutineDiscoveryMessage msg) {
-                    // TODO ZK
-                    if (discoProtoVer == 2) {
-                        StartRequestData reqData = msg.startRequestData();
-
-                        try {
-                            routinesInfo.addRoutineInfo(createRoutineInfo(snd.id(),
-                                msg.routineId(),
-                                reqData.handler(),
-                                reqData.projectionPredicate(),
-                                reqData.bufferSize(),
-                                reqData.interval(),
-                                reqData.autoUnsubscribe()));
-                        }
-                        catch (IgniteCheckedException e) {
-                            U.error(log, "Failed to register continuous handler information: " + e);
-                        }
-                    }
+                    assert discoProtoVer == 1 : discoProtoVer;
 
                     if (!snd.id().equals(ctx.localNodeId()) && !ctx.isStopping())
                         processStartRequest(snd, msg);
                 }
             });
 
+        ctx.discovery().setCustomEventListener(StartRoutineDiscoveryMessageV2.class,
+            new CustomEventListener<StartRoutineDiscoveryMessageV2>() {
+                @Override public void onCustomEvent(AffinityTopologyVersion topVer,
+                    ClusterNode snd,
+                    StartRoutineDiscoveryMessageV2 msg) {
+                    assert discoProtoVer == 2 : discoProtoVer;
+
+                    if (ctx.isStopping())
+                        return;
+
+                    processStartRequestV2(snd, msg);
+                }
+            });
+
         ctx.discovery().setCustomEventListener(StartRoutineAckDiscoveryMessage.class,
             new CustomEventListener<StartRoutineAckDiscoveryMessage>() {
                 @Override public void onCustomEvent(AffinityTopologyVersion topVer,
@@ -505,9 +503,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
         if (log.isDebugEnabled()) {
             log.info("onJoiningNodeDataReceived [joining=" + data.joiningNodeId() +
-                    ", loc=" + ctx.localNodeId() +
-                    ", data=" + data.joiningNodeData() +
-                    ']');
+                ", loc=" + ctx.localNodeId() +
+                ", data=" + data.joiningNodeData() +
+                ']');
         }
 
         if (discoProtoVer == 2) {
@@ -569,9 +567,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             }
         }
         catch (IgniteCheckedException e) {
-            U.error(log, "Failed to unmarshal continuous routine filter, ignore routine [routineId=" + routineInfo.routineId +
-                    ", srcNodeId=" + routineInfo.srcNodeId + ']',
-                e);
+            U.error(log, "Failed to unmarshal continuous routine filter, ignore routine [" +
+                "routineId=" + routineInfo.routineId +
+                ", srcNodeId=" + routineInfo.srcNodeId + ']', e);
 
             return;
         }
@@ -584,9 +582,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             }
             catch (IgniteCheckedException e) {
                 U.error(log, "Failed to unmarshal continuous routine handler, ignore routine [" +
-                        "routineId=" + routineInfo.routineId +
-                        ", srcNodeId=" + routineInfo.srcNodeId + ']',
-                    e);
+                    "routineId=" + routineInfo.routineId +
+                    ", srcNodeId=" + routineInfo.srcNodeId + ']', e);
 
                 return;
             }
@@ -597,13 +594,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                     hnd,
                     routineInfo.bufSize,
                     routineInfo.interval,
-                    routineInfo.autoUnsubscribe, false);
+                    routineInfo.autoUnsubscribe,
+                    false);
             }
             catch (IgniteCheckedException e) {
                 U.error(log, "Failed to register continuous routine handler, ignore routine [" +
-                        "routineId=" + routineInfo.routineId +
-                        ", srcNodeId=" + routineInfo.srcNodeId + ']',
-                    e);
+                    "routineId=" + routineInfo.routineId +
+                    ", srcNodeId=" + routineInfo.srcNodeId + ']', e);
             }
         }
     }
@@ -1139,6 +1136,99 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param snd Sender.
+     * @param msg Start request.
+     */
+    private void processStartRequestV2(ClusterNode snd, StartRoutineDiscoveryMessageV2 msg) {
+        StartRequestDataV2 reqData = msg.startRequestData();
+
+        ContinuousRoutineInfo routineInfo = new ContinuousRoutineInfo(snd.id(),
+            msg.routineId(),
+            reqData.handlerBytes(),
+            reqData.nodeFilterBytes(),
+            reqData.bufferSize(),
+            reqData.interval(),
+            reqData.autoUnsubscribe());
+
+        routinesInfo.addRoutineInfo(routineInfo);
+
+        Exception err = null;
+
+        IgnitePredicate<ClusterNode> nodeFilter = null;
+
+        if (reqData.nodeFilterBytes() != null) {
+            try {
+                if (ctx.config().isPeerClassLoadingEnabled() && reqData.className() != null) {
+                    String clsName = reqData.className();
+                    GridDeploymentInfo depInfo = reqData.deploymentInfo();
+
+                    GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(),
+                        clsName,
+                        clsName,
+                        depInfo.userVersion(),
+                        snd.id(),
+                        depInfo.classLoaderId(),
+                        depInfo.participants(),
+                        null);
+
+                    if (dep == null)
+                        throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
+
+                    nodeFilter = U.unmarshal(marsh, reqData.nodeFilterBytes(), U.resolveClassLoader(dep.classLoader(), ctx.config()));
+                }
+                else
+                    nodeFilter = U.unmarshal(marsh, reqData.nodeFilterBytes(), U.resolveClassLoader(ctx.config()));
+
+                if (nodeFilter != null)
+                    ctx.resource().injectGeneric(nodeFilter);
+            }
+            catch (Exception e) {
+                err = e;
+
+                U.error(log, "Failed to unmarshal continuous routine filter [" +
+                        "routineId=" + routineInfo.routineId +
+                        ", srcNodeId=" + routineInfo.srcNodeId + ']', e);
+            }
+        }
+
+        boolean register = err == null && (nodeFilter == null || nodeFilter.apply(ctx.discovery().localNode()));
+
+        if (register) {
+            try {
+                GridContinuousHandler hnd = U.unmarshal(marsh, reqData.handlerBytes(), U.resolveClassLoader(ctx.config()));
+
+                if (ctx.config().isPeerClassLoadingEnabled())
+                    hnd.p2pUnmarshal(snd.id(), ctx);
+
+                if (msg.keepBinary()) {
+                    assert hnd instanceof CacheContinuousQueryHandler : hnd;
+
+                    ((CacheContinuousQueryHandler)hnd).keepBinary(true);
+                }
+
+                GridContinuousHandler hnd0 = hnd instanceof GridMessageListenHandler ?
+                    new GridMessageListenHandler((GridMessageListenHandler)hnd) :
+                    hnd;
+
+                registerHandler(snd.id(),
+                    msg.routineId,
+                    hnd0,
+                    reqData.bufferSize(),
+                    reqData.interval(),
+                    reqData.autoUnsubscribe(),
+                    false);
+            }
+            catch (Exception e) {
+                err = e;
+
+                U.error(log, "Failed to register continuous routine handler [" +
+                    "routineId=" + routineInfo.routineId +
+                    ", srcNodeId=" + routineInfo.srcNodeId + ']', e);
+            }
+        }
+    }
+
+    /**
      * @param node Sender.
      * @param req Start request.
      */
@@ -1563,7 +1653,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
 
             if (discoProtoVer == 2)
-                routinesInfo.removeNodeRoutines(nodeId);
+                routinesInfo.onNodeFail(nodeId);
 
             clientInfos.remove(nodeId);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ee69f50/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestDataV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestDataV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestDataV2.java
new file mode 100644
index 0000000..c001616
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestDataV2.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Start request data.
+ */
+class StartRequestDataV2 implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Serialized node filter. */
+    private byte[] nodeFilterBytes;
+
+    /** Deployment class name. */
+    private String clsName;
+
+    /** Deployment info. */
+    private GridDeploymentInfo depInfo;
+
+    /** Serialized handler. */
+    private byte[] hndBytes;
+
+    /** Buffer size. */
+    private int bufSize;
+
+    /** Time interval. */
+    private long interval;
+
+    /** Automatic unsubscribe flag. */
+    private boolean autoUnsubscribe;
+
+    /**
+     * @param nodeFilterBytes Serialized node filter.
+     * @param hndBytes Serialized handler.
+     * @param bufSize Buffer size.
+     * @param interval Time interval.
+     * @param autoUnsubscribe Automatic unsubscribe flag.
+     */
+    StartRequestDataV2(
+        byte[] nodeFilterBytes,
+        byte[] hndBytes,
+        int bufSize,
+        long interval,
+        boolean autoUnsubscribe) {
+        assert hndBytes != null;
+        assert bufSize > 0;
+        assert interval >= 0;
+
+        this.nodeFilterBytes = nodeFilterBytes;
+        this.hndBytes = hndBytes;
+        this.bufSize = bufSize;
+        this.interval = interval;
+        this.autoUnsubscribe = autoUnsubscribe;
+    }
+
+    /**
+     * @return Serialized node filter.
+     */
+    public byte[] nodeFilterBytes() {
+        return nodeFilterBytes;
+    }
+
+    /**
+     * @return Deployment class name.
+     */
+    public String className() {
+        return clsName;
+    }
+
+    /**
+     * @param clsName New deployment class name.
+     */
+    public void className(String clsName) {
+        this.clsName = clsName;
+    }
+
+    /**
+     * @return Deployment info.
+     */
+    public GridDeploymentInfo deploymentInfo() {
+        return depInfo;
+    }
+
+    /**
+     * @param depInfo New deployment info.
+     */
+    public void deploymentInfo(GridDeploymentInfo depInfo) {
+        this.depInfo = depInfo;
+    }
+
+    /**
+     * @return Handler.
+     */
+    public byte[] handlerBytes() {
+        return hndBytes;
+    }
+
+    /**
+     * @return Buffer size.
+     */
+    public int bufferSize() {
+        return bufSize;
+    }
+
+    /**
+     * @param bufSize New buffer size.
+     */
+    public void bufferSize(int bufSize) {
+        this.bufSize = bufSize;
+    }
+
+    /**
+     * @return Time interval.
+     */
+    public long interval() {
+        return interval;
+    }
+
+    /**
+     * @param interval New time interval.
+     */
+    public void interval(long interval) {
+        this.interval = interval;
+    }
+
+    /**
+     * @return Automatic unsubscribe flag.
+     */
+    public boolean autoUnsubscribe() {
+        return autoUnsubscribe;
+    }
+
+    /**
+     * @param autoUnsubscribe New automatic unsubscribe flag.
+     */
+    public void autoUnsubscribe(boolean autoUnsubscribe) {
+        this.autoUnsubscribe = autoUnsubscribe;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(StartRequestDataV2.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ee69f50/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java
new file mode 100644
index 0000000..e9760a8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import java.util.UUID;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ *
+ */
+public class StartRoutineDiscoveryMessageV2 extends AbstractContinuousMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private static final int KEEP_BINARY_FLAG = 0x01;
+
+    /** */
+    private final StartRequestDataV2 startReqData;
+
+    /** Flags. */
+    private int flags;
+
+    /**
+     * @param routineId Routine id.
+     * @param startReqData Start request data.
+     * @param keepBinary Keep binary flag.
+     */
+    public StartRoutineDiscoveryMessageV2(UUID routineId, StartRequestDataV2 startReqData, boolean keepBinary) {
+        super(routineId);
+
+        this.startReqData = startReqData;
+
+        if (keepBinary)
+            flags |= KEEP_BINARY_FLAG;
+    }
+
+    /**
+     * @return Start request data.
+     */
+    public StartRequestDataV2 startRequestData() {
+        return startReqData;
+    }
+
+    /**
+     * @return {@code True} if keep binary flag was set on continuous handler.
+     */
+    public boolean keepBinary() {
+        return (flags & KEEP_BINARY_FLAG) != 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public DiscoveryCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(StartRoutineDiscoveryMessageV2.class, this, "routineId", routineId());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ee69f50/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
index 30138e5..535df93 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -27,19 +27,19 @@ class ZkIgnitePaths {
     private static final int UUID_LEN = 36;
 
     /** */
-    private static final String JOIN_DATA_DIR = "joinData";
+    private static final String JOIN_DATA_DIR = "jd";
 
     /** */
-    private static final String CUSTOM_EVTS_DIR = "customEvts";
+    private static final String CUSTOM_EVTS_DIR = "c";
 
     /** */
-    private static final String CUSTOM_EVTS_ACKS_DIR = "customEvtsAcks";
+    private static final String CUSTOM_EVTS_ACKS_DIR = "ca";
 
     /** */
-    private static final String ALIVE_NODES_DIR = "alive";
+    private static final String ALIVE_NODES_DIR = "n";
 
     /** */
-    private static final String DISCO_EVENTS_PATH = "events";
+    private static final String DISCO_EVENTS_PATH = "e";
 
     /** */
     final String basePath;

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ee69f50/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicTest.java
index cddb446..d7d97a4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicTest.java
@@ -30,7 +30,7 @@ import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 public class IgniteCacheEntryListenerAtomicTest extends IgniteCacheEntryListenerAbstractTest {
     /** {@inheritDoc} */
     @Override protected int gridCount() {
-        return 3;
+        return 1;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ee69f50/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
index 6330595..ec495cf 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
@@ -34,6 +34,10 @@ import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 
 /**
@@ -50,6 +54,37 @@ public class ZookeeperClientTest extends GridCommonAbstractTest {
         super.afterTest();
     }
 
+//    /**
+//     * @throws Exception If failed.
+//     */
+//    public void testSaveLargeValue() throws Exception {
+//        startZK(1);
+//
+//        final ZookeeperClient client = new ZookeeperClient(log, zkCluster.getConnectString(), 3000, null);
+//
+//        ZooKeeper zk = client.zk();
+//
+//        int s = 1048526 + 1;
+//        // 1048517 11 1048528
+//        // 1048519 9 1048528
+//        // 1048520 8 1048528
+//
+//        String path = "/aaaaaaa";
+//
+//        while (true) {
+//            try {
+//                zk.create(path, new byte[s], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+//
+//                info("Created: " + s + " " + path.length() + " " + (s + path.length()));
+//
+//                break;
+//            }
+//            catch (KeeperException.ConnectionLossException e) {
+//                s -= 1;
+//            }
+//        }
+//    }
+
     /**
      * @throws Exception If failed.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8ee69f50/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index bbf2945..a46c678 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -110,6 +110,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     /** */
     private ConcurrentHashMap<String, ZookeeperDiscoverySpi> spis = new ConcurrentHashMap<>();
 
+    /** */
+    private Map<String, Object> userAttrs;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         if (testSockNio)
@@ -145,6 +148,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
 
         cfg.setClientMode(client);
 
+        if (userAttrs != null)
+            cfg.setUserAttributes(userAttrs);
+
         Map<IgnitePredicate<? extends Event>, int[]> lsnrs = new HashMap<>();
 
         lsnrs.put(new IgnitePredicate<Event>() {
@@ -1107,6 +1113,22 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testLargeUserAttribute() throws Exception {
+        userAttrs = new HashMap<>();
+
+        int[] attr = new int[1024 * 1024];
+
+        for (int i = 0; i < attr.length; i++)
+            attr[i] = i;
+
+        userAttrs.put("testAttr", attr);
+
+        startGrid(0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testClientReconnectSessionExpire1() throws Exception {
         startGrid(0);
 


Mime
View raw message