helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [1/4] [HELIX-470] Netty-based IPC layer
Date Thu, 28 Aug 2014 18:00:15 GMT
Repository: helix
Updated Branches:
  refs/heads/master 59b4bbb0b -> d8ec1ae75


http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/test/java/org/apache/helix/ipc/TestNettyHelixIPCService.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/test/java/org/apache/helix/ipc/TestNettyHelixIPCService.java b/helix-ipc/src/test/java/org/apache/helix/ipc/TestNettyHelixIPCService.java
new file mode 100644
index 0000000..b399377
--- /dev/null
+++ b/helix-ipc/src/test/java/org/apache/helix/ipc/TestNettyHelixIPCService.java
@@ -0,0 +1,353 @@
+package org.apache.helix.ipc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.TestHelper;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.ipc.netty.NettyHelixIPCService;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.helix.resolver.HelixAddress;
+import org.apache.helix.resolver.HelixMessageScope;
+import org.apache.helix.resolver.HelixResolver;
+import org.apache.helix.resolver.zk.ZKHelixResolver;
+import org.apache.helix.testutil.ZkTestBase;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestNettyHelixIPCService extends ZkTestBase {
+
+  private static final Logger LOG = Logger.getLogger(TestNettyHelixIPCService.class);
+
+  private static final String CLUSTER_NAME = "TEST_CLUSTER";
+  private static final String RESOURCE_NAME = "MyResource";
+
+  private int firstPort;
+  private int secondPort;
+  private HelixManager controller;
+  private HelixManager firstNode;
+  private HelixManager secondNode;
+  private HelixResolver firstResolver;
+  private HelixResolver secondResolver;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    // Allocate test resources
+    firstPort = TestHelper.getRandomPort();
+    secondPort = TestHelper.getRandomPort();
+
+    // Setup cluster
+    ClusterSetup clusterSetup = new ClusterSetup(_zkaddr);
+    clusterSetup.addCluster(CLUSTER_NAME, true);
+    clusterSetup.addInstanceToCluster(CLUSTER_NAME, "localhost_" + firstPort);
+    clusterSetup.addInstanceToCluster(CLUSTER_NAME, "localhost_" + secondPort);
+
+    // Start Helix agents
+    controller =
+        HelixControllerMain.startHelixController(_zkaddr, CLUSTER_NAME, "CONTROLLER", "STANDALONE");
+    firstNode =
+        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "localhost_" + firstPort,
+            InstanceType.PARTICIPANT, _zkaddr);
+    secondNode =
+        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "localhost_" + secondPort,
+            InstanceType.PARTICIPANT, _zkaddr);
+
+    // Connect participants
+    firstNode.getStateMachineEngine().registerStateModelFactory(
+        StateModelDefId.from("OnlineOffline"), new DummyStateModelFactory());
+    firstNode.connect();
+    secondNode.getStateMachineEngine().registerStateModelFactory(
+        StateModelDefId.from("OnlineOffline"), new DummyStateModelFactory());
+    secondNode.connect();
+
+    // Add a resource
+    clusterSetup.addResourceToCluster(CLUSTER_NAME, RESOURCE_NAME, 4, "OnlineOffline");
+    clusterSetup.rebalanceResource(CLUSTER_NAME, RESOURCE_NAME, 1);
+
+    // Wait for External view convergence
+    ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+        _zkaddr, CLUSTER_NAME), 10000);
+
+    // Connect resolvers
+    firstResolver = new ZKHelixResolver(_zkaddr);
+    firstResolver.connect();
+    secondResolver = new ZKHelixResolver(_zkaddr);
+    secondResolver.connect();
+
+    // Configure
+    firstNode.getConfigAccessor().set(
+        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT)
+            .forCluster(firstNode.getClusterName()).forParticipant(firstNode.getInstanceName())
+            .build(), HelixIPCService.IPC_PORT, String.valueOf(firstPort));
+    secondNode.getConfigAccessor().set(
+        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT)
+            .forCluster(secondNode.getClusterName()).forParticipant(secondNode.getInstanceName())
+            .build(), HelixIPCService.IPC_PORT, String.valueOf(secondPort));
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    firstNode.disconnect();
+    secondNode.disconnect();
+    controller.disconnect();
+  }
+
+  @Test
+  public void testService() throws Exception {
+    final int numMessages = 1000;
+    final int messageType = 1;
+
+    // Start first IPC service w/ counter
+    final ConcurrentMap<String, AtomicInteger> firstCounts =
+        new ConcurrentHashMap<String, AtomicInteger>();
+    final HelixIPCService firstIPC =
+        new NettyHelixIPCService(new NettyHelixIPCService.Config().setInstanceName(
+            firstNode.getInstanceName()).setPort(firstPort));
+    firstIPC.registerCallback(messageType, new HelixIPCCallback() {
+      @Override
+      public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+        String key = scope.getPartition() + ":" + scope.getState();
+        firstCounts.putIfAbsent(key, new AtomicInteger());
+        firstCounts.get(key).incrementAndGet();
+      }
+    });
+    firstIPC.start();
+
+    // Start second IPC Service w/ counter
+    final ConcurrentMap<String, AtomicInteger> secondCounts =
+        new ConcurrentHashMap<String, AtomicInteger>();
+    final HelixIPCService secondIPC =
+        new NettyHelixIPCService(new NettyHelixIPCService.Config().setInstanceName(
+            secondNode.getInstanceName()).setPort(secondPort));
+    secondIPC.registerCallback(messageType, new HelixIPCCallback() {
+      @Override
+      public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+        String key = scope.getPartition() + ":" + scope.getState();
+        secondCounts.putIfAbsent(key, new AtomicInteger());
+        secondCounts.get(key).incrementAndGet();
+      }
+    });
+    secondIPC.start();
+
+    // Allow resolver callbacks to fire
+    Thread.sleep(500);
+
+    // Find all partitions on second node...
+    String secondName = "localhost_" + secondPort;
+    Set<String> secondPartitions = new HashSet<String>();
+    IdealState idealState =
+        controller.getClusterManagmentTool().getResourceIdealState(CLUSTER_NAME, RESOURCE_NAME);
+    for (String partitionName : idealState.getPartitionSet()) {
+      for (Map.Entry<String, String> stateEntry : idealState.getInstanceStateMap(partitionName)
+          .entrySet()) {
+        if (stateEntry.getKey().equals(secondName)) {
+          secondPartitions.add(partitionName);
+        }
+      }
+    }
+
+    // And use first node to send messages to them
+    for (String partitionName : secondPartitions) {
+      for (int i = 0; i < numMessages; i++) {
+        HelixMessageScope scope =
+            new HelixMessageScope.Builder().cluster(firstNode.getClusterName())
+                .resource(RESOURCE_NAME).partition(partitionName).state("ONLINE").build();
+
+        Set<HelixAddress> destinations = firstResolver.getDestinations(scope);
+        for (HelixAddress destination : destinations) {
+          ByteBuf message = Unpooled.wrappedBuffer(("Hello" + i).getBytes());
+          firstIPC.send(destination, messageType, UUID.randomUUID(), message);
+        }
+      }
+    }
+
+    // Loopback
+    for (String partitionName : secondPartitions) {
+      for (int i = 0; i < numMessages; i++) {
+        HelixMessageScope scope =
+            new HelixMessageScope.Builder().cluster(secondNode.getClusterName())
+                .resource(RESOURCE_NAME).partition(partitionName).state("ONLINE").build();
+
+        Set<HelixAddress> destinations = secondResolver.getDestinations(scope);
+        for (HelixAddress destination : destinations) {
+          ByteBuf message = Unpooled.wrappedBuffer(("Hello" + i).getBytes());
+          secondIPC.send(destination, messageType, UUID.randomUUID(), message);
+        }
+      }
+    }
+
+    // Check
+    Thread.sleep(500); // just in case
+    for (String partitionName : secondPartitions) {
+      AtomicInteger count = secondCounts.get(partitionName + ":ONLINE");
+      Assert.assertNotNull(count);
+      Assert.assertEquals(count.get(), 2 * numMessages);
+    }
+
+    // Shutdown
+    firstIPC.shutdown();
+    secondIPC.shutdown();
+  }
+
+  @Test
+  public void testMessageManager() throws Exception {
+    final int numMessages = 1000;
+    final int messageType = 1;
+    final int ackMessageType = 2;
+
+    // First IPC service
+    final HelixIPCService firstIPC =
+        new NettyHelixIPCService(new NettyHelixIPCService.Config().setInstanceName(
+            firstNode.getInstanceName()).setPort(firstPort));
+    firstIPC.registerCallback(messageType, new HelixIPCCallback() {
+      final Random random = new Random();
+
+      @Override
+      public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+        if (random.nextInt() % 2 == 0) {
+          HelixAddress sender = firstResolver.getSource(scope);
+          firstIPC.send(sender, ackMessageType, messageId, null);
+        }
+      }
+    });
+    firstIPC.start();
+
+    // Second IPC service
+    final HelixIPCService secondIPC =
+        new NettyHelixIPCService(new NettyHelixIPCService.Config().setInstanceName(
+            secondNode.getInstanceName()).setPort(secondPort));
+    secondIPC.registerCallback(messageType, new HelixIPCCallback() {
+      final Random random = new Random();
+
+      @Override
+      public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+        if (random.nextInt() % 2 == 0) {
+          HelixAddress sender = secondResolver.getSource(scope);
+          secondIPC.send(sender, ackMessageType, messageId, null);
+        }
+      }
+    });
+    secondIPC.start();
+
+    // Allow resolver callbacks to fire
+    Thread.sleep(500);
+
+    // Start state machine (uses first, sends to second)
+    final AtomicInteger numAcks = new AtomicInteger();
+    final AtomicInteger numErrors = new AtomicInteger();
+    HelixIPCService messageManager =
+        new HelixIPCMessageManager(Executors.newSingleThreadScheduledExecutor(), firstIPC,
+            300, -1);
+    messageManager.registerCallback(ackMessageType, new HelixIPCCallback() {
+      @Override
+      public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+        numAcks.incrementAndGet();
+      }
+    });
+    messageManager.start();
+
+    // Find all partitions on second node...
+    String secondName = "localhost_" + secondPort;
+    Set<String> secondPartitions = new HashSet<String>();
+    IdealState idealState =
+        controller.getClusterManagmentTool().getResourceIdealState(CLUSTER_NAME, RESOURCE_NAME);
+    for (String partitionName : idealState.getPartitionSet()) {
+      for (Map.Entry<String, String> stateEntry : idealState.getInstanceStateMap(partitionName)
+          .entrySet()) {
+        if (stateEntry.getKey().equals(secondName)) {
+          secondPartitions.add(partitionName);
+        }
+      }
+    }
+
+    // And use first node to send messages to them
+    for (String partitionName : secondPartitions) {
+      for (int i = 0; i < numMessages; i++) {
+        HelixMessageScope scope =
+            new HelixMessageScope.Builder().cluster(firstNode.getClusterName())
+                .resource(RESOURCE_NAME).partition(partitionName).state("ONLINE").build();
+        Set<HelixAddress> destinations = firstResolver.getDestinations(scope);
+        for (HelixAddress destination : destinations) {
+          ByteBuf message = Unpooled.wrappedBuffer(("Hello" + i).getBytes());
+          messageManager.send(destination, messageType, UUID.randomUUID(), message);
+        }
+      }
+    }
+
+    // Ensure they're all ack'ed (tests retry logic because only every other one is acked)
+    Thread.sleep(5000);
+    Assert.assertEquals(numAcks.get() + numErrors.get(), numMessages * secondPartitions.size());
+
+    // Shutdown
+    messageManager.shutdown();
+    firstIPC.shutdown();
+    secondIPC.shutdown();
+  }
+
+  public static class DummyStateModelFactory extends
+      StateTransitionHandlerFactory<TransitionHandler> {
+    @Override
+    public TransitionHandler createStateTransitionHandler(PartitionId partitionId) {
+      return new DummyStateModel();
+    }
+
+    @StateModelInfo(states = "{'OFFLINE', 'ONLINE'}", initialState = "OFFLINE")
+    public static class DummyStateModel extends TransitionHandler {
+      @Transition(from = "OFFLINE", to = "ONLINE")
+      public void fromOfflineToOnline(Message message, NotificationContext context) {
+        LOG.info(message);
+      }
+
+      @Transition(from = "ONLINE", to = "OFFLINE")
+      public void fromOnlineToOffline(Message message, NotificationContext context) {
+        LOG.info(message);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java b/helix-ipc/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java
new file mode 100644
index 0000000..2d682f7
--- /dev/null
+++ b/helix-ipc/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java
@@ -0,0 +1,221 @@
+package org.apache.helix.ipc.benchmark;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.management.MXBean;
+import javax.management.ObjectName;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.helix.ipc.HelixIPCCallback;
+import org.apache.helix.ipc.HelixIPCService;
+import org.apache.helix.ipc.netty.NettyHelixIPCService;
+import org.apache.helix.resolver.HelixAddress;
+import org.apache.helix.resolver.HelixMessageScope;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Run with following to enable JMX:
+ * -Dcom.sun.management.jmxremote <br/>
+ * -Dcom.sun.management.jmxremote.port=10000 <br/>
+ * -Dcom.sun.management.jmxremote.authenticate=false <br/>
+ * -Dcom.sun.management.jmxremote.ssl=false <br/>
+ */
+public class BenchmarkDriver implements Runnable {
+
+  private static final int MESSAGE_TYPE = 1025;
+
+  private final int port;
+  private final int numPartitions;
+  private final AtomicBoolean isShutdown;
+  private final byte[] messageBytes;
+  private final int numConnections;
+
+  private HelixIPCService ipcService;
+  private String localhost;
+  private Thread[] trafficThreads;
+
+  public BenchmarkDriver(int port, int numPartitions, int numThreads, int messageSize,
+      int numConnections) {
+    this.port = port;
+    this.numPartitions = numPartitions;
+    this.isShutdown = new AtomicBoolean(true);
+    this.trafficThreads = new Thread[numThreads];
+    this.numConnections = numConnections;
+
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < messageSize; i++) {
+      sb.append("A");
+    }
+    this.messageBytes = sb.toString().getBytes();
+  }
+
+  @Override
+  public void run() {
+    try {
+      // Register controller MBean
+      final BenchmarkDriver driver = this;
+      ManagementFactory.getPlatformMBeanServer().registerMBean(new Controller() {
+        @Override
+        public void startTraffic(String remoteHost, int remotePort) {
+          driver.startTraffic(remoteHost, remotePort);
+        }
+
+        @Override
+        public void stopTraffic() {
+          driver.stopTraffic();
+        }
+      }, new ObjectName("org.apache.helix:type=BenchmarkDriver"));
+
+      // The local server
+      localhost = InetAddress.getLocalHost().getCanonicalHostName();
+      ipcService =
+          new NettyHelixIPCService(new NettyHelixIPCService.Config()
+              .setInstanceName(localhost + "_" + port).setPort(port)
+              .setNumConnections(numConnections));
+
+      // Counts number of messages received, and ack them
+      ipcService.registerCallback(MESSAGE_TYPE, new HelixIPCCallback() {
+        @Override
+        public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+          // Do nothing
+        }
+      });
+
+      ipcService.start();
+      System.out.println("Started IPC service on "
+          + InetAddress.getLocalHost().getCanonicalHostName() + ":" + port);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void startTraffic(final String remoteHost, final int remotePort) {
+    if (isShutdown.getAndSet(false)) {
+      System.out.println("Starting " + trafficThreads.length + " threads to generate traffic");
+      for (int i = 0; i < trafficThreads.length; i++) {
+        Thread t = new Thread() {
+          @Override
+          public void run() {
+            ByteBuf m = ByteBufAllocator.DEFAULT.buffer(messageBytes.length);
+            m.writeBytes(messageBytes);
+            while (!isShutdown.get()) {
+              for (int i = 0; i < numPartitions; i++) {
+                HelixMessageScope scope =
+                    new HelixMessageScope.Builder().cluster("CLUSTER").resource("RESOURCE")
+                        .partition("PARTITION_" + i).sourceInstance(localhost + "_" + port).build();
+
+                Set<HelixAddress> destinations =
+                    ImmutableSet.of(new HelixAddress(scope, remoteHost + "_" + remotePort,
+                        new InetSocketAddress(remoteHost, remotePort)));
+
+                UUID uuid = UUID.randomUUID();
+
+                try {
+                  for (HelixAddress destination : destinations) {
+                    m.retain();
+                    ipcService.send(destination, MESSAGE_TYPE, uuid, m);
+                  }
+                } catch (Exception e) {
+                  e.printStackTrace();
+                }
+              }
+            }
+          }
+        };
+        t.start();
+        trafficThreads[i] = t;
+      }
+      System.out.println("Started traffic to " + remoteHost + ":" + remotePort);
+    }
+  }
+
+  private void stopTraffic() {
+    if (!isShutdown.getAndSet(true)) {
+      try {
+        for (Thread t : trafficThreads) {
+          t.join();
+        }
+        System.out.println("Stopped traffic");
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  @MXBean
+  public interface Controller {
+    void startTraffic(String remoteHost, int remotePort);
+
+    void stopTraffic();
+  }
+
+  public static void main(String[] args) throws Exception {
+    BasicConfigurator.configure();
+    Logger.getRootLogger().setLevel(Level.DEBUG);
+
+    Options options = new Options();
+    options.addOption("partitions", true, "Number of partitions");
+    options.addOption("threads", true, "Number of threads");
+    options.addOption("messageSize", true, "Message size in bytes");
+    options.addOption("numConnections", true, "Number of connections between nodes");
+
+    CommandLine commandLine = new GnuParser().parse(options, args);
+
+    if (commandLine.getArgs().length != 1) {
+      HelpFormatter helpFormatter = new HelpFormatter();
+      helpFormatter.printHelp("usage: [options] port", options);
+      System.exit(1);
+    }
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        latch.countDown();
+      }
+    });
+
+    new BenchmarkDriver(Integer.parseInt(commandLine.getArgs()[0]), Integer.parseInt(commandLine
+        .getOptionValue("partitions", "1")), Integer.parseInt(commandLine.getOptionValue("threads",
+        "1")), Integer.parseInt(commandLine.getOptionValue("messageSize", "1024")),
+        Integer.parseInt(commandLine.getOptionValue("numConnections", "1"))).run();
+
+    latch.await();
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/test/java/org/apache/helix/resolver/TestZKHelixResolver.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/test/java/org/apache/helix/resolver/TestZKHelixResolver.java b/helix-ipc/src/test/java/org/apache/helix/resolver/TestZKHelixResolver.java
new file mode 100644
index 0000000..4e802d7
--- /dev/null
+++ b/helix-ipc/src/test/java/org/apache/helix/resolver/TestZKHelixResolver.java
@@ -0,0 +1,161 @@
+package org.apache.helix.resolver;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.manager.zk.MockController;
+import org.apache.helix.manager.zk.MockParticipant;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.resolver.zk.ZKHelixResolver;
+import org.apache.helix.testutil.ZkTestBase;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Test basic routing table lookups for a ZK-based Helix resolver.
+ */
+public class TestZKHelixResolver extends ZkTestBase {
+  private static final int NUM_PARTICIPANTS = 2;
+  private static final int NUM_PARTITIONS = 2;
+  private static final String CLUSTER_NAME = TestZKHelixResolver.class.getSimpleName();
+  private static final String RESOURCE_NAME = "MyResource";
+  private MockParticipant[] _participants;
+  private MockController _controller;
+  private ClusterSetup _setupTool;
+  private HelixResolver _resolver;
+  private Map<String, InetSocketAddress> _socketMap;
+
+  @BeforeClass
+  public void beforeClass() {
+    // Set up cluster
+    _setupTool = new ClusterSetup(_zkclient);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+    _setupTool.addResourceToCluster(CLUSTER_NAME, RESOURCE_NAME, NUM_PARTITIONS, "OnlineOffline",
+        IdealState.RebalanceMode.FULL_AUTO.toString());
+    _setupTool.rebalanceCluster(CLUSTER_NAME, RESOURCE_NAME, 1, RESOURCE_NAME, null);
+
+    // Set up and start instances
+    _socketMap = Maps.newHashMap();
+    HelixAdmin admin = _setupTool.getClusterManagementTool();
+    _participants = new MockParticipant[NUM_PARTICIPANTS];
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      String host = "localhost";
+      int port = i;
+      int ipcPort = i + 100;
+      String instanceName = host + "_" + port;
+      InstanceConfig config = new InstanceConfig(instanceName);
+      config.setHostName(host);
+      config.setPort(Integer.toString(port));
+      config.getRecord().setSimpleField("IPC_PORT", Integer.toString(ipcPort));
+      admin.addInstance(CLUSTER_NAME, config);
+      _socketMap.put(instanceName, new InetSocketAddress(host, ipcPort));
+      _participants[i] = new MockParticipant(_zkaddr, CLUSTER_NAME, instanceName);
+      _participants[i].syncStart();
+    }
+
+    // Start controller
+    _controller = new MockController(_zkaddr, CLUSTER_NAME, "controller_0");
+    _controller.syncStart();
+
+    // Connect a resolver
+    _resolver = new ZKHelixResolver(_zkaddr);
+    _resolver.connect();
+
+    // Wait for External view convergence
+    ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
+        _zkaddr, CLUSTER_NAME), 10000);
+  }
+
+  @Test
+  public void testResolution() {
+    HelixMessageScope clusterScope = new HelixMessageScope.Builder().cluster(CLUSTER_NAME).build();
+    Set<HelixAddress> destinations = _resolver.getDestinations(clusterScope);
+    Assert.assertNotNull(destinations);
+    Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>();
+    for (HelixAddress destination : destinations) {
+      addresses.add(destination.getSocketAddress());
+    }
+    Assert.assertTrue(addresses.containsAll(_socketMap.values()), "Expected " + _socketMap.values()
+        + ", found " + addresses);
+
+    HelixMessageScope resourceScope =
+        new HelixMessageScope.Builder().cluster(CLUSTER_NAME).resource(RESOURCE_NAME).build();
+    destinations = _resolver.getDestinations(resourceScope);
+    Assert.assertNotNull(destinations);
+    addresses.clear();
+    for (HelixAddress destination : destinations) {
+      addresses.add(destination.getSocketAddress());
+    }
+    Assert.assertTrue(addresses.containsAll(_socketMap.values()), "Expected " + _socketMap.values()
+        + ", found " + addresses);
+
+    HelixMessageScope partition0Scope =
+        new HelixMessageScope.Builder().cluster(CLUSTER_NAME).resource(RESOURCE_NAME)
+            .partition(RESOURCE_NAME + "_0").build();
+    destinations = _resolver.getDestinations(partition0Scope);
+    Assert.assertNotNull(destinations);
+    ExternalView externalView =
+        _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, RESOURCE_NAME);
+    Set<String> instanceSet = externalView.getStateMap(RESOURCE_NAME + "_0").keySet();
+    Set<InetSocketAddress> expectedSocketAddrs = Sets.newHashSet();
+    for (String instanceName : instanceSet) {
+      expectedSocketAddrs.add(_socketMap.get(instanceName));
+    }
+    addresses.clear();
+    for (HelixAddress destination : destinations) {
+      addresses.add(destination.getSocketAddress());
+    }
+    Assert.assertEquals(addresses, expectedSocketAddrs, "Expected " + expectedSocketAddrs
+        + ", found " + addresses);
+
+    HelixMessageScope sourceInstanceScope =
+        new HelixMessageScope.Builder().cluster(CLUSTER_NAME).resource(RESOURCE_NAME)
+            .partition(RESOURCE_NAME + "_0").sourceInstance(_participants[0].getInstanceName())
+            .build();
+    HelixAddress sourceAddress = _resolver.getSource(sourceInstanceScope);
+    Assert.assertNotNull(sourceAddress);
+    Assert.assertEquals(sourceAddress.getSocketAddress(),
+        _socketMap.get(_participants[0].getInstanceName()));
+  }
+
+  @AfterClass
+  public void afterClass() {
+    _resolver.disconnect();
+    _controller.syncStop();
+    for (MockParticipant participant : _participants) {
+      participant.syncStop();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/test/resources/build_benchmark.sh
----------------------------------------------------------------------
diff --git a/helix-ipc/src/test/resources/build_benchmark.sh b/helix-ipc/src/test/resources/build_benchmark.sh
new file mode 100644
index 0000000..8b94e38
--- /dev/null
+++ b/helix-ipc/src/test/resources/build_benchmark.sh
@@ -0,0 +1,29 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+cd helix-ipc
+mvn package -DskipTests
+mkdir -p /tmp/ipc-benchmark
+cp target/helix-ipc-0.7.1-jar-with-dependencies.jar /tmp/ipc-benchmark
+cp target/helix-ipc-0.7.1-tests.jar /tmp/ipc-benchmark
+cp src/test/resources/run_benchmark.sh /tmp/ipc-benchmark
+cd /tmp
+tar cvzf ipc-benchmark.tar.gz ipc-benchmark

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/test/resources/run_benchmark.sh
----------------------------------------------------------------------
diff --git a/helix-ipc/src/test/resources/run_benchmark.sh b/helix-ipc/src/test/resources/run_benchmark.sh
new file mode 100644
index 0000000..0b9373d
--- /dev/null
+++ b/helix-ipc/src/test/resources/run_benchmark.sh
@@ -0,0 +1,37 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+$JAVA_HOME/bin/java -cp helix-ipc-0.7.1-jar-with-dependencies.jar:helix-ipc-0.7.1-tests.jar \
+    -Xloggc:gc_$JMX_PORT.log \
+    -XX:+UseConcMarkSweepGC \
+    -XX:+UseCMSInitiatingOccupancyOnly \
+    -XX:CMSInitiatingOccupancyFraction=65 \
+    -XX:NewRatio=2 \
+    -Xmx4g \
+    -Xms4g \
+    -Dcom.sun.management.jmxremote \
+    -Dcom.sun.management.jmxremote.port=$JMX_PORT \
+    -Dcom.sun.management.jmxremote.authenticate=false \
+    -Dcom.sun.management.jmxremote.ssl=false \
+    -Dio.netty.resourceLeakDetection \
+    -Dio.netty.allocator.type=pooled \
+    -Dio.netty.noPreferDirect=false \
+    org.apache.helix.ipc.benchmark.BenchmarkDriver $@

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4bd6f2e..ae49319 100644
--- a/pom.xml
+++ b/pom.xml
@@ -199,6 +199,7 @@ under the License.
     <module>helix-agent</module>
     <module>helix-provisioning</module>
     <module>helix-examples</module>
+    <module>helix-ipc</module>
     <module>recipes</module>
   </modules>
 


Mime
View raw message