hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r987314 [14/16] - in /hadoop/zookeeper/trunk: ./ src/contrib/hedwig/ src/contrib/hedwig/client/ src/contrib/hedwig/client/src/ src/contrib/hedwig/client/src/main/ src/contrib/hedwig/client/src/main/cpp/ src/contrib/hedwig/client/src/main/cp...
Date Thu, 19 Aug 2010 21:25:22 GMT
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.topics;
+
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.ConcurrencyUtils;
+import org.apache.hedwig.util.Either;
+import org.apache.hedwig.util.HedwigSocketAddress;
+import org.apache.hedwig.zookeeper.SafeAsyncZKCallback;
+import org.apache.hedwig.zookeeper.ZkUtils;
+import org.apache.hedwig.zookeeper.SafeAsyncZKCallback.DataCallback;
+import org.apache.hedwig.zookeeper.SafeAsyncZKCallback.StatCallback;
+
+/**
+ * TopicManager that mediates topic ownership through ZooKeeper. Each hub
+ * registers an ephemeral node advertising its load under the hosts prefix;
+ * ownership of a topic is an ephemeral node naming the owning hub. Topics
+ * are operated on in parallel as they are independent.
+ */
+public class ZkTopicManager extends AbstractTopicManager implements TopicManager {
+
+    static Logger logger = Logger.getLogger(ZkTopicManager.class);
+
+    // Breaks ties randomly between hubs reporting identical load.
+    Random rand = new Random();
+
+    /**
+     * Persistent storage for topic metadata.
+     */
+    private ZooKeeper zk;
+
+    // Path of this hub's ephemeral registration/load node.
+    String ephemeralNodePath;
+
+    // Callback for asynchronous load reports; failures are only logged
+    // because the next report overwrites the data anyway.
+    StatCallback loadReportingStatCallback = new StatCallback() {
+        @Override
+        public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
+            if (rc != KeeperException.Code.OK.intValue()) {
+                logger.warn("Failed to update load information in zk");
+            }
+        }
+    };
+
+    // Boolean flag indicating if we should suspend activity. If this is true,
+    // all of the Ops put into the queuer will fail automatically.
+    protected volatile boolean isSuspended = false;
+
+    /**
+     * Create a new topic manager. Pass in an active ZooKeeper client object.
+     *
+     * @param zk
+     *            live ZooKeeper client used for all metadata operations
+     * @param cfg
+     *            server configuration supplying the ZK path layout
+     * @param scheduler
+     *            executor handed to the base class
+     * @throws UnknownHostException
+     *             if the local hub address cannot be determined
+     * @throws PubSubException
+     *             if this hub cannot be registered with ZooKeeper
+     */
+    public ZkTopicManager(final ZooKeeper zk, final ServerConfiguration cfg, ScheduledExecutorService scheduler)
+            throws UnknownHostException, PubSubException {
+
+        super(cfg, scheduler);
+        this.zk = zk;
+        this.ephemeralNodePath = cfg.getZkHostsPrefix(new StringBuilder()).append("/").append(addr).toString();
+
+        zk.register(new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                // Connection-state changes are delivered with an event type of
+                // None, so every KeeperState must be examined inside this
+                // branch. (Previously the Expired/SyncConnected checks sat on
+                // the outer else-if chain, where they could never fire for
+                // session events.)
+                if (event.getType().equals(Watcher.Event.EventType.None)) {
+                    if (event.getState().equals(Watcher.Event.KeeperState.Disconnected)) {
+                        logger.warn("ZK client has been disconnected to the ZK server!");
+                        isSuspended = true;
+                    } else if (event.getState().equals(Watcher.Event.KeeperState.Expired)) {
+                        // Session expiry destroyed our ephemeral nodes; we can
+                        // no longer claim topics safely, so terminate.
+                        logger.error("ZK client connection to the ZK server has expired!");
+                        System.exit(1);
+                    } else if (event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
+                        isSuspended = false;
+                    }
+                }
+            }
+        });
+
+        // Registration is asynchronous; rendezvous on a SynchronousQueue so
+        // that a registration failure surfaces as a constructor exception.
+        final SynchronousQueue<Either<Void, PubSubException>> queue = new SynchronousQueue<Either<Void, PubSubException>>();
+
+        registerWithZookeeper(new Callback<Void>() {
+            @Override
+            public void operationFailed(Object ctx, PubSubException exception) {
+                logger.error("Failed to register hub with zookeeper", exception);
+                ConcurrencyUtils.put(queue, Either.of((Void) null, exception));
+            }
+
+            @Override
+            public void operationFinished(Object ctx, Void resultOfOperation) {
+                logger.info("Successfully registered hub with zookeeper");
+                ConcurrencyUtils.put(queue, Either.of(resultOfOperation, (PubSubException) null));
+            }
+        }, null);
+
+        PubSubException pse = ConcurrencyUtils.take(queue).right();
+
+        if (pse != null) {
+            throw pse;
+        }
+    }
+
+    /**
+     * Creates this hub's ephemeral node (carrying current load data) under the
+     * hosts prefix; if a stale node from a previous incarnation of this hub is
+     * still present, deletes it and retries.
+     *
+     * @param callback
+     *            invoked with success or a ServiceDownException
+     * @param ctx
+     *            opaque context handed back to the callback
+     */
+    void registerWithZookeeper(final Callback<Void> callback, Object ctx) {
+
+        ZkUtils.createFullPathOptimistic(zk, ephemeralNodePath, getCurrentLoadData(), Ids.OPEN_ACL_UNSAFE,
+                CreateMode.EPHEMERAL, new SafeAsyncZKCallback.StringCallback() {
+
+                    @Override
+                    public void safeProcessResult(int rc, String path, Object ctx, String name) {
+                        if (rc == Code.OK.intValue()) {
+                            callback.operationFinished(ctx, null);
+                            return;
+                        }
+                        if (rc != Code.NODEEXISTS.intValue()) {
+                            KeeperException ke = ZkUtils.logErrorAndCreateZKException(
+                                    "Could not create ephemeral node to register hub", ephemeralNodePath, rc);
+                            callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+                            return;
+                        }
+
+                        logger.info("Found stale ephemeral node while registering hub with ZK, deleting it");
+
+                        // Node exists, lets try to delete it and retry
+                        zk.delete(ephemeralNodePath, -1, new SafeAsyncZKCallback.VoidCallback() {
+                            @Override
+                            public void safeProcessResult(int rc, String path, Object ctx) {
+                                if (rc == Code.OK.intValue() || rc == Code.NONODE.intValue()) {
+                                    registerWithZookeeper(callback, ctx);
+                                    return;
+                                }
+                                KeeperException ke = ZkUtils.logErrorAndCreateZKException(
+                                        "Could not delete stale ephemeral node to register hub", ephemeralNodePath, rc);
+                                callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
+                                return;
+
+                            }
+                        }, ctx);
+
+                    }
+                }, ctx); // thread the caller's ctx through (it was silently dropped before)
+    }
+
+    /** Path of the ephemeral ownership node ("/hub") for the given topic. */
+    String hubPath(ByteString topic) {
+        return cfg.getZkTopicPath(new StringBuilder(), topic).append("/hub").toString();
+    }
+
+    @Override
+    protected void realGetOwner(final ByteString topic, final boolean shouldClaim,
+            final Callback<HedwigSocketAddress> cb, final Object ctx) {
+        // If operations are suspended due to a ZK client disconnect, just error
+        // out this call and return.
+        if (isSuspended) {
+            cb.operationFailed(ctx, new PubSubException.ServiceDownException(
+                    "ZKTopicManager service is temporarily suspended!"));
+            return;
+        }
+
+        // Fast path: we already own this topic locally.
+        if (topics.contains(topic)) {
+            cb.operationFinished(ctx, addr);
+            return;
+        }
+
+        new ZkGetOwnerOp(topic, shouldClaim, cb, ctx).read();
+    }
+
+    // State machine for resolving (and possibly claiming) the owner of one
+    // topic. The methods recursively call each other: read -> claimOrChoose
+    // -> choose -> chooseLeastLoadedNode -> claim -> read ...
+    class ZkGetOwnerOp {
+        ByteString topic;
+        boolean shouldClaim;
+        Callback<HedwigSocketAddress> cb;
+        Object ctx;
+        String hubPath;
+
+        public ZkGetOwnerOp(ByteString topic, boolean shouldClaim, Callback<HedwigSocketAddress> cb, Object ctx) {
+            this.topic = topic;
+            this.shouldClaim = shouldClaim;
+            this.cb = cb;
+            this.ctx = ctx;
+            hubPath = hubPath(topic);
+
+        }
+
+        // Fetch the list of registered hubs so a least-loaded one can be
+        // selected as the owner.
+        public void choose() {
+            // Get the list of existing hosts
+            String registeredHostsPath = cfg.getZkHostsPrefix(new StringBuilder()).toString();
+            zk.getChildren(registeredHostsPath, false, new SafeAsyncZKCallback.ChildrenCallback() {
+                @Override
+                public void safeProcessResult(int rc, String path, Object ctx, List<String> children) {
+                    if (rc != Code.OK.intValue()) {
+                        KeeperException e = ZkUtils.logErrorAndCreateZKException(
+                                "Could not get list of available hubs", path, rc);
+                        cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
+                        return;
+                    }
+                    chooseLeastLoadedNode(children);
+                }
+            }, null);
+        }
+
+        // Pick the hub with the smallest advertised load (random tie-break);
+        // claim locally if we win, otherwise report the winner.
+        public void chooseLeastLoadedNode(final List<String> children) {
+            // Guard: with zero candidate hubs no data callback below would
+            // ever fire and this operation would hang without a response.
+            if (children.isEmpty()) {
+                cb.operationFailed(ctx, new PubSubException.ServiceDownException("No hub available"));
+                return;
+            }
+
+            // Shared across all getData responses; mutable state is guarded
+            // by synchronizing on the callback instance itself.
+            DataCallback dataCallback = new DataCallback() {
+                int numResponses = 0;
+                int minLoad = Integer.MAX_VALUE;
+                String leastLoaded = null;
+
+                @Override
+                public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+                    synchronized (this) {
+                        if (rc == KeeperException.Code.OK.intValue()) {
+                            try {
+                                int load = Integer.parseInt(new String(data));
+                                if (logger.isDebugEnabled()) {
+                                    logger.debug("Found server: " + ctx + " with load: " + load);
+                                }
+                                // Random tie-break spreads topics evenly when
+                                // several hubs report equal load.
+                                if (load < minLoad || (load == minLoad && rand.nextBoolean())) {
+                                    minLoad = load;
+                                    leastLoaded = (String) ctx;
+                                }
+                            } catch (NumberFormatException e) {
+                                logger.warn("Corrupted load information from hub:" + ctx);
+                                // some corrupted data, we'll just ignore this
+                                // hub
+                            }
+                        }
+                        numResponses++;
+
+                        // Decide once every child has answered; errors count
+                        // as answers so we never stall waiting for them.
+                        if (numResponses == children.size()) {
+                            if (leastLoaded == null) {
+                                cb.operationFailed(ZkGetOwnerOp.this.ctx, new PubSubException.ServiceDownException(
+                                        "No hub available"));
+                                return;
+                            }
+                            HedwigSocketAddress owner = new HedwigSocketAddress(leastLoaded);
+                            if (owner.equals(addr)) {
+                                claim();
+                            } else {
+                                cb.operationFinished(ZkGetOwnerOp.this.ctx, owner);
+                            }
+                        }
+                    }
+
+                }
+            };
+
+            for (String child : children) {
+                zk.getData(cfg.getZkHostsPrefix(new StringBuilder()).append("/").append(child).toString(), false,
+                        dataCallback, child);
+            }
+        }
+
+        public void claimOrChoose() {
+            if (shouldClaim)
+                claim();
+            else
+                choose();
+        }
+
+        // Read the current owner node: claim/choose if unowned, clean up a
+        // stale self-node, or hand the remote owner back to the callback.
+        public void read() {
+            zk.getData(hubPath, false, new SafeAsyncZKCallback.DataCallback() {
+                @Override
+                public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+
+                    if (rc == Code.NONODE.intValue()) {
+                        claimOrChoose();
+                        return;
+                    }
+
+                    if (rc != Code.OK.intValue()) {
+                        KeeperException e = ZkUtils.logErrorAndCreateZKException("Could not read ownership for topic: "
+                                + topic.toStringUtf8(), path, rc);
+                        cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
+                        return;
+                    }
+
+                    // successfully did a read
+                    HedwigSocketAddress owner = new HedwigSocketAddress(new String(data));
+                    if (!owner.equals(addr)) {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("topic: " + topic.toStringUtf8() + " belongs to someone else: " + owner);
+                        }
+                        cb.operationFinished(ctx, owner);
+                        return;
+                    }
+
+                    logger.info("Discovered stale self-node for topic: " + topic.toStringUtf8() + ", will delete it");
+
+                    // we must have previously failed and left a
+                    // residual ephemeral node here, so we must
+                    // delete it (clean it up) and then
+                    // re-create/re-acquire the topic.
+                    // Use the exception-safe wrapper: the raw VoidCallback
+                    // previously used here bypassed the uncaught-exception
+                    // handling every other callback in this class gets.
+                    zk.delete(hubPath, stat.getVersion(), new SafeAsyncZKCallback.VoidCallback() {
+                        @Override
+                        public void safeProcessResult(int rc, String path, Object ctx) {
+                            if (Code.OK.intValue() == rc || Code.NONODE.intValue() == rc) {
+                                claimOrChoose();
+                            } else {
+                                KeeperException e = ZkUtils.logErrorAndCreateZKException(
+                                        "Could not delete self node for topic: " + topic.toStringUtf8(), path, rc);
+                                cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
+                            }
+                        }
+                    }, ctx);
+                }
+            }, ctx);
+        }
+
+        // Attempt to become the owner by creating the ephemeral ownership
+        // node; on NODEEXISTS, fall back to read() to learn who won the race.
+        public void claim() {
+            if (logger.isDebugEnabled()) {
+                logger.debug("claiming topic: " + topic.toStringUtf8());
+            }
+
+            ZkUtils.createFullPathOptimistic(zk, hubPath, addr.toString().getBytes(), Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.EPHEMERAL, new SafeAsyncZKCallback.StringCallback() {
+
+                        @Override
+                        public void safeProcessResult(int rc, String path, Object ctx, String name) {
+                            if (rc == Code.OK.intValue()) {
+                                if (logger.isDebugEnabled()) {
+                                    logger.debug("claimed topic: " + topic.toStringUtf8());
+                                }
+                                notifyListenersAndAddToOwnedTopics(topic, cb, ctx);
+                                updateLoadInformation();
+                            } else if (rc == Code.NODEEXISTS.intValue()) {
+                                read();
+                            } else {
+                                KeeperException e = ZkUtils.logErrorAndCreateZKException(
+                                        "Failed to create ephemeral node to claim ownership of topic: "
+                                                + topic.toStringUtf8(), path, rc);
+                                cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
+                            }
+                        }
+                    }, ctx);
+        }
+
+    }
+
+    /** Load datum written to our ephemeral registration node. */
+    byte[] getCurrentLoadData() {
+        // For now, using the number of topics as an indicator of load
+        // information
+        return (topics.size() + "").getBytes();
+    }
+
+    // Asynchronously refresh the load advertised on our ephemeral node.
+    void updateLoadInformation() {
+        byte[] currentLoad = getCurrentLoadData();
+        if (logger.isDebugEnabled()) {
+            logger.debug("Reporting load of " + new String(currentLoad));
+        }
+        zk.setData(ephemeralNodePath, currentLoad, -1, loadReportingStatCallback, null);
+    }
+
+    @Override
+    protected void postReleaseCleanup(final ByteString topic, final Callback<Void> cb, Object ctx) {
+
+        zk.getData(hubPath(topic), false, new SafeAsyncZKCallback.DataCallback() {
+            @Override
+            public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+                if (rc == Code.NONODE.intValue()) {
+                    // Node has somehow disappeared from under us, live with it
+                    // since its a transient node
+                    logger.warn("While deleting self-node for topic: " + topic.toStringUtf8() + ", node not found");
+                    cb.operationFinished(ctx, null);
+                    return;
+                }
+
+                if (rc != Code.OK.intValue()) {
+                    KeeperException e = ZkUtils.logErrorAndCreateZKException(
+                            "Failed to delete self-ownership node for topic: " + topic.toStringUtf8(), path, rc);
+                    cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
+                    return;
+                }
+
+                HedwigSocketAddress owner = new HedwigSocketAddress(new String(data));
+                if (!owner.equals(addr)) {
+                    logger.warn("Wanted to delete self-node for topic: " + topic.toStringUtf8() + " but node for "
+                            + owner + " found, leaving untouched");
+                    // Not our node, someone else's, leave it alone
+                    cb.operationFinished(ctx, null);
+                    return;
+                }
+
+                // Version-checked delete: only remove the node if nobody has
+                // re-claimed it since our read above.
+                zk.delete(path, stat.getVersion(), new SafeAsyncZKCallback.VoidCallback() {
+                    @Override
+                    public void safeProcessResult(int rc, String path, Object ctx) {
+                        if (rc != Code.OK.intValue() && rc != Code.NONODE.intValue()) {
+                            KeeperException e = ZkUtils
+                                    .logErrorAndCreateZKException("Failed to delete self-ownership node for topic: "
+                                            + topic.toStringUtf8(), path, rc);
+                            cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
+                            return;
+                        }
+
+                        cb.operationFinished(ctx, null);
+                    }
+                }, ctx);
+            }
+        }, ctx);
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/zookeeper/SafeAsynBKCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/zookeeper/SafeAsynBKCallback.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/zookeeper/SafeAsynBKCallback.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/zookeeper/SafeAsynBKCallback.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.zookeeper;
+
+import java.util.Enumeration;
+
+import org.apache.bookkeeper.client.AsyncCallback;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+
+
+/**
+ * Wrappers for the BookKeeper asynchronous callbacks that route any
+ * Throwable escaping the user-supplied handler to the shared
+ * uncaught-exception handler, instead of letting it vanish on the
+ * callback thread. Subclasses implement the {@code safe*} variant.
+ */
+public class SafeAsynBKCallback extends SafeAsyncCallback {
+
+    public static abstract class OpenCallback implements AsyncCallback.OpenCallback {
+        @Override
+        public void openComplete(int rc, LedgerHandle ledgerHandle, Object ctx) {
+            try {
+                safeOpenComplete(rc, ledgerHandle, ctx);
+            } catch (Throwable t) {
+                invokeUncaughtExceptionHandler(t);
+            }
+        }
+
+        /** Implement this instead of {@link #openComplete}; throwables are handled. */
+        public abstract void safeOpenComplete(int rc, LedgerHandle ledgerHandle, Object ctx);
+    }
+
+    public static abstract class CloseCallback implements AsyncCallback.CloseCallback {
+        @Override
+        public void closeComplete(int rc, LedgerHandle ledgerHandle, Object ctx) {
+            try {
+                safeCloseComplete(rc, ledgerHandle, ctx);
+            } catch (Throwable t) {
+                invokeUncaughtExceptionHandler(t);
+            }
+        }
+
+        /** Implement this instead of {@link #closeComplete}; throwables are handled. */
+        public abstract void safeCloseComplete(int rc, LedgerHandle ledgerHandle, Object ctx);
+    }
+
+    public static abstract class ReadCallback implements AsyncCallback.ReadCallback {
+        @Override
+        public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
+            try {
+                safeReadComplete(rc, lh, seq, ctx);
+            } catch (Throwable t) {
+                invokeUncaughtExceptionHandler(t);
+            }
+        }
+
+        /** Implement this instead of {@link #readComplete}; throwables are handled. */
+        public abstract void safeReadComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx);
+    }
+
+    public static abstract class CreateCallback implements AsyncCallback.CreateCallback {
+        @Override
+        public void createComplete(int rc, LedgerHandle lh, Object ctx) {
+            try {
+                safeCreateComplete(rc, lh, ctx);
+            } catch (Throwable t) {
+                invokeUncaughtExceptionHandler(t);
+            }
+        }
+
+        /** Implement this instead of {@link #createComplete}; throwables are handled. */
+        public abstract void safeCreateComplete(int rc, LedgerHandle lh, Object ctx);
+    }
+
+    public static abstract class AddCallback implements AsyncCallback.AddCallback {
+        @Override
+        public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+            try {
+                safeAddComplete(rc, lh, entryId, ctx);
+            } catch (Throwable t) {
+                invokeUncaughtExceptionHandler(t);
+            }
+        }
+
+        /** Implement this instead of {@link #addComplete}; throwables are handled. */
+        public abstract void safeAddComplete(int rc, LedgerHandle lh, long entryId, Object ctx);
+    }
+
+}
+

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncCallback.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncCallback.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncCallback.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.zookeeper;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+
+import org.apache.hedwig.server.common.TerminateJVMExceptionHandler;
+
+/**
+ * Base class for the "safe" async callback wrappers: holds the process-wide
+ * handler that receives any Throwable escaping a callback body.
+ */
+public class SafeAsyncCallback {
+    // volatile: the setter may run on a different thread than the callback
+    // threads that read this field, and reads must see the latest handler.
+    protected static volatile UncaughtExceptionHandler uncaughtExceptionHandler = new TerminateJVMExceptionHandler();
+
+    /**
+     * Replaces the process-wide handler invoked when a callback throws.
+     *
+     * @param uncaughtExceptionHandler
+     *            handler to install for all safe callbacks
+     */
+    public static void setUncaughtExceptionHandler(UncaughtExceptionHandler uncaughtExceptionHandler) {
+        SafeAsyncCallback.uncaughtExceptionHandler = uncaughtExceptionHandler;
+    }
+
+    /** Routes a Throwable from a callback body to the installed handler. */
+    static void invokeUncaughtExceptionHandler(Throwable t) {
+        Thread thread = Thread.currentThread();
+        uncaughtExceptionHandler.uncaughtException(thread, t);
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncZKCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncZKCallback.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncZKCallback.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/zookeeper/SafeAsyncZKCallback.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.zookeeper;
+
+import java.util.List;
+
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Wrappers for the ZooKeeper asynchronous callbacks that route any Throwable
+ * escaping the user-supplied handler to the shared uncaught-exception
+ * handler, instead of letting it vanish on the ZK event thread. Subclasses
+ * implement the {@code safeProcessResult} variant.
+ */
+public class SafeAsyncZKCallback extends SafeAsyncCallback {
+    public static abstract class StatCallback implements AsyncCallback.StatCallback {
+        @Override
+        public void processResult(int rc, String path, Object ctx, Stat stat) {
+            try {
+                safeProcessResult(rc, path, ctx, stat);
+            } catch (Throwable t) {
+                invokeUncaughtExceptionHandler(t);
+            }
+        }
+
+        /** Implement this instead of {@link #processResult}; throwables are handled. */
+        public abstract void safeProcessResult(int rc, String path, Object ctx, Stat stat);
+    }
+
+    public static abstract class DataCallback implements AsyncCallback.DataCallback {
+        @Override
+        public void processResult(int rc, String path, Object ctx, byte data[], Stat stat) {
+            try {
+                safeProcessResult(rc, path, ctx, data, stat);
+            } catch (Throwable t) {
+                invokeUncaughtExceptionHandler(t);
+            }
+        }
+
+        /** Implement this instead of {@link #processResult}; throwables are handled. */
+        public abstract void safeProcessResult(int rc, String path, Object ctx, byte data[], Stat stat);
+    }
+
+    public static abstract class ACLCallback implements AsyncCallback.ACLCallback {
+        @Override
+        public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) {
+            try {
+                safeProcessResult(rc, path, ctx, acl, stat);
+            } catch (Throwable t) {
+                invokeUncaughtExceptionHandler(t);
+            }
+        }
+
+        /** Implement this instead of {@link #processResult}; throwables are handled. */
+        public abstract void safeProcessResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat);
+    }
+
+    public static abstract class ChildrenCallback implements AsyncCallback.ChildrenCallback {
+        @Override
+        public void processResult(int rc, String path, Object ctx, List<String> children) {
+            try {
+                safeProcessResult(rc, path, ctx, children);
+            } catch (Throwable t) {
+                invokeUncaughtExceptionHandler(t);
+            }
+        }
+
+        /** Implement this instead of {@link #processResult}; throwables are handled. */
+        public abstract void safeProcessResult(int rc, String path, Object ctx, List<String> children);
+    }
+
+    public static abstract class StringCallback implements AsyncCallback.StringCallback {
+        @Override
+        public void processResult(int rc, String path, Object ctx, String name) {
+            try {
+                safeProcessResult(rc, path, ctx, name);
+            } catch (Throwable t) {
+                invokeUncaughtExceptionHandler(t);
+            }
+        }
+
+        /** Implement this instead of {@link #processResult}; throwables are handled. */
+        public abstract void safeProcessResult(int rc, String path, Object ctx, String name);
+    }
+
+    public static abstract class VoidCallback implements AsyncCallback.VoidCallback {
+        @Override
+        public void processResult(int rc, String path, Object ctx) {
+            try {
+                safeProcessResult(rc, path, ctx);
+            } catch (Throwable t) {
+                invokeUncaughtExceptionHandler(t);
+            }
+        }
+
+        /** Implement this instead of {@link #processResult}; throwables are handled. */
+        public abstract void safeProcessResult(int rc, String path, Object ctx);
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/zookeeper/ZkUtils.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.zookeeper;
+
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.data.ACL;
+
+import org.apache.hedwig.util.PathUtils;
+
+/**
+ * Static ZooKeeper convenience helpers used by the Hedwig server.
+ */
+public class ZkUtils {
+
+    static Logger logger = Logger.getLogger(ZkUtils.class);
+
+    /**
+     * Asynchronously creates originalPath, optimistically assuming its parents
+     * already exist. If the create fails with NONODE, the parent chain is
+     * created first (recursively, always PERSISTENT, since ephemeral nodes
+     * cannot have children) and the original create is retried. Any other
+     * result code - including success - is forwarded unchanged to the supplied
+     * callback.
+     */
+    public static void createFullPathOptimistic(final ZooKeeper zk, final String originalPath, final byte[] data,
+            final List<ACL> acl, final CreateMode createMode, final AsyncCallback.StringCallback callback,
+            final Object ctx) {
+
+        zk.create(originalPath, data, acl, createMode, new SafeAsyncZKCallback.StringCallback() {
+            @Override
+            public void safeProcessResult(int rc, String path, Object ctx, String name) {
+
+                // Anything except NONODE is the caller's business.
+                if (rc != Code.NONODE.intValue()) {
+                    callback.processResult(rc, path, ctx, name);
+                    return;
+                }
+
+                // Since I got a nonode, it means that my parents don't exist
+                // create mode is persistent since ephemeral nodes can't be
+                // parents
+                ZkUtils.createFullPathOptimistic(zk, PathUtils.parent(originalPath), new byte[0], acl,
+                        CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() {
+
+                            @Override
+                            public void safeProcessResult(int rc, String path, Object ctx, String name) {
+                                // NODEEXISTS is fine here: another client may
+                                // have created the parent concurrently.
+                                if (rc == Code.OK.intValue() || rc == Code.NODEEXISTS.intValue()) {
+                                    // succeeded in creating the parent, now
+                                    // create the original path
+                                    ZkUtils.createFullPathOptimistic(zk, originalPath, data, acl, createMode, callback,
+                                            ctx);
+                                } else {
+                                    callback.processResult(rc, path, ctx, name);
+                                }
+                            }
+                        }, ctx);
+            }
+        }, ctx);
+
+    }
+
+    /**
+     * Builds a KeeperException for the given result code and path, logs it
+     * together with the supplied message, and returns it for the caller to
+     * throw.
+     */
+    public static KeeperException logErrorAndCreateZKException(String msg, String path, int rc) {
+        KeeperException ke = KeeperException.create(Code.get(rc), path);
+        logger.error(msg + ",zkPath: " + path, ke);
+        return ke;
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/resources/p12.pass
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/resources/p12.pass?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/resources/p12.pass (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/resources/p12.pass Thu Aug 19 21:25:13 2010
@@ -0,0 +1 @@
+eUySvp2phM2Wk

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/HelperMethods.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/HelperMethods.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/HelperMethods.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/HelperMethods.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+
+/**
+ * Test helpers for generating and comparing Hedwig messages.
+ */
+public class HelperMethods {
+    // Shared RNG for message bodies and region selection.
+    static Random rand = new Random();
+
+    /** Random messages drawn from a fixed default set of three source regions. */
+    public static List<Message> getRandomPublishedMessages(int numMessages, int size) {
+        ByteString[] regions = { ByteString.copyFromUtf8("sp1"), ByteString.copyFromUtf8("re1"),
+                ByteString.copyFromUtf8("sg") };
+        return getRandomPublishedMessages(numMessages, size, regions);
+    }
+
+    /**
+     * Builds numMessages messages, each with a size-byte random body and a
+     * source region picked uniformly at random from the given array.
+     */
+    public static List<Message> getRandomPublishedMessages(int numMessages, int size, ByteString[] regions) {
+        List<Message> msgs = new ArrayList<Message>(numMessages);
+        for (int count = 0; count < numMessages; count++) {
+            byte[] body = new byte[size];
+            rand.nextBytes(body);
+            ByteString srcRegion = regions[rand.nextInt(regions.length)];
+            Message msg = Message.newBuilder().setBody(ByteString.copyFrom(body)).setSrcRegion(srcRegion).build();
+            msgs.add(msg);
+        }
+        return msgs;
+    }
+
+    /**
+     * Two messages are considered equal when their srcRegion presence (and
+     * value, if present) and their bodies match.
+     */
+    public static boolean areEqual(Message m1, Message m2) {
+        boolean bothHaveRegion = m1.hasSrcRegion() == m2.hasSrcRegion();
+        if (!bothHaveRegion) {
+            return false;
+        }
+        boolean regionMatches = !m1.hasSrcRegion() || m1.getSrcRegion().equals(m2.getSrcRegion());
+        return regionMatches && m1.getBody().equals(m2.getBody());
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/ServerControl.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/ServerControl.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/ServerControl.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/ServerControl.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig;
+
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.hedwig.server.netty.PubSubServer;
+import java.net.ConnectException;
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Harness for starting and stopping ZooKeeper, BookKeeper and Hedwig servers
+ * inside a single JVM; used by integration tests and by ServerControlDaemon.
+ */
+public class ServerControl {
+    public class TestException extends Exception {
+	public TestException(String str) {
+	    super(str);
+	}
+    };
+
+    /** Minimal handle on a started server: where it listens and how to stop it. */
+    public interface TestServer {
+	public String getAddress();
+	public void kill();
+    }
+
+    /** Starts a BookKeeper bookie on construction, using the given dirs. */
+    private class BookKeeperServer extends BookieServer implements TestServer {
+	private String address;
+
+	public BookKeeperServer(int port, TestServer zkserver, String journal, String ledger) throws IOException {
+	    super(port, zkserver.getAddress(), new File(journal), new File[] { new File(ledger) });
+	    
+	    address = "localhost:"+port;
+	    start();
+	} 
+
+	public String getAddress() {
+	    return address;
+	}
+	
+	public void kill() {
+	    try {
+		shutdown();
+	    } catch (Exception e) {
+		// NOTE(review): shutdown failures are deliberately swallowed
+		// (best-effort teardown), but logging them would aid debugging.
+	    }
+	}
+    }
+
+    /** Runs a standalone ZooKeeper server on a background thread. */
+    private class ZookeeperServer extends ZooKeeperServerMain implements TestServer {
+	public String address;
+	public Thread serverThread;
+	String path;
+	public ZookeeperServer(int port, String path) throws TestException {
+	    super(); 
+
+	    this.path = path;
+	    final String[] args = { Integer.toString(port), path};
+	    address = "localhost:" + port;
+	    serverThread = new Thread() {
+		    public void run() {
+			try {
+			    initializeAndRun(args);
+			} catch (Exception e) {
+			    // NOTE(review): swallowed; a failed ZK start only
+			    // shows up later when clients cannot connect.
+			}
+		    };
+		};
+	    serverThread.start();
+	}
+
+	public String getAddress() {
+	    return address;
+	}
+
+	public void kill() {
+	    shutdown();
+	    serverThread.interrupt();
+	}
+    }
+
+    /** Wraps a Hedwig PubSubServer configured against the given ZK server. */
+    private class HedwigServer implements TestServer {
+	private PubSubServer server;
+	private String address;
+
+	public HedwigServer(int port, String region, TestServer zk) throws TestException {
+	    // Local subclass exists only to inject port/zk/region into the config.
+	    class MyServerConfiguration extends ServerConfiguration {
+		MyServerConfiguration(int port, TestServer zk, String region) {
+		    conf.setProperty(ServerConfiguration.SERVER_PORT, port);
+		    conf.setProperty(ServerConfiguration.ZK_HOST, zk.getAddress());
+		    conf.setProperty(ServerConfiguration.REGION, region);
+		}
+	    };
+	    
+	    address = "localhost:" + port;
+	    
+	    try {
+		server = new PubSubServer(new MyServerConfiguration(port, zk, region));
+	    } catch (Exception e) {
+		// NOTE(review): wraps only e.toString(); the original stack
+		// trace is lost.
+		throw new TestException("Couldn't create pub sub server : " + e);
+	    }
+	}
+
+	public String getAddress() {
+	    return address;
+	}
+
+	public void kill() {
+	    server.shutdown();
+	}
+    }
+
+    /**
+     * Creates a unique temp directory and registers a shutdown hook that
+     * recursively deletes it when the JVM exits.
+     */
+    private String createTempDirectory(String suffix) throws IOException {
+	String dir = System.getProperty("java.io.tmpdir") + File.separator + System.currentTimeMillis() + suffix;
+	final File dirf = new File(dir);
+	boolean good = dirf.mkdir();
+	if (!good) {
+	    throw new IOException("Unable to create directory " + dir);
+	}
+	
+	Runtime.getRuntime().addShutdownHook(new Thread() {
+		// Depth-first delete: children before parent.
+		public void delete(File f) {
+		    File[] subfiles = f.listFiles();
+		    if (subfiles != null) {
+			for (File subf : subfiles) {
+			    delete(subf);
+			}
+		    }
+		    f.delete();
+		}
+
+		public void run() {
+		    delete(dirf);
+		}
+	    });
+	return dir;
+    }
+
+    /** Starts a ZooKeeper server on the given port with a fresh data dir. */
+    public TestServer startZookeeperServer(int port) throws IOException, TestException {
+	String dir = createTempDirectory("-zookeeper-" + port);
+	ZookeeperServer server =  new ZookeeperServer(port, dir);
+	
+	return server;
+    }
+    
+    /**
+     * Starts a bookie, first ensuring the /ledgers/available znode exists in
+     * ZooKeeper (retrying the connection up to 4 times, since the ZK server
+     * may still be coming up).
+     */
+    public TestServer startBookieServer(int port, TestServer zookeeperServer) throws IOException, TestException {
+	int tries = 4;
+	while (true) {
+	    try {
+		tries--;
+		ZooKeeper zk = new ZooKeeper(zookeeperServer.getAddress(), 1000, new Watcher() { public void process(WatchedEvent event) { /* do nothing */ } });
+		if (zk.exists("/ledgers/available", false) == null) {
+		    byte[] data = new byte[1];
+		    data[0] = 0;
+		    zk.create("/ledgers", data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+		    zk.create("/ledgers/available", data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+		}
+		zk.close();
+		break;
+	    } catch (KeeperException.ConnectionLossException ce) {
+		if (tries > 0) {
+		    try { 
+			// NOTE(review): sleeps only 3 *milliseconds* between
+			// retries; 3000 (3 seconds) was probably intended -
+			// confirm before changing.
+			Thread.sleep(3);
+		    } catch (Exception e) {
+			throw new TestException("Can't even sleep. Fix your machine: " + e);
+		    }
+		    continue;
+		} else {
+		    throw new TestException("Error connecting to zookeeper: " + ce);
+		}
+	    } catch (Exception e) {
+		throw new TestException("Error initialising bookkeeper ledgers: " +  e);
+	    } 
+	}
+	String journal = createTempDirectory("-bookie-" + port + "-journal");
+	String ledger = createTempDirectory("-bookie-" + port + "-ledger");
+	// NOTE(review): leftover debug print of the journal dir.
+	System.out.println(journal);
+	BookKeeperServer bookie = new BookKeeperServer(port, zookeeperServer, journal, ledger);
+	return bookie;
+    }
+    
+    /** Starts a Hedwig pub/sub hub in the given region against the given ZK. */
+    public TestServer startPubSubServer(int port, String region, TestServer zookeeperServer) throws IOException, TestException {
+	return new HedwigServer(port, region, zookeeperServer);
+    }    
+
+    public ServerControl() {
+    }
+
+    /** Manual smoke test: one ZK + three bookies + four hubs, then kill all. */
+    public static void main(String[] args) throws Exception {
+	ServerControl control = new ServerControl();
+
+	TestServer zk = control.startZookeeperServer(12345);
+	TestServer bk1 = control.startBookieServer(12346, zk);
+	TestServer bk2 = control.startBookieServer(12347, zk);
+	TestServer bk3 = control.startBookieServer(12348, zk);
+
+	TestServer hw1 = control.startPubSubServer(12349, "foobar", zk);
+	TestServer hw2 = control.startPubSubServer(12350, "foobar", zk);
+	TestServer hw3 = control.startPubSubServer(12351, "foobar", zk);
+	TestServer hw4 = control.startPubSubServer(12352, "barfoo", zk);
+	System.out.println("Started " + zk.getAddress());
+	System.out.println("Sleeping for 10 seconds");
+	Thread.sleep(10000);
+	bk3.kill();
+	bk2.kill();
+	bk1.kill();
+	zk.kill();
+	hw1.kill();
+	hw2.kill();
+	hw3.kill();
+	hw4.kill();
+    }
+}
\ No newline at end of file

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/ServerControlDaemon.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/ServerControlDaemon.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/ServerControlDaemon.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/ServerControlDaemon.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+import org.apache.log4j.Logger;
+
+
+import org.jboss.netty.channel.Channel;  
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelEvent;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.handler.codec.string.StringEncoder;
+import org.jboss.netty.handler.codec.string.StringDecoder;
+import org.jboss.netty.handler.codec.frame.Delimiters;
+import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+
+import java.util.HashMap;
+
+/**
+ * Network daemon exposing ServerControl over a newline-framed text protocol:
+ *
+ *   START ZOOKEEPER <port>
+ *   START BOOKKEEPER <port> <zkAddress>
+ *   START HEDWIG <port> <region> <zkAddress>
+ *   KILL <address>
+ *
+ * Replies "OK ..." or "ERR ...". All servers started over a connection are
+ * torn down when that connection closes.
+ */
+public class ServerControlDaemon {
+    private static final Logger LOG =
+        Logger.getLogger(ServerControlDaemon.class);
+
+    @ChannelPipelineCoverage("all")
+    public static class ServerControlDaemonHandler extends SimpleChannelUpstreamHandler {
+        private ServerControl control;
+        // Per-connection registry: channel -> (server address -> running server).
+        private HashMap<Channel, HashMap<String, ServerControl.TestServer>> serverMap;
+
+        public ServerControlDaemonHandler() {
+            serverMap = new HashMap<Channel, HashMap<String, ServerControl.TestServer>>();
+            control = new ServerControl();
+        }
+
+        /** Remembers that server t was started by channel c. */
+        private void addServerForChannel(Channel c, ServerControl.TestServer t) {
+            LOG.info("Created server " + t.getAddress());
+            HashMap<String, ServerControl.TestServer> map = serverMap.get(c);
+            if (map == null) {
+                map = new HashMap<String, ServerControl.TestServer>();
+                serverMap.put(c, map);
+            }
+            map.put(t.getAddress(), t);
+        }
+
+        /**
+         * Kills the named server owned by channel c. An unknown name surfaces
+         * as a NullPointerException in messageReceived, which replies "ERR".
+         */
+        private void killServerForChannel(Channel c, String name) {
+            LOG.info("Killing server " + name);
+            HashMap<String, ServerControl.TestServer> map = serverMap.get(c);
+            ServerControl.TestServer t = map.get(name);
+            map.remove(name);
+            t.kill();
+        }
+
+        /** Looks up a server previously started by channel c (e.g. its ZK server). */
+        private ServerControl.TestServer lookupServer(Channel c, String name) {
+            HashMap<String, ServerControl.TestServer> map = serverMap.get(c);
+            return map.get(name);
+        }
+
+        /** Kills and forgets every server started by channel c. */
+        private void clearServersForChannel(Channel c) {
+            // BUGFIX: remove the registry entry by its key (the channel). The
+            // previous code called serverMap.remove(map) - removing by value -
+            // which never matched, so one entry leaked per closed connection.
+            HashMap<String, ServerControl.TestServer> map = serverMap.remove(c);
+            if (map == null) {
+                // BUGFIX: a connection that never started a server used to
+                // trigger a NullPointerException here on disconnect.
+                return;
+            }
+            for (ServerControl.TestServer t : map.values()) {
+                t.kill();
+            }
+            map.clear();
+        }
+
+        @Override
+        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+            try {
+                String command = (String) e.getMessage();
+                LOG.info("Command: " + command);
+                String[] args = command.split("\\s+");
+
+                if (args[0].equals("START")) {
+                    ServerControl.TestServer t = null;
+                    if (args[1].equals("ZOOKEEPER")) {
+                        t = control.startZookeeperServer(Integer.valueOf(args[2]));
+                        addServerForChannel(ctx.getChannel(), t);
+                    } else if (args[1].equals("BOOKKEEPER")) {
+                        ServerControl.TestServer zk = lookupServer(ctx.getChannel(), args[3]);
+                        t = control.startBookieServer(Integer.valueOf(args[2]), zk);
+                        addServerForChannel(ctx.getChannel(), t);
+                    } else if (args[1].equals("HEDWIG")) {
+                        ServerControl.TestServer zk = lookupServer(ctx.getChannel(), args[4]);
+                        t = control.startPubSubServer(Integer.valueOf(args[2]), args[3], zk);
+                        addServerForChannel(ctx.getChannel(), t);
+                    }
+
+                    // An unrecognized START subtype leaves t == null; the NPE
+                    // below is caught and reported as "ERR".
+                    ctx.getChannel().write("OK " + t.getAddress() + "\n");
+                } else if (args[0].equals("KILL")) {
+                    killServerForChannel(ctx.getChannel(), args[1]);
+                    ctx.getChannel().write("OK Killed " + args[1] + "\n");
+                } else {
+                    ctx.getChannel().write("ERR Bad Command\n");
+                }
+            } catch (Exception ex) {
+                ctx.getChannel().write("ERR " + ex.toString() + "\n");
+            }
+        }
+
+        @Override
+        public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+            clearServersForChannel(ctx.getChannel());
+        }
+    }
+
+    /** Boots the control daemon; an optional single argument overrides port 5672. */
+    public static void main(String[] args) throws Exception {
+        // Configure the server.
+        int port = 5672;
+        if (args.length == 1) {
+            port = Integer.valueOf(args[0]);
+        }
+        ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
+                                                                                         Executors.newCachedThreadPool()));
+        // Set up the pipeline factory: newline-framed UTF-8 strings in and out.
+        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+            public ChannelPipeline getPipeline() throws Exception {
+                ChannelPipeline p = Channels.pipeline();
+                p.addLast("frameDecoder", new DelimiterBasedFrameDecoder(80, Delimiters.lineDelimiter()));
+                p.addLast("stringDecoder", new StringDecoder("UTF-8"));
+
+                // Encoder
+                p.addLast("stringEncoder", new StringEncoder("UTF-8"));
+                p.addLast("handler", new ServerControlDaemonHandler());
+
+                return p;
+            }
+        });
+
+        LOG.info("Listening on localhost:" + port);
+        // Bind and start to accept incoming connections.
+        bootstrap.bind(new InetSocketAddress(port));
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/StubCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/StubCallback.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/StubCallback.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/StubCallback.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig;
+
+import java.util.concurrent.SynchronousQueue;
+
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.util.ConcurrencyUtils;
+import org.apache.hedwig.util.Either;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * Test stub that forwards callback results onto a SynchronousQueue so the
+ * test thread can block waiting for them. The put happens on a freshly
+ * spawned thread because SynchronousQueue.put blocks until a taker arrives.
+ */
+public class StubCallback<T> implements Callback<T> {
+
+    public SynchronousQueue<Either<T, PubSubException>> queue = new SynchronousQueue<Either<T, PubSubException>>();
+
+    public void operationFailed(Object ctx, final PubSubException exception) {
+        // Failure: no value, just the exception.
+        deliver(Either.of((T) null, exception));
+    }
+
+    public void operationFinished(Object ctx, final T resultOfOperation) {
+        // Success: the result, no exception.
+        deliver(Either.of(resultOfOperation, (PubSubException) null));
+    }
+
+    // Hands the outcome to whoever is blocked on queue.take().
+    private void deliver(final Either<T, PubSubException> outcome) {
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                ConcurrencyUtils.put(queue, outcome);
+            }
+        }).start();
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/StubScanCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/StubScanCallback.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/StubScanCallback.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/StubScanCallback.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.server.persistence.ScanCallback;
+
+/**
+ * Test stub for ScanCallback that records scanned messages and tracks the
+ * latest outcome via success/failed flags.
+ */
+public class StubScanCallback implements ScanCallback {
+    // Messages seen so far, in scan order.
+    List<Message> messages = new ArrayList<Message>();
+    // Latest outcome: a failure clears success, and vice versa.
+    boolean success = false, failed = false;
+
+    /** Records the scanned message and marks the scan as succeeding so far. */
+    public void messageScanned(Object ctx, Message message) {
+        success = true;
+        messages.add(message);
+    }
+
+    /** Marks the scan as failed. */
+    public void scanFailed(Object ctx, Exception exception) {
+        success = false;
+        failed = true;
+    }
+
+    /** Marks the scan as finished successfully. */
+    public void scanFinished(Object ctx, ReasonForFinish reason) {
+        failed = false;
+        success = true;
+    }
+
+    public List<Message> getMessages() {
+        return messages;
+    }
+
+    public boolean isSuccess() {
+        return success;
+    }
+
+    public boolean isFailed() {
+        return failed;
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/client/TestPubSubClient.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client;
+
+import java.util.concurrent.SynchronousQueue;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.netty.HedwigClient;
+import org.apache.hedwig.client.netty.HedwigPublisher;
+import org.apache.hedwig.client.netty.HedwigSubscriber;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.server.PubSubServerStandAloneTestBase;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.ConcurrencyUtils;
+
+public class TestPubSubClient extends PubSubServerStandAloneTestBase {
+
+    // Client side variables
+    protected HedwigClient client;
+    protected HedwigPublisher publisher;
+    protected HedwigSubscriber subscriber;
+
+    // SynchronousQueues to verify async calls
+    private final SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>();
+    private final SynchronousQueue<Boolean> consumeQueue = new SynchronousQueue<Boolean>();
+
+    // Test implementation of Callback for async client actions.
+    class TestCallback implements Callback<Void> {
+
+        @Override
+        public void operationFinished(Object ctx, Void resultOfOperation) {
+            // Push 'true' from a separate thread: SynchronousQueue.put blocks
+            // until the test body calls queue.take().
+            new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    if (logger.isDebugEnabled())
+                        logger.debug("Operation finished!");
+                    ConcurrencyUtils.put(queue, true);
+                }
+            }).start();
+        }
+
+        @Override
+        public void operationFailed(Object ctx, final PubSubException exception) {
+            // Failure is reported as 'false' on the same queue.
+            new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    logger.error("Operation failed!", exception);
+                    ConcurrencyUtils.put(queue, false);
+                }
+            }).start();
+        }
+    }
+
+    // Test implementation of subscriber's message handler.
+    class TestMessageHandler implements MessageHandler {
+        // Signals delivery on consumeQueue (from a helper thread, since the
+        // put blocks until the test takes) and immediately acks the message.
+        public void consume(ByteString topic, ByteString subscriberId, Message msg, Callback<Void> callback,
+                Object context) {
+            new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    if (logger.isDebugEnabled())
+                        logger.debug("Consume operation finished successfully!");
+                    ConcurrencyUtils.put(consumeQueue, true);
+                }
+            }).start();
+            callback.operationFinished(context, null);
+        }
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        // Start the standalone server (base class), then point a client at it.
+        super.setUp();
+        client = new HedwigClient(new ClientConfiguration());
+        publisher = client.getPublisher();
+        subscriber = client.getSubscriber();
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        // Stop the client before tearing down the server it talks to.
+        client.stop();
+        super.tearDown();
+    }
+
+    // A synchronous publish should complete without throwing.
+    @Test
+    public void testSyncPublish() throws Exception {
+        boolean publishSuccess = true;
+        try {
+            publisher.publish(ByteString.copyFromUtf8("mySyncTopic"), Message.newBuilder().setBody(
+                    ByteString.copyFromUtf8("Hello Sync World!")).build());
+        } catch (Exception e) {
+            publishSuccess = false;
+        }
+        assertTrue(publishSuccess);
+    }
+
+    // An async publish should report success via the callback (queue holds
+    // 'true' on success).
+    @Test
+    public void testAsyncPublish() throws Exception {
+        publisher.asyncPublish(ByteString.copyFromUtf8("myAsyncTopic"), Message.newBuilder().setBody(
+                ByteString.copyFromUtf8("Hello Async World!")).build(), new TestCallback(), null);
+        assertTrue(queue.take());
+    }
+
+    // Sequential async publishes should succeed across two different topics,
+    // including a second publish to the first topic.
+    @Test
+    public void testMultipleAsyncPublish() throws Exception {
+        ByteString topic1 = ByteString.copyFromUtf8("myFirstTopic");
+        ByteString topic2 = ByteString.copyFromUtf8("myNewTopic");
+
+        publisher.asyncPublish(topic1, Message.newBuilder().setBody(ByteString.copyFromUtf8("Hello World!")).build(),
+                new TestCallback(), null);
+        assertTrue(queue.take());
+        publisher.asyncPublish(topic2, Message.newBuilder().setBody(ByteString.copyFromUtf8("Hello on new topic!"))
+                .build(), new TestCallback(), null);
+        assertTrue(queue.take());
+        publisher.asyncPublish(topic1, Message.newBuilder().setBody(
+                ByteString.copyFromUtf8("Hello Again on old topic!")).build(), new TestCallback(), null);
+        assertTrue(queue.take());
+    }
+
+    @Test
+    public void testSyncSubscribe() throws Exception {
+        // Subscribe synchronously; any exception propagates and fails the
+        // test with its full stack trace, instead of being swallowed into
+        // an uninformative boolean as before.
+        subscriber.subscribe(ByteString.copyFromUtf8("mySyncSubscribeTopic"), ByteString.copyFromUtf8("1"),
+                CreateOrAttach.CREATE_OR_ATTACH);
+    }
+
+    @Test
+    public void testAsyncSubscribe() throws Exception {
+        subscriber.asyncSubscribe(ByteString.copyFromUtf8("myAsyncSubscribeTopic"), ByteString.copyFromUtf8("1"),
+                CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(), null);
+        assertTrue(queue.take());
+    }
+
+    @Test
+    public void testSubscribeAndConsume() throws Exception {
+        ByteString topic = ByteString.copyFromUtf8("myConsumeTopic");
+        ByteString subscriberId = ByteString.copyFromUtf8("1");
+        subscriber.asyncSubscribe(topic, subscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(), null);
+        assertTrue(queue.take());
+
+        // Start delivery for the subscriber
+        subscriber.startDelivery(topic, subscriberId, new TestMessageHandler());
+
+        // Now publish some messages for the topic and verify that each one
+        // is both acked to the publisher and consumed by the subscriber.
+        // (Replaces five copy-pasted publish/ack stanzas with a loop.)
+        for (int i = 1; i <= 5; i++) {
+            publisher.asyncPublish(topic, Message.newBuilder().setBody(
+                    ByteString.copyFromUtf8("Message #" + i)).build(), new TestCallback(), null);
+            assertTrue(queue.take());
+            assertTrue(consumeQueue.take());
+        }
+    }
+
+    @Test
+    public void testAsyncSubscribeAndUnsubscribe() throws Exception {
+        ByteString unsubTopic = ByteString.copyFromUtf8("myAsyncUnsubTopic");
+        ByteString unsubId = ByteString.copyFromUtf8("1");
+
+        // Subscribe first and wait for the async ack...
+        subscriber.asyncSubscribe(unsubTopic, unsubId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(), null);
+        assertTrue(queue.take());
+
+        // ...then unsubscribe the same subscriber and wait again.
+        subscriber.asyncUnsubscribe(unsubTopic, unsubId, new TestCallback(), null);
+        assertTrue(queue.take());
+    }
+
+    @Test
+    public void testSyncUnsubscribeWithoutSubscription() throws Exception {
+        // Unsubscribing without a prior subscription must fail with
+        // ClientNotSubscribedException. Any other exception now propagates
+        // with its stack trace instead of being silently collapsed into a
+        // boolean by a broad catch (Exception) as before.
+        try {
+            subscriber.unsubscribe(ByteString.copyFromUtf8("mySyncUnsubTopic"), ByteString.copyFromUtf8("1"));
+            fail("Expected ClientNotSubscribedException to be thrown");
+        } catch (ClientNotSubscribedException e) {
+            // expected
+        }
+    }
+
+    @Test
+    public void testAsyncSubscribeAndCloseSubscription() throws Exception {
+        ByteString topic = ByteString.copyFromUtf8("myAsyncSubAndCloseSubTopic");
+        ByteString subscriberId = ByteString.copyFromUtf8("1");
+        subscriber.asyncSubscribe(topic, subscriberId, CreateOrAttach.CREATE_OR_ATTACH, new TestCallback(), null);
+        assertTrue(queue.take());
+        // closeSubscription throwing will fail the test by itself; the
+        // meaningless trailing assertTrue(true) has been removed.
+        subscriber.closeSubscription(topic, subscriberId);
+    }
+
+}
\ No newline at end of file

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/HedwigHubTestBase.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.netty.PubSubServer;
+import org.apache.hedwig.server.persistence.BookKeeperTestBase;
+
+/**
+ * This is a base class for any tests that need a Hedwig Hub(s) setup with an
+ * associated BookKeeper and ZooKeeper instance.
+ * 
+ */
+public abstract class HedwigHubTestBase extends TestCase {
+
+    protected static Logger logger = Logger.getLogger(HedwigHubTestBase.class);
+
+    // BookKeeper variables
+    // Default number of bookie servers to setup. Extending classes can
+    // override this.
+    protected int numBookies = 3;
+    protected BookKeeperTestBase bktb;
+
+    // PubSubServer variables
+    // Default number of PubSubServer hubs to setup. Extending classes can
+    // override this.
+    protected int numServers = 1;
+    protected int initialServerPort = 4080;
+    protected int initialSSLServerPort = 9876;
+    protected List<PubSubServer> serversList;
+
+    // Default child class of the ServerConfiguration to be used here.
+    // Extending classes can define their own (possibly extending from this) and
+    // override the getServerConfiguration method below to return their own
+    // configuration.
+    protected class HubServerConfiguration extends ServerConfiguration {
+        private final int serverPort, sslServerPort;
+
+        public HubServerConfiguration(int serverPort, int sslServerPort) {
+            this.serverPort = serverPort;
+            this.sslServerPort = sslServerPort;
+        }
+
+        @Override
+        public int getServerPort() {
+            return serverPort;
+        }
+
+        @Override
+        public int getSSLServerPort() {
+            return sslServerPort;
+        }
+
+        @Override
+        public String getZkHost() {
+            return bktb.getZkHostPort();
+        }
+
+        @Override
+        public boolean isSSLEnabled() {
+            return true;
+        }
+
+        @Override
+        public String getCertName() {
+            return "/server.p12";
+        }
+
+        @Override
+        public String getPassword() {
+            return "eUySvp2phM2Wk";
+        }
+    }
+
+    // Method to get a ServerConfiguration for the PubSubServers created using
+    // the specified ports. Extending child classes can override this. This
+    // default implementation will return the HubServerConfiguration object
+    // defined above.
+    protected ServerConfiguration getServerConfiguration(int serverPort, int sslServerPort) {
+        return new HubServerConfiguration(serverPort, sslServerPort);
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        logger.info("STARTING " + getName());
+        bktb = new BookKeeperTestBase(numBookies);
+        bktb.setUp();
+        // Now create the PubSubServer Hubs
+        serversList = new LinkedList<PubSubServer>();
+        for (int i = 0; i < numServers; i++) {
+            serversList.add(new PubSubServer(getServerConfiguration(initialServerPort + i, initialSSLServerPort + i)));
+        }
+        logger.info("HedwigHub test setup finished");
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        logger.info("tearDown starting");
+        try {
+            // Shutdown all of the PubSubServers. Null-guard in case setUp
+            // aborted before the list was created.
+            if (serversList != null) {
+                for (PubSubServer server : serversList) {
+                    server.shutdown();
+                }
+            }
+        } finally {
+            // Always tear down the BookKeeper/ZooKeeper instances, even if
+            // a hub shutdown threw, so ports and threads are not leaked
+            // into subsequent tests.
+            if (bktb != null) {
+                bktb.tearDown();
+            }
+        }
+        logger.info("FINISHED " + getName());
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/HedwigRegionTestBase.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.netty.HedwigClient;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.netty.PubSubServer;
+import org.apache.hedwig.server.persistence.BookKeeperTestBase;
+import org.apache.hedwig.util.HedwigSocketAddress;
+
+/**
+ * This is a base class for any tests that need a Hedwig Region(s) setup with a
+ * number of Hedwig hubs per region, an associated HedwigClient per region and
+ * the required BookKeeper and ZooKeeper instances.
+ * 
+ */
+public abstract class HedwigRegionTestBase extends TestCase {
+
+    protected static Logger logger = Logger.getLogger(HedwigRegionTestBase.class);
+
+    // BookKeeper variables
+    // Default number of bookie servers to setup. Extending classes
+    // can override this. We should be able to reuse the same BookKeeper
+    // ensemble among all of the regions, at least for unit testing purposes.
+    protected int numBookies = 3;
+    protected BookKeeperTestBase bktb;
+
+    // Hedwig Region variables
+    // Default number of Hedwig Regions to setup. Extending classes can
+    // override this.
+    protected int numRegions = 2;
+    protected int numServersPerRegion = 1;
+    protected int initialServerPort = 4080;
+    protected int initialSSLServerPort = 9876;
+    // Map with keys being Region names and values being the list of Hedwig
+    // Hubs (PubSubServers) for that particular region.
+    protected Map<String, List<PubSubServer>> regionServersMap;
+    // Map with keys being Region names and values being the Hedwig Client
+    // instance.
+    protected Map<String, HedwigClient> regionClientsMap;
+
+    // String constant used as the prefix for the region names.
+    protected static final String REGION_PREFIX = "region";
+
+    // Default child class of the ServerConfiguration to be used here.
+    // Extending classes can define their own (possibly extending from this) and
+    // override the getServerConfiguration method below to return their own
+    // configuration.
+    protected class RegionServerConfiguration extends ServerConfiguration {
+        private final int serverPort, sslServerPort;
+        private final String regionName;
+
+        public RegionServerConfiguration(int serverPort, int sslServerPort, String regionName) {
+            this.serverPort = serverPort;
+            this.sslServerPort = sslServerPort;
+            this.regionName = regionName;
+            setRegionList();
+        }
+
+        // Build the list of "other region" default hub addresses: every
+        // region's first server port except the port range belonging to
+        // this server's own region.
+        protected void setRegionList() {
+            List<String> myRegionList = new LinkedList<String>();
+            for (int i = 0; i < numRegions; i++) {
+                int curDefaultServerPort = initialServerPort + (i * numServersPerRegion);
+                int curDefaultSSLServerPort = initialSSLServerPort + (i * numServersPerRegion);
+                // Add this region default server port if it is for a region
+                // other than its own.
+                if (curDefaultServerPort > serverPort
+                        || Math.abs(serverPort - curDefaultServerPort) >= numServersPerRegion)
+                    myRegionList.add("localhost:" + curDefaultServerPort + ":" + curDefaultSSLServerPort);
+            }
+            regionList = myRegionList;
+        }
+
+        @Override
+        public int getServerPort() {
+            return serverPort;
+        }
+
+        @Override
+        public int getSSLServerPort() {
+            return sslServerPort;
+        }
+
+        @Override
+        public String getZkHost() {
+            return bktb.getZkHostPort();
+        }
+
+        @Override
+        public String getMyRegion() {
+            return regionName;
+        }
+
+        @Override
+        public boolean isSSLEnabled() {
+            return true;
+        }
+
+        @Override
+        public boolean isInterRegionSSLEnabled() {
+            return true;
+        }
+
+        @Override
+        public String getCertName() {
+            return "/server.p12";
+        }
+
+        @Override
+        public String getPassword() {
+            return "eUySvp2phM2Wk";
+        }
+    }
+
+    // Method to get a ServerConfiguration for the PubSubServers created using
+    // the specified ports and region name. Extending child classes can override
+    // this. This default implementation will return the
+    // RegionServerConfiguration object defined above.
+    protected ServerConfiguration getServerConfiguration(int serverPort, int sslServerPort, String regionName) {
+        return new RegionServerConfiguration(serverPort, sslServerPort, regionName);
+    }
+
+    // Default ClientConfiguration to use. This just points to the first
+    // Hedwig hub server in each region as the "default server host" to connect
+    // to.
+    protected class RegionClientConfiguration extends ClientConfiguration {
+        public RegionClientConfiguration(int serverPort, int sslServerPort) {
+            myDefaultServerAddress = new HedwigSocketAddress("localhost:" + serverPort + ":" + sslServerPort);
+        }
+        // Below you can override any of the default ClientConfiguration
+        // parameters if needed.
+    }
+
+    // Method to get a ClientConfiguration for the HedwigClients created.
+    // Inputs are the default Hedwig hub server's ports to point to.
+    protected ClientConfiguration getClientConfiguration(int serverPort, int sslServerPort) {
+        return new RegionClientConfiguration(serverPort, sslServerPort);
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        logger.info("STARTING " + getName());
+        bktb = new BookKeeperTestBase(numBookies);
+        bktb.setUp();
+
+        // Create the Hedwig PubSubServer Hubs for all of the regions
+        regionServersMap = new HashMap<String, List<PubSubServer>>(numRegions, 1.0f);
+        regionClientsMap = new HashMap<String, HedwigClient>(numRegions, 1.0f);
+        for (int i = 0; i < numRegions; i++) {
+            List<PubSubServer> serversList = new LinkedList<PubSubServer>();
+            // For the current region, create the necessary amount of hub
+            // servers. We will basically increment through the port numbers
+            // starting from the initial ones defined.
+            for (int j = 0; j < numServersPerRegion; j++) {
+                serversList.add(new PubSubServer(getServerConfiguration(initialServerPort
+                        + (j + i * numServersPerRegion), initialSSLServerPort + (j + i * numServersPerRegion),
+                        REGION_PREFIX + i)));
+            }
+            // Store this list of servers created for the current region
+            regionServersMap.put(REGION_PREFIX + i, serversList);
+
+            // Create a Hedwig Client that points to the first Hub server
+            // created in the loop above for the current region.
+            HedwigClient regionClient = new HedwigClient(getClientConfiguration(initialServerPort
+                    + (i * numServersPerRegion), initialSSLServerPort + (i * numServersPerRegion)));
+            regionClientsMap.put(REGION_PREFIX + i, regionClient);
+        }
+        logger.info("HedwigRegion test setup finished");
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        logger.info("tearDown starting");
+        try {
+            // Stop all of the HedwigClients for all regions. Null-guard in
+            // case setUp aborted before the maps were created.
+            if (regionClientsMap != null) {
+                for (HedwigClient client : regionClientsMap.values()) {
+                    client.stop();
+                }
+                regionClientsMap.clear();
+            }
+            // Shutdown all of the PubSubServers in all regions
+            if (regionServersMap != null) {
+                for (List<PubSubServer> serversList : regionServersMap.values()) {
+                    for (PubSubServer server : serversList) {
+                        server.shutdown();
+                    }
+                }
+                logger.info("Finished shutting down all of the hub servers!");
+                regionServersMap.clear();
+            }
+        } finally {
+            // Always shutdown the BookKeeper and ZooKeeper stuff, even if a
+            // client stop or hub shutdown threw, so ports and threads are
+            // not leaked into subsequent tests.
+            if (bktb != null) {
+                bktb.tearDown();
+            }
+        }
+        logger.info("FINISHED " + getName());
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/PubSubServerStandAloneTestBase.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.netty.PubSubServer;
+
+/**
+ * This is a base class for any tests that need a StandAlone PubSubServer setup.
+ */
+public abstract class PubSubServerStandAloneTestBase extends TestCase {
+
+    protected static Logger logger = Logger.getLogger(PubSubServerStandAloneTestBase.class);
+
+    // The single standalone hub under test, created fresh for each test.
+    protected PubSubServer server;
+
+    // Configuration that forces the hub into standalone mode (no
+    // BookKeeper/ZooKeeper backing).
+    protected class StandAloneServerConfiguration extends ServerConfiguration {
+        @Override
+        public boolean isStandalone() {
+            return true;
+        }
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        logger.info("STARTING " + getName());
+        server = new PubSubServer(new StandAloneServerConfiguration());
+        logger.info("Standalone PubSubServer test setup finished");
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        logger.info("tearDown starting");
+        server.shutdown();
+        logger.info("FINISHED " + getName());
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.delivery;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.server.subscriptions.MessageFilter;
+
+// Test stub of DeliveryManager that records every call it receives instead
+// of performing any delivery, so tests can inspect what was requested.
+public class StubDeliveryManager implements DeliveryManager {
+
+    // Plain value holder capturing the arguments of one
+    // startServingSubscription call. Fields are public and mutable on
+    // purpose: tests read them directly.
+    public static class StartServingRequest {
+        public ByteString topic;
+        public ByteString subscriberId;
+        public MessageSeqId seqIdToStartFrom;
+        public DeliveryEndPoint endPoint;
+        public MessageFilter filter;
+        public boolean isHubSubscriber;
+
+        public StartServingRequest(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
+                DeliveryEndPoint endPoint, MessageFilter filter, boolean isHubSubscriber) {
+            this.topic = topic;
+            this.subscriberId = subscriberId;
+            this.seqIdToStartFrom = seqIdToStartFrom;
+            this.endPoint = endPoint;
+            this.filter = filter;
+            this.isHubSubscriber = isHubSubscriber;
+        }
+
+    }
+
+    // FIFO of ALL recorded requests, despite the singular name: a
+    // StartServingRequest per startServingSubscription call and a
+    // TopicSubscriber per stopServingSubscriber call.
+    // NOTE(review): not synchronized — assumes single-threaded test use.
+    public Queue<Object> lastRequest = new LinkedList<Object>();
+
+    @Override
+    public void startServingSubscription(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
+            DeliveryEndPoint endPoint, MessageFilter filter, boolean isHubSubscriber) {
+
+        // Record the full argument set for later inspection by the test.
+        lastRequest.add(new StartServingRequest(topic, subscriberId, seqIdToStartFrom, endPoint, filter,
+                isHubSubscriber));
+
+    }
+
+    @Override
+    public void stopServingSubscriber(ByteString topic, ByteString subscriberId) {
+        // Stop calls are recorded as a TopicSubscriber key.
+        lastRequest.add(new TopicSubscriber(topic, subscriberId));
+    }
+}



Mime
View raw message