helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject [1/2] git commit: Add LiveInstanceInfoProvider, which allows an application to store info in the LiveInstance ZNode
Date Thu, 21 Mar 2013 23:19:16 GMT
Updated Branches:
  refs/heads/master 4d7703fcf -> 72559b152


Add LiveInstanceInfoProvider, which allows an application to store info in
the LiveInstance ZNode

Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/96551332
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/96551332
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/96551332

Branch: refs/heads/master
Commit: 965513320811f8668b1b27c78cc9da567c5cf5b6
Parents: 78296ef
Author: slu2011 <lushi04@gmail.com>
Authored: Thu Mar 21 16:17:29 2013 -0700
Committer: slu2011 <lushi04@gmail.com>
Committed: Thu Mar 21 16:17:29 2013 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/helix/HelixManager.java   |    8 +
 .../org/apache/helix/LiveInstanceInfoProvider.java |   31 ++++
 .../apache/helix/manager/zk/ZKHelixManager.java    |   22 +++
 .../java/org/apache/helix/util/ZKClientPool.java   |   65 ++++++++
 .../src/test/java/org/apache/helix/Mocks.java      |    8 +
 .../controller/stages/DummyClusterManager.java     |    9 +
 .../helix/manager/zk/TestZkClusterManager.java     |  123 ++++++++++++++-
 .../helix/participant/MockZKHelixManager.java      |    9 +
 8 files changed, 274 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/96551332/helix-core/src/main/java/org/apache/helix/HelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java b/helix-core/src/main/java/org/apache/helix/HelixManager.java
index 7c91cd6..b33082c 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java
@@ -293,4 +293,12 @@ public interface HelixManager
    * @param callback
    */
   void addPreConnectCallback(PreConnectCallback callback);
+  
+  /**
+   * Add a LiveInstanceInfoProvider that is invoked before cluster manager connects
+   * 
+   * @see LiveInstanceInfoProvider#getAdditionalLiveInstanceInfo()
+   * @param liveInstanceInfoProvider
+   */
+  void setLiveInstanceInfoProvider(LiveInstanceInfoProvider liveInstanceInfoProvider);
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/96551332/helix-core/src/main/java/org/apache/helix/LiveInstanceInfoProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/LiveInstanceInfoProvider.java b/helix-core/src/main/java/org/apache/helix/LiveInstanceInfoProvider.java
new file mode 100644
index 0000000..1651839
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/LiveInstanceInfoProvider.java
@@ -0,0 +1,31 @@
+package org.apache.helix;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public interface LiveInstanceInfoProvider
+{
+  /**
+   * Callback function that is called by HelixManager before it creates LiveInstance Zk Node.
+   * The ZNRecord returned by this function is merged with the LiveInstance record built by Helix.
+   * 
+   * @see ZKHelixManager#addLiveInstance()
+   * @see HelixManager#setLiveInstanceInfoProvider(LiveInstanceInfoProvider)
+   */
+  ZNRecord getAdditionalLiveInstanceInfo();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/96551332/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index e4cf348..f9a185b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -50,6 +50,7 @@ import org.apache.helix.IdealStateChangeListener;
 import org.apache.helix.InstanceConfigChangeListener;
 import org.apache.helix.InstanceType;
 import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.LiveInstanceInfoProvider;
 import org.apache.helix.MessageListener;
 import org.apache.helix.PreConnectCallback;
 import org.apache.helix.PropertyKey;
@@ -116,6 +117,7 @@ public class ZKHelixManager implements HelixManager
   int                                          _maxDisconnectThreshold;
   public static final int                     FLAPPING_TIME_WINDIOW   = 300000; // Default to 300 sec
   public static final int                     MAX_DISCONNECT_THRESHOLD = 5;
+  LiveInstanceInfoProvider                    _liveInstanceInfoProvider = null;
 
   public ZKHelixManager(String clusterName,
                         String instanceName,
@@ -519,6 +521,19 @@ public class ZKHelixManager implements HelixManager
     liveInstance.setSessionId(_sessionId);
     liveInstance.setHelixVersion(_version);
     liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
+    
+    if(_liveInstanceInfoProvider != null)
+    {
+      logger.info("invoking _liveInstanceInfoProvider");
+      ZNRecord additionalLiveInstanceInfo = _liveInstanceInfoProvider.getAdditionalLiveInstanceInfo();
+      if(additionalLiveInstanceInfo != null)
+      {
+        additionalLiveInstanceInfo.merge(liveInstance.getRecord());
+        ZNRecord mergedLiveInstance = new ZNRecord(additionalLiveInstanceInfo, _instanceName);
+        liveInstance = new LiveInstance(mergedLiveInstance);
+        logger.info("liveInstance content :" + _instanceName + " " + liveInstance.toString());
+      }
+    }
 
     logger.info("Add live instance: InstanceName: " + _instanceName + " Session id:"
         + _sessionId);
@@ -1049,4 +1064,11 @@ public class ZKHelixManager implements HelixManager
       task.stop();
     }
   }
+
+  @Override
+  public void setLiveInstanceInfoProvider(
+      LiveInstanceInfoProvider liveInstanceInfoProvider)
+  {
+    _liveInstanceInfoProvider = liveInstanceInfoProvider;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/96551332/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java b/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
index b9f274d..f4ee80a 100644
--- a/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
+++ b/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
@@ -22,6 +22,7 @@ package org.apache.helix.util;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.helix.HelixException;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.zookeeper.ZooKeeper.States;
@@ -72,4 +73,68 @@ public class ZKClientPool
   {
     _zkClientMap.clear();
   }
+  
+  public static void main(String[] args) throws InterruptedException
+  {
+    Thread /*_dataSampleThread = new Thread(new Runnable()
+    {
+      @Override
+      public void run()
+      {
+        int i = 0;
+        while(!Thread.currentThread().isInterrupted())
+        {
+          try
+          {
+            // if the queue is empty, sleep 100 ms and try again
+            Thread.sleep(1000);
+            System.out.println(i++ + "...");
+            throw new RuntimeException("" + i);
+          }
+          catch (InterruptedException e)
+          {
+            System.out.println("Collector thread interrupted" + e);
+            return;
+          }
+          catch(Throwable th)
+          {
+            System.out.println("Collector thread exception/ error" + th);
+          }
+        }
+      }
+    });
+    _dataSampleThread.start();
+    
+    Thread.sleep(10000);
+    _dataSampleThread.interrupt();
+    */
+    _dataSampleThread = new Thread(new Runnable()
+    {
+      @Override
+      public void run()
+      {
+        int i = 0;
+        while(!Thread.currentThread().isInterrupted())
+        {
+          
+            // sleep 1000 ms before the next iteration
+            try
+            {
+              Thread.sleep(1000);
+            } catch (InterruptedException e)
+            {
+              // TODO Auto-generated catch block
+              e.printStackTrace();
+            }
+            System.out.println(i++ + "...");
+            throw new Error("" + i);
+          
+        }
+      }
+    });
+    _dataSampleThread.start();
+    
+    Thread.sleep(10000);
+    _dataSampleThread.interrupt();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/96551332/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index d71556a..273f7da 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -493,6 +493,14 @@ public class Mocks {
 
     }
 
+  @Override
+  public void setLiveInstanceInfoProvider(
+      LiveInstanceInfoProvider liveInstanceInfoProvider)
+  {
+    // TODO Auto-generated method stub
+    
+  }
+
 	}
 
 	public static class MockAccessor implements HelixDataAccessor // DataAccessor

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/96551332/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
index cad41d0..f60b3ba 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
@@ -34,6 +34,7 @@ import org.apache.helix.IdealStateChangeListener;
 import org.apache.helix.InstanceConfigChangeListener;
 import org.apache.helix.InstanceType;
 import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.LiveInstanceInfoProvider;
 import org.apache.helix.MessageListener;
 import org.apache.helix.PreConnectCallback;
 import org.apache.helix.PropertyKey;
@@ -271,4 +272,12 @@ public class DummyClusterManager implements HelixManager
   	// TODO Auto-generated method stub
 
   }
+
+  @Override
+  public void setLiveInstanceInfoProvider(
+      LiveInstanceInfoProvider liveInstanceInfoProvider)
+  {
+    // TODO Auto-generated method stub
+    
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/96551332/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
index 50e8a2f..04a7bdb 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
@@ -19,23 +19,31 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.AccessOption;
 import org.apache.helix.ConfigScope;
 import org.apache.helix.ConfigScopeBuilder;
 import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceInfoProvider;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkHelixTestManager;
+import org.apache.helix.ZkTestHelper;
 import org.apache.helix.ZkUnitTestBase;
 import org.apache.helix.manager.MockListener;
 import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.LiveInstance;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.tools.ClusterSetup;
 import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
 import org.testng.AssertJUnit;
@@ -112,6 +120,120 @@ public class TestZkClusterManager extends ZkUnitTestBase
 
     System.out.println("END " + className + ".testController() at " + new Date(System.currentTimeMillis()));
   }
+  
+  @Test
+  public void testLiveInstanceInfoProvider() throws Exception
+  {
+    System.out.println("START " + className + ".testLiveInstanceInfoProvider() at " + new Date(System.currentTimeMillis()));
+    final String clusterName = CLUSTER_PREFIX + "_" + className + "_liveInstanceInfoProvider";
+    class provider implements LiveInstanceInfoProvider
+    {
+      boolean _flag = false;
+      public provider(boolean genSessionId)
+      {
+        _flag = genSessionId;
+      }
+      @Override
+      public ZNRecord getAdditionalLiveInstanceInfo()
+      {
+        ZNRecord record = new ZNRecord("info");
+        record.setSimpleField("simple", "value");
+        List<String> listFieldVal = new ArrayList<String>();
+        listFieldVal.add("val1");
+        listFieldVal.add("val2");
+        listFieldVal.add("val3");
+        record.setListField("list", listFieldVal);
+        Map<String,String> mapFieldVal = new HashMap<String, String>();
+        mapFieldVal.put("k1", "val1");
+        mapFieldVal.put("k2","val2");
+        mapFieldVal.put("k3","val3");
+        record.setMapField("map", mapFieldVal);
+        if(_flag)
+        {
+          record.setSimpleField("SESSION_ID", "value");
+          record.setSimpleField("LIVE_INSTANCE", "value");
+          record.setSimpleField("Others", "value");
+        }
+        return record;
+      }
+    }
+    
+    
+    TestHelper.setupEmptyCluster(_gZkClient, clusterName);
+    int[] ids = {0,1,2,3, 4, 5};
+    setupInstances(clusterName, ids);
+
+    /////////////////////
+    ZKHelixManager manager = new ZKHelixManager(clusterName, "localhost_0",
+        InstanceType.PARTICIPANT,
+        ZK_ADDR);
+    manager.connect();
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    
+    LiveInstance liveInstance = accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_0"));
+    Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 0);
+    Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 0);
+    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 3);
+    
+    manager = new ZKHelixManager(clusterName, "localhost_1",
+        InstanceType.PARTICIPANT,
+        ZK_ADDR);
+    manager.setLiveInstanceInfoProvider(new provider(false));
+    
+    manager.connect();
+    accessor = manager.getHelixDataAccessor();
+    
+    liveInstance = accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_1"));
+    Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
+    Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
+    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 4);
+    
+    manager = new ZKHelixManager(clusterName, "localhost_2",
+        InstanceType.PARTICIPANT,
+        ZK_ADDR);
+    manager.setLiveInstanceInfoProvider(new provider(true));
+    
+    manager.connect();
+    accessor = manager.getHelixDataAccessor();
+    
+    liveInstance = accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_2"));
+    Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
+    Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
+    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 5);
+    Assert.assertFalse(liveInstance.getSessionId().equals("value"));
+    Assert.assertFalse(liveInstance.getLiveInstance().equals("value"));
+    
+    ////////////////////////////////////
+    
+    ZkHelixTestManager manager2 = new ZkHelixTestManager(clusterName, "localhost_3",
+        InstanceType.PARTICIPANT,
+        ZK_ADDR);
+    manager2.setLiveInstanceInfoProvider(new provider(true));
+    
+    manager2.connect();
+    accessor = manager2.getHelixDataAccessor();
+    
+    liveInstance = accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_3"));
+    Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
+    Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
+    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 5);
+    Assert.assertFalse(liveInstance.getSessionId().equals("value"));
+    Assert.assertFalse(liveInstance.getLiveInstance().equals("value"));
+    String sessionId = liveInstance.getSessionId();
+    
+    ZkTestHelper.expireSession(manager2.getZkClient());
+    Thread.sleep(1000);
+    
+    liveInstance = accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_3"));
+    Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
+    Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
+    Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 5);
+    Assert.assertFalse(liveInstance.getSessionId().equals("value"));
+    Assert.assertFalse(liveInstance.getLiveInstance().equals("value"));
+    Assert.assertFalse(sessionId.equals(liveInstance.getSessionId()));
+
+    System.out.println("END " + className + ".testLiveInstanceInfoProvider() at " + new Date(System.currentTimeMillis()));
+  }
 
   @Test()
   public void testAdministrator() throws Exception
@@ -152,5 +274,4 @@ public class TestZkClusterManager extends ZkUnitTestBase
 
     System.out.println("END " + className + ".testAdministrator() at " + new Date(System.currentTimeMillis()));
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/96551332/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
index 529b534..cb13b76 100644
--- a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
+++ b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
@@ -36,6 +36,7 @@ import org.apache.helix.IdealStateChangeListener;
 import org.apache.helix.InstanceConfigChangeListener;
 import org.apache.helix.InstanceType;
 import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.LiveInstanceInfoProvider;
 import org.apache.helix.MessageListener;
 import org.apache.helix.PreConnectCallback;
 import org.apache.helix.PropertyKey;
@@ -282,4 +283,12 @@ public class MockZKHelixManager implements HelixManager
 
   }
 
+  @Override
+  public void setLiveInstanceInfoProvider(
+      LiveInstanceInfoProvider liveInstanceInfoProvider)
+  {
+    // TODO Auto-generated method stub
+    
+  }
+
 }


Mime
View raw message