helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/7] helix git commit: Add protective check for ZooKeeper writing data that is bigger than 1MB
Date Wed, 04 Oct 2017 01:49:42 GMT
Repository: helix
Updated Branches:
  refs/heads/master 6775cd3ff -> d57882b9b


Add protective check for ZooKeeper writing data that is bigger than 1MB

ZooKeeper server drops connections for requests that are trying to write data bigger than
1 MB, without returning any error code. When a Helix user does so, the request times out without
giving a reason.

ZkClient in Helix is a wrapper for ZkClient in ZooKeeper. Add check in the Helix ZkClient
wrapper to give user exact timeout reason when data is bigger than 1MB.

Add unit test.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f9f554e6
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f9f554e6
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f9f554e6

Branch: refs/heads/master
Commit: f9f554e68bfbffdfd8f87db76d546f7202f1541b
Parents: 6775cd3
Author: Weihan Kong <wkong@linkedin.com>
Authored: Mon Jan 23 17:18:00 2017 -0800
Committer: Junkai Xue <jxue@linkedin.com>
Committed: Tue Oct 3 14:49:05 2017 -0700

----------------------------------------------------------------------
 .../org/apache/helix/manager/zk/ZkClient.java   |  24 +++-
 .../org/apache/helix/TestZkClientWrapper.java   | 116 ------------------
 .../apache/helix/manager/zk/TestZkClient.java   | 122 +++++++++++++++++++
 3 files changed, 140 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/f9f554e6/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
index 0a61e82..8f11eb3 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
@@ -30,6 +30,8 @@ import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.I0Itec.zkclient.serialize.SerializableSerializer;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.DeleteCallbackHandler;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.ExistsCallbackHandler;
@@ -287,7 +289,7 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
     long startT = System.nanoTime();
     try {
       final byte[] data = serialize(datat, path);
-
+      checkDataSizeLimit(data);
       retryUntilConnected(new Callable<Object>() {
 
         @Override
@@ -308,12 +310,13 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
       throws InterruptedException {
     long start = System.nanoTime();
     try {
-      final byte[] bytes = _zkSerializer.serialize(datat, path);
+      final byte[] data = _zkSerializer.serialize(datat, path);
+      checkDataSizeLimit(data);
       return retryUntilConnected(new Callable<Stat>() {
 
         @Override
         public Stat call() throws Exception {
-          return ((ZkConnection) _connection).getZookeeper().setData(path, bytes, expectedVersion);
+          return ((ZkConnection) _connection).getZookeeper().setData(path, data, expectedVersion);
         }
       });
     } finally {
@@ -325,7 +328,7 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
   }
 
   @Override
-  public String create(final String path, Object data, final CreateMode mode)
+  public String create(final String path, Object datat, final CreateMode mode)
       throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException
{
     if (path == null) {
       throw new NullPointerException("path must not be null.");
@@ -333,13 +336,14 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
 
     long startT = System.nanoTime();
     try {
-      final byte[] bytes = data == null ? null : serialize(data, path);
+      final byte[] data = datat == null ? null : serialize(datat, path);
+      checkDataSizeLimit(data);
 
       return retryUntilConnected(new Callable<String>() {
 
         @Override
         public String call() throws Exception {
-          return _connection.create(path, bytes, mode);
+          return _connection.create(path, data, mode);
         }
       });
     } finally {
@@ -451,4 +455,12 @@ public class ZkClient extends org.I0Itec.zkclient.ZkClient {
       }
     });
   }
+
+  private void checkDataSizeLimit(byte[] data) {
+    if (data != null && data.length > ZNRecord.SIZE_LIMIT) {
+      LOG.error("Data size larger than 1M, will not write to zk. Data (first 1k): "
+          + new String(data).substring(0, 1024));
+      throw new HelixException("Data size larger than 1M");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/f9f554e6/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java b/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java
deleted file mode 100644
index bc3d266..0000000
--- a/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java
+++ /dev/null
@@ -1,116 +0,0 @@
-package org.apache.helix;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.testng.AssertJUnit;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-public class TestZkClientWrapper extends ZkUnitTestBase {
-  private static Logger LOG = Logger.getLogger(TestZkClientWrapper.class);
-
-  ZkClient _zkClient;
-
-  @BeforeClass
-  public void beforeClass() {
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
-  }
-
-  @AfterClass
-  public void afterClass() {
-    _zkClient.close();
-  }
-
-  @Test()
-  void testGetStat() {
-    String path = "/tmp/getStatTest";
-    _zkClient.deleteRecursive(path);
-
-    Stat stat, newStat;
-    stat = _zkClient.getStat(path);
-    AssertJUnit.assertNull(stat);
-    _zkClient.createPersistent(path, true);
-
-    stat = _zkClient.getStat(path);
-    AssertJUnit.assertNotNull(stat);
-
-    newStat = _zkClient.getStat(path);
-    AssertJUnit.assertEquals(stat, newStat);
-
-    _zkClient.writeData(path, new ZNRecord("Test"));
-    newStat = _zkClient.getStat(path);
-    AssertJUnit.assertNotSame(stat, newStat);
-  }
-
-  @Test()
-  void testSessioExpire() throws Exception {
-    IZkStateListener listener = new IZkStateListener() {
-
-      @Override
-      public void handleStateChanged(KeeperState state) throws Exception {
-        System.out.println("In Old connection New state " + state);
-      }
-
-      @Override
-      public void handleNewSession() throws Exception {
-        System.out.println("In Old connection New session");
-      }
-
-      @Override
-      public void handleSessionEstablishmentError(Throwable var1) throws Exception {
-      }
-    };
-
-    _zkClient.subscribeStateChanges(listener);
-    ZkConnection connection = ((ZkConnection) _zkClient.getConnection());
-    ZooKeeper zookeeper = connection.getZookeeper();
-    System.out.println("old sessionId= " + zookeeper.getSessionId());
-    Watcher watcher = new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        System.out.println("In New connection In process event:" + event);
-      }
-    };
-    ZooKeeper newZookeeper =
-        new ZooKeeper(connection.getServers(), zookeeper.getSessionTimeout(), watcher,
-            zookeeper.getSessionId(), zookeeper.getSessionPasswd());
-    Thread.sleep(3000);
-    System.out.println("New sessionId= " + newZookeeper.getSessionId());
-    Thread.sleep(3000);
-    newZookeeper.close();
-    Thread.sleep(10000);
-    connection = ((ZkConnection) _zkClient.getConnection());
-    zookeeper = connection.getZookeeper();
-    System.out.println("After session expiry sessionId= " + zookeeper.getSessionId());
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/f9f554e6/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
new file mode 100644
index 0000000..0019d40
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
@@ -0,0 +1,122 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestZkClient extends ZkUnitTestBase {
+  private static Logger LOG = Logger.getLogger(TestZkClient.class);
+
+  ZkClient _zkClient;
+
+  @BeforeClass
+  public void beforeClass() {
+    _zkClient = new ZkClient(ZK_ADDR);
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
+  }
+
+  @AfterClass
+  public void afterClass() {
+    _zkClient.close();
+  }
+
+  @Test()
+  void testGetStat() {
+    String path = "/tmp/getStatTest";
+    _zkClient.deleteRecursive(path);
+
+    Stat stat, newStat;
+    stat = _zkClient.getStat(path);
+    AssertJUnit.assertNull(stat);
+    _zkClient.createPersistent(path, true);
+
+    stat = _zkClient.getStat(path);
+    AssertJUnit.assertNotNull(stat);
+
+    newStat = _zkClient.getStat(path);
+    AssertJUnit.assertEquals(stat, newStat);
+
+    _zkClient.writeData(path, new ZNRecord("Test"));
+    newStat = _zkClient.getStat(path);
+    AssertJUnit.assertNotSame(stat, newStat);
+  }
+
+  @Test()
+  void testSessioExpire() throws Exception {
+    IZkStateListener listener = new IZkStateListener() {
+
+      @Override
+      public void handleStateChanged(KeeperState state) throws Exception {
+        System.out.println("In Old connection New state " + state);
+      }
+
+      @Override
+      public void handleNewSession() throws Exception {
+        System.out.println("In Old connection New session");
+      }
+
+      @Override
+      public void handleSessionEstablishmentError(Throwable var1) throws Exception {
+      }
+    };
+
+    _zkClient.subscribeStateChanges(listener);
+    ZkConnection connection = ((ZkConnection) _zkClient.getConnection());
+    ZooKeeper zookeeper = connection.getZookeeper();
+    System.out.println("old sessionId= " + zookeeper.getSessionId());
+    Watcher watcher = new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        System.out.println("In New connection In process event:" + event);
+      }
+    };
+    ZooKeeper newZookeeper =
+        new ZooKeeper(connection.getServers(), zookeeper.getSessionTimeout(), watcher,
+            zookeeper.getSessionId(), zookeeper.getSessionPasswd());
+    Thread.sleep(3000);
+    System.out.println("New sessionId= " + newZookeeper.getSessionId());
+    Thread.sleep(3000);
+    newZookeeper.close();
+    Thread.sleep(10000);
+    connection = ((ZkConnection) _zkClient.getConnection());
+    zookeeper = connection.getZookeeper();
+    System.out.println("After session expiry sessionId= " + zookeeper.getSessionId());
+  }
+
+  @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "Data
size larger than 1M.*")
+  void testDataSizeLimit() {
+    ZNRecord data = new ZNRecord(new String(new char[1024*1024]));
+    _zkClient.writeData("/test", data, -1);
+  }
+}


Mime
View raw message