helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject git commit: [HELIX-489] Convert rabbitmq to use new API, rb=25927
Date Tue, 23 Sep 2014 17:01:28 GMT
Repository: helix
Updated Branches:
  refs/heads/master d9b25a66f -> 00dc16db3


[HELIX-489] Convert rabbitmq to use new API, rb=25927


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/00dc16db
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/00dc16db
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/00dc16db

Branch: refs/heads/master
Commit: 00dc16db36c90fea005fd5ff7fe2122c6ab283c3
Parents: d9b25a6
Author: zzhang <zzhang@apache.org>
Authored: Tue Sep 23 10:01:16 2014 -0700
Committer: zzhang <zzhang@apache.org>
Committed: Tue Sep 23 10:01:16 2014 -0700

----------------------------------------------------------------------
 .../apache/helix/recipes/rabbitmq/Consumer.java |  45 ++++----
 .../recipes/rabbitmq/ConsumerStateModel.java    | 105 ------------------
 .../ConsumerStateTransitionHandlerFactory.java  |  40 -------
 .../helix/recipes/rabbitmq/ConsumerThread.java  |  11 +-
 .../rabbitmq/ConsumerTransitionHandler.java     | 107 +++++++++++++++++++
 .../ConsumerTransitionHandlerFactory.java       |  41 +++++++
 6 files changed, 181 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/00dc16db/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java
b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java
index de56171..e92cb30 100644
--- a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java
@@ -21,43 +21,45 @@ package org.apache.helix.recipes.rabbitmq;
 
 import java.util.List;
 
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixParticipant;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.manager.zk.ZkHelixConnection;
 
 public class Consumer {
   private final String _zkAddr;
-  private final String _clusterName;
-  private final String _consumerId;
+  private final ClusterId _clusterId;
+  private final ParticipantId _consumerId;
   private final String _mqServer;
-  private HelixManager _manager = null;
+  HelixConnection _connection;
+  private HelixParticipant _participant = null;
 
-  public Consumer(String zkAddr, String clusterName, String consumerId, String mqServer)
{
+  public Consumer(String zkAddr, ClusterId clusterId, ParticipantId consumerId, String mqServer)
{
     _zkAddr = zkAddr;
-    _clusterName = clusterName;
+    _clusterId = clusterId;
     _consumerId = consumerId;
     _mqServer = mqServer;
   }
 
   public void connect() {
     try {
-      _manager =
-          HelixManagerFactory.getZKHelixManager(_clusterName, _consumerId,
-              InstanceType.PARTICIPANT, _zkAddr);
+      _connection = new ZkHelixConnection(_zkAddr);
+      _connection.connect();
+      _participant = _connection.createParticipant(_clusterId, _consumerId);
 
-      StateMachineEngine stateMach = _manager.getStateMachineEngine();
-      ConsumerStateTransitionHandlerFactory transitionHandlerFactory =
-          new ConsumerStateTransitionHandlerFactory(_consumerId, _mqServer);
+      StateMachineEngine stateMach = _participant.getStateMachineEngine();
+      ConsumerTransitionHandlerFactory transitionHandlerFactory =
+          new ConsumerTransitionHandlerFactory(_consumerId, _mqServer);
       stateMach.registerStateModelFactory(
           StateModelDefId.from(SetupConsumerCluster.DEFAULT_STATE_MODEL), transitionHandlerFactory);
-
-      _manager.connect();
+      _participant.start();
 
       Thread.currentThread().join();
     } catch (InterruptedException e) {
@@ -71,8 +73,12 @@ public class Consumer {
   }
 
   public void disconnect() {
-    if (_manager != null) {
-      _manager.disconnect();
+    if (_participant != null) {
+      _participant.stop();
+    }
+
+    if (_connection != null) {
+      _connection.disconnect();
     }
   }
 
@@ -106,7 +112,8 @@ public class Consumer {
 
       // start consumer
       final Consumer consumer =
-          new Consumer(zkAddr, clusterName, "consumer_" + consumerId, mqServer);
+          new Consumer(zkAddr, ClusterId.from(clusterName), ParticipantId.from("consumer_"
+              + consumerId), mqServer);
 
       Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/00dc16db/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModel.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModel.java
b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModel.java
deleted file mode 100644
index b41ccf7..0000000
--- a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateModel.java
+++ /dev/null
@@ -1,105 +0,0 @@
-package org.apache.helix.recipes.rabbitmq;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.log4j.Logger;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.api.TransitionHandler;
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModelInfo;
-import org.apache.helix.participant.statemachine.Transition;
-
-@StateModelInfo(initialState = "OFFLINE", states = {
-    "ONLINE", "ERROR"
-})
-public class ConsumerStateModel extends TransitionHandler {
-  private static Logger LOG = Logger.getLogger(ConsumerStateModel.class);
-
-  private final String _consumerId;
-  private final String _partition;
-
-  private final String _mqServer;
-  private ConsumerThread _thread = null;
-
-  public ConsumerStateModel(String consumerId, String partition, String mqServer) {
-    _partition = partition;
-    _consumerId = consumerId;
-    _mqServer = mqServer;
-  }
-
-  @Transition(to = "ONLINE", from = "OFFLINE")
-  public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
-    LOG.debug(_consumerId + " becomes ONLINE from OFFLINE for " + _partition);
-
-    if (_thread == null) {
-      LOG.debug("Starting ConsumerThread for " + _partition + "...");
-      _thread = new ConsumerThread(_partition, _mqServer, _consumerId);
-      _thread.start();
-      LOG.debug("Starting ConsumerThread for " + _partition + " done");
-
-    }
-  }
-
-  @Transition(to = "OFFLINE", from = "ONLINE")
-  public void onBecomeOfflineFromOnline(Message message, NotificationContext context)
-      throws InterruptedException {
-    LOG.debug(_consumerId + " becomes OFFLINE from ONLINE for " + _partition);
-
-    if (_thread != null) {
-      LOG.debug("Stopping " + _consumerId + " for " + _partition + "...");
-
-      _thread.interrupt();
-      _thread.join(2000);
-      _thread = null;
-      LOG.debug("Stopping " + _consumerId + " for " + _partition + " done");
-
-    }
-  }
-
-  @Transition(to = "DROPPED", from = "OFFLINE")
-  public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
-    LOG.debug(_consumerId + " becomes DROPPED from OFFLINE for " + _partition);
-  }
-
-  @Transition(to = "OFFLINE", from = "ERROR")
-  public void onBecomeOfflineFromError(Message message, NotificationContext context) {
-    LOG.debug(_consumerId + " becomes OFFLINE from ERROR for " + _partition);
-  }
-
-  @Override
-  public void reset() {
-    LOG.warn("Default reset() invoked");
-
-    if (_thread != null) {
-      LOG.debug("Stopping " + _consumerId + " for " + _partition + "...");
-
-      _thread.interrupt();
-      try {
-        _thread.join(2000);
-      } catch (InterruptedException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      }
-      _thread = null;
-      LOG.debug("Stopping " + _consumerId + " for " + _partition + " done");
-
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/00dc16db/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateTransitionHandlerFactory.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateTransitionHandlerFactory.java
b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateTransitionHandlerFactory.java
deleted file mode 100644
index a0d11f8..0000000
--- a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerStateTransitionHandlerFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package org.apache.helix.recipes.rabbitmq;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.api.StateTransitionHandlerFactory;
-import org.apache.helix.api.id.PartitionId;
-
-public class ConsumerStateTransitionHandlerFactory extends StateTransitionHandlerFactory<ConsumerStateModel>
{
-  private final String _consumerId;
-  private final String _mqServer;
-
-  public ConsumerStateTransitionHandlerFactory(String consumerId, String msServer) {
-    _consumerId = consumerId;
-    _mqServer = msServer;
-  }
-
-  @Override
-  public ConsumerStateModel createStateTransitionHandler(PartitionId partition) {
-    ConsumerStateModel model =
-        new ConsumerStateModel(_consumerId, partition.stringify(), _mqServer);
-    return model;
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/00dc16db/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerThread.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerThread.java
b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerThread.java
index ddd466c..d34fc6f 100644
--- a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerThread.java
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerThread.java
@@ -21,6 +21,9 @@ package org.apache.helix.recipes.rabbitmq;
 
 import java.io.IOException;
 
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
@@ -28,11 +31,11 @@ import com.rabbitmq.client.QueueingConsumer;
 
 public class ConsumerThread extends Thread {
   private static final String EXCHANGE_NAME = "topic_logs";
-  private final String _partition;
+  private final PartitionId _partition;
   private final String _mqServer;
-  private final String _consumerId;
+  private final ParticipantId _consumerId;
 
-  public ConsumerThread(String partition, String mqServer, String consumerId) {
+  public ConsumerThread(PartitionId partition, String mqServer, ParticipantId consumerId)
{
     _partition = partition;
     _mqServer = mqServer;
     _consumerId = consumerId;
@@ -50,7 +53,7 @@ public class ConsumerThread extends Thread {
       channel.exchangeDeclare(EXCHANGE_NAME, "topic");
       String queueName = channel.queueDeclare().getQueue();
 
-      String bindingKey = _partition;
+      String bindingKey = _partition.toString();
       channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
 
       System.out.println(" [*] " + _consumerId + " Waiting for messages on " + bindingKey

http://git-wip-us.apache.org/repos/asf/helix/blob/00dc16db/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerTransitionHandler.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerTransitionHandler.java
b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerTransitionHandler.java
new file mode 100644
index 0000000..8662bc7
--- /dev/null
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerTransitionHandler.java
@@ -0,0 +1,107 @@
+package org.apache.helix.recipes.rabbitmq;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.log4j.Logger;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+
+@StateModelInfo(initialState = "OFFLINE", states = {
+    "ONLINE", "ERROR"
+})
+public class ConsumerTransitionHandler extends TransitionHandler {
+  private static Logger LOG = Logger.getLogger(ConsumerTransitionHandler.class);
+
+  private final ParticipantId _consumerId;
+  private final PartitionId _partition;
+
+  private final String _mqServer;
+  private ConsumerThread _thread = null;
+
+  public ConsumerTransitionHandler(ParticipantId consumerId, PartitionId partition, String
mqServer) {
+    _partition = partition;
+    _consumerId = consumerId;
+    _mqServer = mqServer;
+  }
+
+  @Transition(to = "ONLINE", from = "OFFLINE")
+  public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+    LOG.debug(_consumerId + " becomes ONLINE from OFFLINE for " + _partition);
+
+    if (_thread == null) {
+      LOG.debug("Starting ConsumerThread for " + _partition + "...");
+      _thread = new ConsumerThread(_partition, _mqServer, _consumerId);
+      _thread.start();
+      LOG.debug("Starting ConsumerThread for " + _partition + " done");
+
+    }
+  }
+
+  @Transition(to = "OFFLINE", from = "ONLINE")
+  public void onBecomeOfflineFromOnline(Message message, NotificationContext context)
+      throws InterruptedException {
+    LOG.debug(_consumerId + " becomes OFFLINE from ONLINE for " + _partition);
+
+    if (_thread != null) {
+      LOG.debug("Stopping " + _consumerId + " for " + _partition + "...");
+
+      _thread.interrupt();
+      _thread.join(2000);
+      _thread = null;
+      LOG.debug("Stopping " + _consumerId + " for " + _partition + " done");
+
+    }
+  }
+
+  @Transition(to = "DROPPED", from = "OFFLINE")
+  public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
+    LOG.debug(_consumerId + " becomes DROPPED from OFFLINE for " + _partition);
+  }
+
+  @Transition(to = "OFFLINE", from = "ERROR")
+  public void onBecomeOfflineFromError(Message message, NotificationContext context) {
+    LOG.debug(_consumerId + " becomes OFFLINE from ERROR for " + _partition);
+  }
+
+  @Override
+  public void reset() {
+    LOG.warn("Default reset() invoked");
+
+    if (_thread != null) {
+      LOG.debug("Stopping " + _consumerId + " for " + _partition + "...");
+
+      _thread.interrupt();
+      try {
+        _thread.join(2000);
+      } catch (InterruptedException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+      _thread = null;
+      LOG.debug("Stopping " + _consumerId + " for " + _partition + " done");
+
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/00dc16db/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerTransitionHandlerFactory.java
----------------------------------------------------------------------
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerTransitionHandlerFactory.java
b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerTransitionHandlerFactory.java
new file mode 100644
index 0000000..a040a9c
--- /dev/null
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/ConsumerTransitionHandlerFactory.java
@@ -0,0 +1,41 @@
+package org.apache.helix.recipes.rabbitmq;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+
+public class ConsumerTransitionHandlerFactory extends StateTransitionHandlerFactory<ConsumerTransitionHandler>
{
+  private final ParticipantId _consumerId;
+  private final String _mqServer;
+
+  public ConsumerTransitionHandlerFactory(ParticipantId consumerId, String msServer) {
+    _consumerId = consumerId;
+    _mqServer = msServer;
+  }
+
+  @Override
+  public ConsumerTransitionHandler createStateTransitionHandler(PartitionId partition) {
+    ConsumerTransitionHandler model =
+        new ConsumerTransitionHandler(_consumerId, partition, _mqServer);
+    return model;
+  }
+}


Mime
View raw message