incubator-hcatalog-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tof...@apache.org
Subject svn commit: r1221635 [2/2] - in /incubator/hcatalog/trunk: ./ storage-drivers/hbase/ storage-drivers/hbase/if/ storage-drivers/hbase/ivy/ storage-drivers/hbase/src/gen-java/ storage-drivers/hbase/src/gen-java/org/ storage-drivers/hbase/src/gen-java/org...
Date Wed, 21 Dec 2011 07:43:05 GMT
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hcatalog.hbase.snapshot.transaction.thrift.*;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+class ZKUtil {
+
+    /** Session timeout in milliseconds handed to every new ZooKeeper session. */
+    private static final int DEFAULT_SESSION_TIMEOUT = 1000000;
+
+    /**
+     * Lazily created ZooKeeper session. Declared volatile because it is read
+     * outside the lock in getSession() and reset to null by the watcher
+     * callback, which the ZooKeeper client runs on its own event thread.
+     */
+    private volatile ZooKeeper zkSession;
+
+    /** Base znode path handed to PathUtil when building paths. */
+    private final String baseDir;
+
+    /** Connect string handed to the ZooKeeper client. */
+    private final String connectString;
+
+    private static final Log LOG = LogFactory.getLog(ZKUtil.class);
+
+    /**
+     * The ways in which updateData can change the stored list of revisions:
+     * add a new entry, drop an existing entry, or replace an existing entry.
+     */
+    enum UpdateMode {
+        APPEND,
+        REMOVE,
+        KEEP_ALIVE
+    }
+
+    /**
+     * Creates a helper for the given ZooKeeper ensemble. No connection is
+     * opened here; the session is created on the first call to getSession().
+     *
+     * @param connection The connect string handed to the ZooKeeper client.
+     * @param baseDir The base znode path handed to PathUtil when building paths.
+     */
+    ZKUtil(String connection, String baseDir) {
+        this.baseDir = baseDir;
+        this.connectString = connection;
+    }
+
+    /**
+     * This method creates znodes related to table.
+     *
+     * @param table The name of the table.
+     * @param families The list of column families of the table.
+     * @throws IOException
+     */
+    void setUpZnodesForTable(String table, List<String> families)
+            throws IOException {
+
+        String transactionDataTablePath = PathUtil.getTxnDataPath(baseDir, table);
+        ensurePathExists(transactionDataTablePath, null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        for (String cf : families) {
+            String runningDataPath = PathUtil.getRunningTxnInfoPath(
+                    this.baseDir, table, cf);
+            ensurePathExists(runningDataPath, null, Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+            String abortDataPath = PathUtil.getAbortInformationPath(
+                    this.baseDir, table, cf);
+            ensurePathExists(abortDataPath, null, Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+        }
+
+    }
+
+    /**
+     * This method ensures that a given path exists in zookeeper. The path is
+     * walked component by component and every component that is missing is
+     * created. Note that the given data, ACLs and create mode are applied to
+     * every component that gets created, not only to the last one.
+     *
+     * @param path The path of znode that is required to exist.
+     * @param data The data to be associated with each znode that is created.
+     * @param acl The ACLs required.
+     * @param flags The CreateMode for the znode.
+     * @throws IOException If a component of the path cannot be checked or
+     *             created, or if the calling thread is interrupted.
+     */
+    void ensurePathExists(String path, byte[] data, List<ACL> acl,
+            CreateMode flags) throws IOException {
+        String[] dirs = path.split("/");
+        String parentPath = "";
+        for (String subDir : dirs) {
+            if (subDir.length() == 0) {
+                // Leading or doubled slashes produce empty components.
+                continue;
+            }
+            parentPath = parentPath + "/" + subDir;
+            try {
+                Stat stat = getSession().exists(parentPath, false);
+                if (stat == null) {
+                    getSession().create(parentPath, data, acl, flags);
+                }
+            } catch (KeeperException.NodeExistsException e) {
+                // Another client created the znode between exists() and
+                // create(). The znode exists, which is all that is required.
+                LOG.debug("Path " + parentPath + " was created concurrently.");
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to the caller's thread.
+                Thread.currentThread().interrupt();
+                throw new IOException("Exception while creating path "
+                        + parentPath, e);
+            } catch (Exception e) {
+                throw new IOException("Exception while creating path "
+                        + parentPath, e);
+            }
+        }
+    }
+
+    /**
+     * Lists the children of the transaction data znode of a table, leaving
+     * out every child whose name contains "idgen" or "_locknode_". The
+     * remaining names are reported as the column families of the table.
+     *
+     * @param tableName The name of table.
+     * @return List<String> The list of column families in table.
+     * @throws IOException If the children cannot be read from zookeeper.
+     */
+    List<String> getColumnFamiliesOfTable(String tableName) throws IOException {
+        String tablePath = PathUtil.getTxnDataPath(baseDir, tableName);
+        List<String> znodes;
+        try {
+            znodes = getSession().getChildren(tablePath, false);
+        } catch (KeeperException e) {
+            LOG.warn("Caught: ", e);
+            throw new IOException("Exception while obtaining columns of table.",e);
+        } catch (InterruptedException e) {
+            LOG.warn("Caught: ", e);
+            throw new IOException("Exception while obtaining columns of table.",e);
+        }
+
+        List<String> families = new ArrayList<String>();
+        for (String znode : znodes) {
+            boolean bookkeepingNode = znode.contains("idgen")
+                    || znode.contains("_locknode_");
+            if (!bookkeepingNode) {
+                families.add(znode);
+            }
+        }
+        return families;
+    }
+
+    /**
+     * This method returns a time stamp for use by the transactions. The clock
+     * znode is written (null data, any version) and the modification time
+     * reported by zookeeper for that write is returned.
+     *
+     * @return long The current timestamp in zookeeper.
+     * @throws IOException If the clock znode cannot be created or written, or
+     *             if the calling thread is interrupted.
+     */
+    long getTimeStamp() throws IOException {
+        String clockPath = PathUtil.getClockPath(this.baseDir);
+        ensurePathExists(clockPath, null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        try {
+            // The write alone yields the Stat that is needed; no separate
+            // exists() round trip is required before it.
+            Stat stat = getSession().setData(clockPath, null, -1);
+            return stat.getMtime();
+        } catch (KeeperException e) {
+            LOG.warn("Caught: ", e);
+            throw new IOException("Exception while obtaining timestamp ", e);
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to the caller's thread.
+            Thread.currentThread().interrupt();
+            LOG.warn("Caught: ", e);
+            throw new IOException("Exception while obtaining timestamp ", e);
+        }
+    }
+
+    /**
+     * This method returns the next revision number to be used for any
+     * transaction purposes.
+     *
+     * @param tableName The name of the table.
+     * @return revision number The revision number last used by any transaction.
+     * @throws IOException
+     */
+    long nextId(String tableName) throws IOException {
+        String idNode = PathUtil.getRevisionIDNode(this.baseDir, tableName);
+        ensurePathExists(idNode, Bytes.toBytes("0"), Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        String lockNode = PathUtil.getLockManagementNode(idNode);
+        ensurePathExists(lockNode, null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        IDGenerator idf = new IDGenerator(getSession(), tableName, idNode);
+        long id = idf.obtainID();
+        return id;
+    }
+
+    /**
+     * The latest used revision id of the table.
+     *
+     * @param tableName The name of the table.
+     * @return the long The revision number to use by any transaction.
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    long currentID(String tableName) throws IOException{
+        String idNode = PathUtil.getRevisionIDNode(this.baseDir, tableName);
+        ensurePathExists(idNode, Bytes.toBytes("0"), Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        String lockNode = PathUtil.getLockManagementNode(idNode);
+        ensurePathExists(lockNode, null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        IDGenerator idf = new IDGenerator(getSession(), tableName, idNode);
+        long id = idf.readID();
+        return id;
+    }
+
+    /**
+     * This methods retrieves the list of transaction information associated
+     * with each column/column family of a table. A znode without data, or
+     * whose data does not contain a revision list, yields an empty list.
+     *
+     * @param path The znode path
+     * @return List of FamilyRevision The list of transactions in the given
+     *         path; never null.
+     * @throws IOException If the znode cannot be read or its data cannot be
+     *             deserialized.
+     */
+    List<FamilyRevision> getTransactionList(String path)
+            throws IOException {
+
+        ArrayList<FamilyRevision> wtxnList = new ArrayList<FamilyRevision>();
+        byte[] data = getRawData(path, new Stat());
+        // deserialize() leaves the target untouched for an empty array, so
+        // treat it like missing data.
+        if (data == null || data.length == 0) {
+            return wtxnList;
+        }
+        StoreFamilyRevisionList txnList = new StoreFamilyRevisionList();
+        deserialize(txnList, data);
+        Iterator<StoreFamilyRevision> itr = txnList.getRevisionListIterator();
+        // NOTE(review): the generated accessor presumably returns null when
+        // the revision list was never set - confirm against the Thrift code.
+        if (itr == null) {
+            return wtxnList;
+        }
+
+        while (itr.hasNext()) {
+            StoreFamilyRevision wtxn = itr.next();
+            wtxnList.add(new FamilyRevision(wtxn.getRevision(), wtxn
+                    .getTimestamp()));
+        }
+
+        return wtxnList;
+    }
+
+    /**
+     * Reads the data stored in a znode, without setting a watch.
+     *
+     * @param path The znode path
+     * @param stat Zookeeper stat object that is passed on to the getData call.
+     * @return byte array The data stored in the znode.
+     * @throws IOException Wraps any exception raised while reading the znode.
+     */
+    byte[] getRawData(String path, Stat stat) throws IOException {
+        try {
+            return getSession().getData(path, false, stat);
+        } catch (Exception e) {
+            throw new IOException(
+                    "Exception while obtaining raw data from zookeeper path "
+                            + path, e);
+        }
+    }
+
+    /**
+     * This method created the basic znodes in zookeeper for revision
+     * management.
+     *
+     * @throws IOException
+     */
+    void createRootZNodes() throws IOException {
+        String txnBaseNode = PathUtil.getTransactionBasePath(this.baseDir);
+        String clockNode = PathUtil.getClockPath(this.baseDir);
+        ensurePathExists(txnBaseNode, null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        ensurePathExists(clockNode, null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+    }
+
+    /**
+     * This method closes the zookeeper session, if there is one, and forgets
+     * it so that the next call to getSession() opens a new session.
+     */
+    void closeZKConnection() {
+        // Work on a local copy: the watcher may reset the field to null
+        // between the null check and the call to close().
+        ZooKeeper session = zkSession;
+        if (session == null) {
+            return;
+        }
+        try {
+            session.close();
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to the caller's thread.
+            Thread.currentThread().interrupt();
+            LOG.warn("Close failed: ", e);
+        }
+        zkSession = null;
+        LOG.info("Disconnected to ZooKeeper");
+    }
+
+    /**
+     * This method returns a zookeeper session. If there is no session yet, or
+     * the current session is closed, a new session is created while holding
+     * the lock on this object.
+     *
+     * @return ZooKeeper An instance of zookeeper client; never null.
+     * @throws IOException If a new session cannot be created.
+     */
+     ZooKeeper getSession() throws IOException {
+        // Read the field once per check: the watcher resets it to null when
+        // the session expires, so re-reading it between the null check, the
+        // state check and the return could yield null.
+        ZooKeeper session = zkSession;
+        if (session == null || session.getState() == States.CLOSED) {
+            synchronized (this) {
+                session = zkSession;
+                if (session == null || session.getState() == States.CLOSED) {
+                    session = new ZooKeeper(this.connectString,
+                            this.DEFAULT_SESSION_TIMEOUT, new ZKWatcher());
+                    zkSession = session;
+                }
+            }
+        }
+        return session;
+    }
+
+    /**
+     * This method updates the transaction data related to a znode. The stored
+     * list is read, the entry whose revision equals the revision of updateTx
+     * is dropped, updateTx is added for APPEND and KEEP_ALIVE, and the
+     * resulting list is written back to the znode regardless of its version.
+     *
+     * @param path The path to the transaction data.
+     * @param updateTx The FamilyRevision to be appended, kept alive or removed.
+     * @param mode The mode of the update. APPEND requires that the revision
+     *            is not stored yet; REMOVE and KEEP_ALIVE require that it is.
+     * @throws IOException If updateTx is null, if the requirement of the mode
+     *             is not met, or if reading or writing the znode fails.
+     */
+    void updateData(String path, FamilyRevision updateTx, UpdateMode mode)
+            throws IOException {
+
+        if (updateTx == null) {
+            throw new IOException(
+                    "The transaction to be updated found to be null.");
+        }
+        List<FamilyRevision> currentData = getTransactionList(path);
+        List<FamilyRevision> newData = new ArrayList<FamilyRevision>();
+        boolean dataFound = false;
+        long updateVersion = updateTx.getRevision();
+        for (FamilyRevision tranx : currentData) {
+            if (tranx.getRevision() != updateVersion) {
+                newData.add(tranx);
+            } else {
+                dataFound = true;
+            }
+        }
+        switch (mode) {
+            case REMOVE:
+                if (dataFound == false) {
+                    throw new IOException(
+                            "The transaction to be removed not found in the data.");
+                }
+                LOG.info("Removed trasaction : " + updateTx.toString());
+                break;
+            case KEEP_ALIVE:
+                if (dataFound == false) {
+                    throw new IOException(
+                            "The transaction to be kept alove not found in the data. It might have been expired.");
+                }
+                newData.add(updateTx);
+                LOG.info("keep alive of transaction : " + updateTx.toString());
+                break;
+            case APPEND:
+                if (dataFound == true) {
+                    throw new IOException(
+                            "The data to be appended already exists.");
+                }
+                newData.add(updateTx);
+                LOG.info("Added transaction : " + updateTx.toString());
+                break;
+        }
+
+        // For serialization purposes.
+        List<StoreFamilyRevision> newTxnList = new ArrayList<StoreFamilyRevision>();
+        for (FamilyRevision wtxn : newData) {
+            StoreFamilyRevision newTxn = new StoreFamilyRevision(wtxn.getRevision(),
+                    wtxn.getExpireTimestamp());
+            newTxnList.add(newTxn);
+        }
+        StoreFamilyRevisionList wtxnList = new StoreFamilyRevisionList(newTxnList);
+        byte[] newByteData = serialize(wtxnList);
+
+        Stat stat = null;
+        try {
+            // Go through getSession() instead of the zkSession field: the
+            // watcher resets the field to null when the session expires.
+            stat = getSession().setData(path, newByteData, -1);
+        } catch (KeeperException e) {
+            throw new IOException(
+                    "Exception while updating trasactional data. ", e);
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to the caller's thread.
+            Thread.currentThread().interrupt();
+            throw new IOException(
+                    "Exception while updating trasactional data. ", e);
+        }
+
+        if (stat != null) {
+            LOG.info("Transaction list stored at " + path + ".");
+        }
+
+    }
+
+    /**
+     * Refresh transactions on a given transaction data path. Entries whose
+     * expire timestamp is not later than the current zookeeper timestamp are
+     * dropped; the znode is rewritten only if at least one entry was dropped.
+     *
+     * @param path The path to the transaction data.
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    void refreshTransactions(String path) throws IOException{
+        List<FamilyRevision> currentData = getTransactionList(path);
+        if (currentData.isEmpty()) {
+            // Nothing can expire, so neither the clock nor the data znode
+            // needs to be touched.
+            return;
+        }
+
+        // Read the clock once for the whole pass: every getTimeStamp() call
+        // costs several zookeeper round trips including a write.
+        long now = getTimeStamp();
+        List<StoreFamilyRevision> newTxnList = new ArrayList<StoreFamilyRevision>();
+        for (FamilyRevision tranx : currentData) {
+            if (tranx.getExpireTimestamp() > now) {
+                newTxnList.add(new StoreFamilyRevision(tranx.getRevision(),
+                        tranx.getExpireTimestamp()));
+            }
+        }
+
+        // The new list is an in-order subset of the current one, so equal
+        // sizes mean that nothing expired.
+        if (newTxnList.size() == currentData.size()) {
+            return;
+        }
+
+        StoreFamilyRevisionList wtxnList = new StoreFamilyRevisionList(newTxnList);
+        byte[] newByteData = serialize(wtxnList);
+
+        try {
+            // Go through getSession() instead of the zkSession field: the
+            // watcher resets the field to null when the session expires.
+            getSession().setData(path, newByteData, -1);
+        } catch (KeeperException e) {
+            throw new IOException(
+                    "Exception while updating trasactional data. ", e);
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to the caller's thread.
+            Thread.currentThread().interrupt();
+            throw new IOException(
+                    "Exception while updating trasactional data. ", e);
+        }
+
+    }
+
+    /**
+     * Serializes a TBase object with the Thrift binary protocol.
+     *
+     * @param obj An instance of TBase; may be null.
+     * @return byte array The serialized data, or an empty array if obj is null.
+     * @throws IOException Wraps any exception raised by the serializer.
+     */
+    static byte[] serialize(TBase obj) throws IOException {
+        if (obj != null) {
+            try {
+                TBinaryProtocol.Factory protocol = new TBinaryProtocol.Factory();
+                return new TSerializer(protocol).serialize(obj);
+            } catch (Exception e) {
+                throw new IOException("Serialization error: ", e);
+            }
+        }
+        return new byte[0];
+    }
+
+
+    /**
+     * Fills a TBase object from data serialized with the Thrift binary
+     * protocol. If the data is null or empty, the object is left untouched.
+     *
+     * @param obj The TBase instance that receives the deserialized content.
+     * @param data The serialized bytes; may be null or empty.
+     * @throws IOException Wraps any exception raised by the deserializer.
+     */
+    static void deserialize(TBase obj, byte[] data) throws IOException {
+        boolean hasPayload = data != null && data.length != 0;
+        if (hasPayload) {
+            try {
+                TBinaryProtocol.Factory protocol = new TBinaryProtocol.Factory();
+                new TDeserializer(protocol).deserialize(obj, data);
+            } catch (Exception e) {
+                throw new IOException("Deserialization error: " + e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * Watcher registered with every session created by getSession(). When the
+     * session is reported as expired, the session held by the enclosing
+     * ZKUtil is dropped; all other states are ignored.
+     */
+    private class ZKWatcher implements Watcher {
+        public void process(WatchedEvent event) {
+            switch (event.getState()) {
+                case Expired:
+                    LOG.info("The client session has expired. Try opening a new "
+                            + "session and connecting again.");
+                    // Forget the session so that getSession() opens a new one.
+                    ZKUtil.this.zkSession = null;
+                    break;
+                default:
+                    // No action for any other state.
+                    break;
+            }
+        }
+    }
+
+}

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/LockListener.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/LockListener.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/LockListener.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/LockListener.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,39 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot.lock;
+
+/**
+ * Callback interface with two methods: one invoked when a lock is acquired
+ * and one invoked when the lock is released.
+ * This class has been used as-is from the zookeeper 3.3.4 recipes with minor
+ * changes in the package name.
+ */
+public interface LockListener {
+    /**
+     * Callback invoked when the lock is acquired.
+     */
+    void lockAcquired();
+
+    /**
+     * Callback invoked when the lock is released.
+     */
+    void lockReleased();
+}
\ No newline at end of file

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ProtocolSupport.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ProtocolSupport.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ProtocolSupport.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ProtocolSupport.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,193 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot.lock;
+
+import org.apache.hcatalog.hbase.snapshot.lock.ZooKeeperOperation;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A base class for protocol implementations which provides a number of higher
+ * level helper methods for working with ZooKeeper along with retrying synchronous
+ *  operations if the connection to ZooKeeper closes such as
+ *  {@link #retryOperation(ZooKeeperOperation)}
+ *  This class is based on the zookeeper 3.4.0 recipes, with changes in the
+ *  retry delay, retry count values, interrupt handling and package name.
+ */
+class ProtocolSupport {
+    private static final Logger LOG = Logger.getLogger(ProtocolSupport.class);
+
+    protected final ZooKeeper zookeeper;
+    /** Set once by {@link #close()}; guards against running doClose() twice. */
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    /** Base delay in milliseconds between retries, see {@link #retryDelay(int)}. */
+    private long retryDelay = 500L;
+    /** Maximum number of attempts made by {@link #retryOperation(ZooKeeperOperation)}. */
+    private int retryCount = 3;
+    private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+    public ProtocolSupport(ZooKeeper zookeeper) {
+        this.zookeeper = zookeeper;
+    }
+
+    /**
+     * Closes this strategy and releases any ZooKeeper resources; but keeps the
+     *  ZooKeeper instance open. Only the first call invokes {@link #doClose()}.
+     */
+    public void close() {
+        if (closed.compareAndSet(false, true)) {
+            doClose();
+        }
+    }
+
+    /**
+     * return zookeeper client instance
+     * @return zookeeper client instance
+     */
+    public ZooKeeper getZookeeper() {
+        return zookeeper;
+    }
+
+    /**
+     * return the acl its using
+     * @return the acl.
+     */
+    public List<ACL> getAcl() {
+        return acl;
+    }
+
+    /**
+     * set the acl
+     * @param acl the acl to set to
+     */
+    public void setAcl(List<ACL> acl) {
+        this.acl = acl;
+    }
+
+    /**
+     * get the retry delay in milliseconds
+     * @return the retry delay
+     */
+    public long getRetryDelay() {
+        return retryDelay;
+    }
+
+    /**
+     * Sets the time waited between retry delays
+     * @param retryDelay the retry delay
+     */
+    public void setRetryDelay(long retryDelay) {
+        this.retryDelay = retryDelay;
+    }
+
+    /**
+     * Allow derived classes to perform
+     * some custom closing operations to release resources
+     */
+    protected void doClose() {
+    }
+
+
+    /**
+     * Perform the given operation, retrying if the connection fails. A
+     * session expiry is logged and rethrown immediately. A connection loss is
+     * retried until the retry count is reached; after that the first
+     * connection loss exception is thrown.
+     * @param operation the operation to execute
+     * @return object. it needs to be cast to the callee's expected
+     * return type.
+     * @throws KeeperException if the session expired or every attempt failed
+     * with a connection loss
+     * @throws InterruptedException if the operation was interrupted
+     */
+    protected Object retryOperation(ZooKeeperOperation operation)
+        throws KeeperException, InterruptedException {
+        KeeperException exception = null;
+        for (int i = 0; i < retryCount; i++) {
+            try {
+                return operation.execute();
+            } catch (KeeperException.SessionExpiredException e) {
+                LOG.warn("Session expired for: " + zookeeper + " so reconnecting due to: " + e, e);
+                throw e;
+            } catch (KeeperException.ConnectionLossException e) {
+                if (exception == null) {
+                    exception = e;
+                }
+                LOG.debug("Attempt " + i + " failed with connection loss so " +
+                        "attempting to reconnect: " + e, e);
+                retryDelay(i);
+            }
+        }
+        throw exception;
+    }
+
+    /**
+     * Ensures that the given path exists with no data, the current
+     * ACL and no flags
+     * @param path the path that is required to exist
+     */
+    protected void ensurePathExists(String path) {
+        ensureExists(path, null, acl, CreateMode.PERSISTENT);
+    }
+
+    /**
+     * Ensures that the given path exists with the given data, ACL and flags.
+     * Failures are logged and not propagated; an interrupt is additionally
+     * restored on the calling thread.
+     * @param path the path that is required to exist
+     * @param data the data used if the path has to be created
+     * @param acl the acl used if the path has to be created
+     * @param flags the create mode used if the path has to be created
+     */
+    protected void ensureExists(final String path, final byte[] data,
+            final List<ACL> acl, final CreateMode flags) {
+        try {
+            retryOperation(new ZooKeeperOperation() {
+                public boolean execute() throws KeeperException, InterruptedException {
+                    Stat stat = zookeeper.exists(path, false);
+                    if (stat != null) {
+                        return true;
+                    }
+                    zookeeper.create(path, data, acl, flags);
+                    return true;
+                }
+            });
+        } catch (KeeperException e) {
+            LOG.warn("Caught: " + e, e);
+        } catch (InterruptedException e) {
+            LOG.warn("Caught: " + e, e);
+            // The exception is not rethrown, so keep the interrupt visible
+            // to the calling thread.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Returns true if this protocol has been closed
+     * @return true if this protocol is closed
+     */
+    protected boolean isClosed() {
+        return closed.get();
+    }
+
+    /**
+     * Performs a retry delay if this is not the first attempt. The delay
+     * grows linearly: attempt count times the configured retry delay.
+     * @param attemptCount the number of the attempts performed so far
+     */
+    protected void retryDelay(int attemptCount) {
+        if (attemptCount > 0) {
+            try {
+                Thread.sleep(attemptCount * retryDelay);
+            } catch (InterruptedException e) {
+                LOG.debug("Failed to sleep: " + e, e);
+                // Do not swallow the interrupt; let the caller observe it.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+}

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/WriteLock.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/WriteLock.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/WriteLock.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/WriteLock.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,297 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot.lock;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * A <a href="package.html">protocol to implement an exclusive
+ *  write lock or to elect a leader</a>. <p/> You invoke {@link #lock()} to
+ *  start the process of grabbing the lock; you may get the lock then or it may be
+ *  some time later. <p/> You can register a listener so that you are invoked
+ *  when you get the lock; otherwise you can ask if you have the lock
+ *  by calling {@link #isOwner()}
+ *  This class is derived from the zookeeper 3.4.0 recipes. Changes: the
+ *  package name, a TODO for sorting using suffixes, storing the full path
+ *  when an existing lock node is re-discovered, and re-listing the children
+ *  when the watched predecessor has already disappeared.
+ */
+public class WriteLock extends ProtocolSupport {
+    private static final Logger LOG = Logger.getLogger(WriteLock.class);
+
+    /** Parent znode under which the lock nodes are created. */
+    private final String dir;
+    /** Absolute path of this instance's lock node, or null when none is held/requested. */
+    private String id;
+    /** Parsed form of {@link #id}, used to order it against its siblings. */
+    private ZNodeName idName;
+    /** Absolute path of the node that sorted first on the last evaluation. */
+    private String ownerId;
+    /** Absolute path of the predecessor node that was last watched. */
+    private String lastChildId;
+    private byte[] data = {0x12, 0x34};
+    private LockListener callback;
+    private LockZooKeeperOperation zop;
+
+    /**
+     * zookeeper constructor for writelock
+     * @param zookeeper zookeeper client instance
+     * @param dir the parent path you want to use for locking
+     * @param acl the acls that you want to use for all the paths;
+     * if null the ACL already configured in the superclass is left unchanged.
+     */
+    public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl) {
+        super(zookeeper);
+        this.dir = dir;
+        if (acl != null) {
+            setAcl(acl);
+        }
+        this.zop = new LockZooKeeperOperation();
+    }
+
+    /**
+     * zookeeper constructor for writelock with callback
+     * @param zookeeper the zookeeper client instance
+     * @param dir the parent path you want to use for locking
+     * @param acl the acls that you want to use for all the paths
+     * @param callback the call back instance
+     */
+    public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl,
+            LockListener callback) {
+        this(zookeeper, dir, acl);
+        this.callback = callback;
+    }
+
+    /**
+     * return the current locklistener
+     * @return the locklistener
+     */
+    public LockListener getLockListener() {
+        return this.callback;
+    }
+
+    /**
+     * register a different call back listener
+     * @param callback the call back instance
+     */
+    public void setLockListener(LockListener callback) {
+        this.callback = callback;
+    }
+
+    /**
+     * Removes the lock or associated znode if
+     * you no longer require the lock. this also
+     * removes your request in the queue for locking
+     * in case you do not already hold the lock.
+     * The listener's {@code lockReleased()} is invoked and the id is cleared
+     * whether or not the delete succeeded.
+     * @throws RuntimeException wrapping any {@link KeeperException} other
+     * than {@code NoNodeException} raised by the delete.
+     */
+    public synchronized void unlock() throws RuntimeException {
+
+        if (!isClosed() && id != null) {
+            // we don't need to retry this operation in the case of failure
+            // as ZK will remove ephemeral files and we don't wanna hang
+            // this process when closing if we cannot reconnect to ZK
+            try {
+                zookeeper.delete(id, -1);
+            } catch (InterruptedException e) {
+                LOG.warn("Caught: " + e, e);
+                //set that we have been interrupted.
+                Thread.currentThread().interrupt();
+            } catch (KeeperException.NoNodeException e) {
+                // do nothing: the node is already gone, which is what we wanted
+            } catch (KeeperException e) {
+                LOG.warn("Caught: " + e, e);
+                throw new RuntimeException(e.getMessage(), e);
+            } finally {
+                if (callback != null) {
+                    callback.lockReleased();
+                }
+                id = null;
+            }
+        }
+    }
+
+    /**
+     * the watcher called on
+     * getting watch while watching
+     * my predecessor
+     */
+    private class LockWatcher implements Watcher {
+        public void process(WatchedEvent event) {
+            // lets either become the leader or watch the new/updated node
+            LOG.debug("Watcher fired on path: " + event.getPath() + " state: " +
+                    event.getState() + " type " + event.getType());
+            try {
+                lock();
+            } catch (Exception e) {
+                LOG.warn("Failed to acquire lock: " + e, e);
+            }
+        }
+    }
+
+    /**
+     * a zookeeper operation that is mainly responsible
+     * for all the magic required for locking.
+     */
+    private class LockZooKeeperOperation implements ZooKeeperOperation {
+
+        /** find if we have been created earlier if not create our node
+         *
+         * @param prefix the prefix node
+         * @param zookeeper the zookeeper client
+         * @param dir the dir parent
+         * @throws KeeperException
+         * @throws InterruptedException
+         */
+        private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir)
+            throws KeeperException, InterruptedException {
+            List<String> names = zookeeper.getChildren(dir, false);
+            for (String name : names) {
+                if (name.startsWith(prefix)) {
+                    // getChildren returns names relative to dir, but id is
+                    // used as an absolute path everywhere else (delete,
+                    // comparison with ownerId, ordering against siblings).
+                    id = dir + "/" + name;
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Found id created last time: " + id);
+                    }
+                    break;
+                }
+            }
+            if (id == null) {
+                id = zookeeper.create(dir + "/" + prefix, data,
+                        getAcl(), EPHEMERAL_SEQUENTIAL);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Created id: " + id);
+                }
+            }
+
+        }
+
+        /**
+         * the command that is run and retried for actually
+         * obtaining the lock
+         * @return true if the lock is held on return; false if a watch was
+         * left on the predecessor node, or if this node sorts first but is
+         * not the recorded owner
+         */
+        public boolean execute() throws KeeperException, InterruptedException {
+            while (true) {
+                if (id == null) {
+                    long sessionId = zookeeper.getSessionId();
+                    String prefix = "x-" + sessionId + "-";
+                    // lets try look up the current ID if we failed
+                    // in the middle of creating the znode
+                    findPrefixInChildren(prefix, zookeeper, dir);
+                    idName = new ZNodeName(id);
+                }
+
+                List<String> names = zookeeper.getChildren(dir, false);
+                if (names.isEmpty()) {
+                    LOG.warn("No children in: " + dir + " when we've just " +
+                    "created one! Lets recreate it...");
+                    // lets force the recreation of the id
+                    id = null;
+                    continue;
+                }
+
+                // lets sort them explicitly (though they do seem to come back in order usually :)
+                SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>();
+                for (String name : names) {
+                    //TODO: Just use the suffix to sort.
+                    sortedNames.add(new ZNodeName(dir + "/" + name));
+                }
+                ownerId = sortedNames.first().getName();
+                SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName);
+
+                if (lessThanMe.isEmpty()) {
+                    // nobody is ahead of us
+                    if (isOwner()) {
+                        if (callback != null) {
+                            callback.lockAcquired();
+                        }
+                        return true;
+                    }
+                    return false;
+                }
+
+                ZNodeName lastChildName = lessThanMe.last();
+                lastChildId = lastChildName.getName();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("watching less than me node: " + lastChildId);
+                }
+                Stat stat = zookeeper.exists(lastChildId, new LockWatcher());
+                if (stat != null) {
+                    // predecessor is alive; the watcher will call lock() again
+                    return false;
+                }
+                LOG.warn("Could not find the" +
+                        " stats for less than me: " + lastChildName.getName());
+                // The predecessor vanished between getChildren and exists, so
+                // the watch just set will never fire for it. Re-list the
+                // children and evaluate again instead of giving up.
+            }
+        }
+    }
+
+    /**
+     * Attempts to acquire the exclusive write lock returning whether or not it was
+     * acquired. Note that the exclusive lock may be acquired some time later after
+     * this method has been invoked due to the current lock owner going away.
+     * Returns false immediately when this protocol has been closed.
+     */
+    public synchronized boolean lock() throws KeeperException, InterruptedException {
+        if (isClosed()) {
+            return false;
+        }
+        ensurePathExists(dir);
+
+        return (Boolean) retryOperation(zop);
+    }
+
+    /**
+     * return the parent dir for lock
+     * @return the parent dir used for locks.
+     */
+    public String getDir() {
+        return dir;
+    }
+
+    /**
+     * Returns true if this node is the owner of the
+     *  lock (or the leader)
+     */
+    public boolean isOwner() {
+        return id != null && ownerId != null && id.equals(ownerId);
+    }
+
+    /**
+     * return the id for this lock
+     * @return the id for this lock
+     */
+    public String getId() {
+       return this.id;
+    }
+}
+

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZNodeName.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZNodeName.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZNodeName.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZNodeName.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,110 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot.lock;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Represents an ephemeral znode name which has an ordered sequence number
+ * and can be sorted in order.
+ *  Derived from the zookeeper 3.4.0 recipes. Changes: the package name, and
+ *  ordering by sequence number before prefix, so that nodes created by
+ *  different sessions sort in creation order.
+ */
+public class ZNodeName implements Comparable<ZNodeName> {
+    private final String name;
+    private String prefix;
+    /** Parsed numeric suffix, or -1 when the name has no parsable suffix. */
+    private int sequence = -1;
+    private static final Logger LOG = Logger.getLogger(ZNodeName.class);
+
+    /**
+     * Splits {@code name} at its last '-' into a prefix and a numeric
+     * sequence. When there is no '-' the prefix is the whole name; when the
+     * text after the last '-' is not an integer the sequence stays -1.
+     *
+     * @param name the znode name, must not be null
+     * @throws NullPointerException if {@code name} is null
+     */
+    public ZNodeName(String name) {
+        if (name == null) {
+            throw new NullPointerException("id cannot be null");
+        }
+        this.name = name;
+        this.prefix = name;
+        int idx = name.lastIndexOf('-');
+        if (idx >= 0) {
+            this.prefix = name.substring(0, idx);
+            try {
+                this.sequence = Integer.parseInt(name.substring(idx + 1));
+            } catch (NumberFormatException e) {
+                // We misdetected a sequence suffix, so leave sequence at -1.
+                LOG.info("Number format exception for " + idx, e);
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return name;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ZNodeName other = (ZNodeName) o;
+        return name.equals(other.name);
+    }
+
+    @Override
+    public int hashCode() {
+        return name.hashCode() + 37;
+    }
+
+    /**
+     * Orders names by sequence number first, so that the node created
+     * earliest sorts first regardless of its prefix. Names without a
+     * sequence sort after all sequenced names and among themselves by full
+     * name. Ties on sequence are broken by prefix and then by full name, so
+     * the result is 0 only for names that are {@link #equals(Object) equal}.
+     */
+    public int compareTo(ZNodeName that) {
+        int s1 = this.sequence;
+        int s2 = that.sequence;
+        if (s1 == -1 && s2 == -1) {
+            return this.name.compareTo(that.name);
+        }
+        if (s1 == -1) {
+            return 1;
+        }
+        if (s2 == -1) {
+            return -1;
+        }
+        if (s1 != s2) {
+            return s1 < s2 ? -1 : 1;
+        }
+        int answer = this.prefix.compareTo(that.prefix);
+        if (answer != 0) {
+            return answer;
+        }
+        return this.name.compareTo(that.name);
+    }
+
+    /**
+     * Returns the name of the znode
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Returns the sequence number, or -1 if the name has none
+     */
+    public int getZNodeName() {
+        return sequence;
+    }
+
+    /**
+     * Returns the text prefix before the sequence number
+     */
+    public String getPrefix() {
+        return prefix;
+    }
+}

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZooKeeperOperation.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZooKeeperOperation.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZooKeeperOperation.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ZooKeeperOperation.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,39 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot.lock;
+
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * A callback object which can be used for implementing retry-able operations in the
+ * {@link org.apache.hcatalog.hbase.snapshot.lock.ProtocolSupport} class.
+ *  This class has been used as-is from the zookeeper 3.4.0 recipes with a change
+ *  in the package name.
+ */
+public interface ZooKeeperOperation {
+
+    /**
+     * Performs the operation - which may be invoked multiple times if the connection
+     * to ZooKeeper closes during this operation
+     *
+     * @return the boolean outcome of the operation; its meaning is defined
+     *         by the implementation
+     * @throws KeeperException if the ZooKeeper call fails
+     * @throws InterruptedException if the calling thread is interrupted
+     */
+    public boolean execute() throws KeeperException, InterruptedException;
+}

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/IDGenClient.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/IDGenClient.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/IDGenClient.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/IDGenClient.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Test helper thread that repeatedly asks {@link ZKUtil#nextId} for a new id
+ * for one table and records each id keyed by the wall-clock time at which it
+ * was obtained. It keeps going until roughly {@code runtime} seconds have
+ * elapsed or the thread is interrupted.
+ */
+public class IDGenClient extends Thread {
+
+    String connectionStr;
+    String base_dir;
+    ZKUtil zkutil;
+    Random sleepTime = new Random();
+    /** How long to keep generating ids, in seconds. */
+    int runtime;
+    /** Maps System.currentTimeMillis() at record time to the id obtained. */
+    HashMap<Long, Long> idMap;
+    String tableName;
+
+    IDGenClient(String connectionStr, String base_dir, int time, String tableName) {
+        super();
+        this.connectionStr = connectionStr;
+        this.base_dir = base_dir;
+        this.zkutil = new ZKUtil(connectionStr, base_dir);
+        this.runtime = time;
+        idMap = new HashMap<Long, Long>();
+        this.tableName = tableName;
+    }
+
+    /*
+     * @see java.lang.Runnable#run()
+     */
+    @Override
+    public void run() {
+        long startTime = System.currentTimeMillis();
+        int timeElapsed = 0;
+        while (timeElapsed <= runtime) {
+            try {
+                long id = zkutil.nextId(tableName);
+                idMap.put(System.currentTimeMillis(), id);
+
+                // pause for either 0 or 100 ms between requests
+                int sTime = sleepTime.nextInt(2);
+                Thread.sleep(sTime * 100);
+            } catch (InterruptedException e) {
+                // Honour the interruption: restore the flag and stop instead
+                // of looping on until the runtime expires.
+                Thread.currentThread().interrupt();
+                return;
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+
+            // elapsed time in whole seconds, rounded up
+            timeElapsed = (int) Math.ceil((System.currentTimeMillis() - startTime)/(double)1000);
+        }
+
+    }
+
+    Map<Long, Long> getIdMap(){
+        return idMap;
+    }
+
+}

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestIDGenerator.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestIDGenerator.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestIDGenerator.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestIDGenerator.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+
+import org.apache.hcatalog.hbase.SkeletonHBaseTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests id generation through {@link ZKUtil#nextId}, from a single client
+ * and from several clients running concurrently.
+ */
+public class TestIDGenerator extends SkeletonHBaseTest{
+
+    /**
+     * Builds a ZooKeeper connect string ("host1:port,host2:port") from the
+     * quorum hosts and client port in the HBase configuration.
+     */
+    private String zkConnectString() {
+        int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181);
+        String servers = getHbaseConf().get("hbase.zookeeper.quorum");
+        StringBuilder sb = new StringBuilder();
+        for (String host : servers.split(",")) {
+            if (sb.length() > 0) {
+                // entries of a connect string must be comma separated
+                sb.append(',');
+            }
+            sb.append(host).append(':').append(port);
+        }
+        return sb.toString();
+    }
+
+    /** Ids handed out to one client increase by exactly one per call. */
+    @Test
+    public void testIDGeneration() throws Exception {
+
+        ZKUtil zkutil = new ZKUtil(zkConnectString(), "/rm_base");
+
+        String tableName = "myTable";
+        long initId = zkutil.nextId(tableName);
+        for (int i=0; i<10; i++) {
+            long id = zkutil.nextId(tableName);
+            Assert.assertEquals(initId + (i + 1), id);
+        }
+    }
+
+    /**
+     * Several clients requesting ids for the same table at the same time
+     * must never be handed the same id.
+     */
+    @Test
+    public void testMultipleClients() throws InterruptedException{
+
+        String connectString = zkConnectString();
+        ArrayList<IDGenClient> clients = new ArrayList<IDGenClient>();
+
+        for(int i =0; i < 5; i++){
+            IDGenClient idClient = new IDGenClient(connectString, "/rm_base", 10, "testTable");
+            clients.add(idClient);
+        }
+
+        // start(), not run(): run() would execute each client on this thread,
+        // one after another, and the join() below would have nothing to wait for.
+        for(IDGenClient idClient : clients){
+            idClient.start();
+        }
+
+        for(IDGenClient idClient : clients){
+            idClient.join();
+        }
+
+        // The timestamps recorded by different threads do not define a
+        // reliable global order, so check the property that matters: no id
+        // was issued twice.
+        ArrayList<Long> ids = new ArrayList<Long>();
+        for(IDGenClient idClient : clients){
+            ids.addAll(idClient.getIdMap().values());
+        }
+        assertTrue("no ids were generated", !ids.isEmpty());
+
+        Collections.sort(ids);
+        for (int i = 1; i < ids.size(); i++) {
+            long previous = ids.get(i - 1).longValue();
+            long current = ids.get(i).longValue();
+            assertTrue("id " + current + " was issued more than once", previous < current);
+        }
+    }
+}

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hcatalog.hbase.SkeletonHBaseTest;
+import org.apache.hcatalog.hbase.snapshot.transaction.thrift.*;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Test;
+
+public class TestRevisionManager extends SkeletonHBaseTest{
+
+    /**
+     * Verifies that {@code createRootZNodes} creates the data directory and
+     * clock node under the base directory, and that
+     * {@code setUpZnodesForTable} creates a znode for the table and one per
+     * column family beneath it.
+     */
+    @Test
+    public void testBasicZNodeCreation() throws IOException, KeeperException, InterruptedException{
+
+        int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181);
+        String servers = getHbaseConf().get("hbase.zookeeper.quorum");
+        String[] splits = servers.split(",");
+        StringBuilder sb = new StringBuilder();
+        for(String split : splits){
+            if (sb.length() > 0) {
+                // entries of a ZooKeeper connect string are comma separated
+                sb.append(',');
+            }
+            sb.append(split);
+            sb.append(':');
+            sb.append(port);
+        }
+
+        ZKUtil zkutil = new ZKUtil(sb.toString(), "/rm_base");
+        String tableName = newTableName("testTable");
+        List<String> columnFamilies = Arrays.asList("cf001", "cf002", "cf003");
+
+        zkutil.createRootZNodes();
+        ZooKeeper zk = zkutil.getSession();
+        Stat dataDirStat = zk.exists("/rm_base" + PathUtil.DATA_DIR, false);
+        assertTrue(dataDirStat != null);
+        Stat clockNodeStat = zk.exists("/rm_base" + PathUtil.CLOCK_NODE, false);
+        assertTrue(clockNodeStat != null);
+
+        zkutil.setUpZnodesForTable(tableName, columnFamilies);
+        String transactionDataTablePath = "/rm_base" + PathUtil.DATA_DIR + "/" + tableName;
+        Stat tableStat = zk.exists(transactionDataTablePath, false);
+        assertTrue(tableStat != null);
+
+        for(String colFamily : columnFamilies){
+            String cfPath = transactionDataTablePath + "/" + colFamily;
+            Stat cfStat = zk.exists(cfPath, false);
+            assertTrue(cfStat != null);
+        }
+
+    }
+
+    /**
+     * Begins a write transaction, checks that every column family lists it
+     * as running with the transaction's revision and expiry timestamp, then
+     * commits it and checks that the running lists are empty again.
+     */
+    @Test
+    public void testCommitTransaction() throws IOException{
+
+        int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181);
+        String servers = getHbaseConf().get("hbase.zookeeper.quorum");
+        String[] splits = servers.split(",");
+        StringBuilder sb = new StringBuilder();
+        for(String split : splits){
+            if (sb.length() > 0) {
+                // entries of a ZooKeeper connect string are comma separated
+                sb.append(',');
+            }
+            sb.append(split);
+            sb.append(':');
+            sb.append(port);
+        }
+
+        Properties props = new Properties();
+        props.put(ZKBasedRevisionManager.HOSTLIST, sb.toString());
+        props.put(ZKBasedRevisionManager.DATADIR, "/rm_base");
+        ZKBasedRevisionManager  manager = new ZKBasedRevisionManager();
+        manager.initialize(props);
+        manager.open();
+        // close the manager even when an assertion below fails
+        try {
+            ZKUtil zkutil = new ZKUtil(sb.toString(), "/rm_base");
+
+            String tableName = newTableName("testTable");
+            List<String> columnFamilies = Arrays.asList("cf1", "cf2", "cf3");
+            Transaction txn = manager.beginWriteTransaction(tableName,
+                    columnFamilies);
+
+            List<String> cfs = zkutil.getColumnFamiliesOfTable(tableName);
+            assertTrue(cfs.size() == columnFamilies.size());
+            for (String cf : cfs){
+                assertTrue(columnFamilies.contains(cf));
+            }
+
+            // while running, each column family records exactly this transaction
+            for(String colFamily : columnFamilies){
+                String path = PathUtil.getRunningTxnInfoPath("/rm_base", tableName, colFamily);
+                byte[] data = zkutil.getRawData(path, null);
+                StoreFamilyRevisionList list = new StoreFamilyRevisionList();
+                ZKUtil.deserialize(list, data);
+                assertEquals(1, list.getRevisionListSize());
+                StoreFamilyRevision lightTxn = list.getRevisionList().get(0);
+                assertEquals(txn.getTransactionExpireTimeStamp(), lightTxn.timestamp);
+                assertEquals(txn.getRevisionNumber(), lightTxn.revision);
+
+            }
+            manager.commitWriteTransaction(txn);
+            // after commit nothing is running any more
+            for(String colFamily : columnFamilies){
+                String path = PathUtil.getRunningTxnInfoPath("/rm_base", tableName, colFamily);
+                byte[] data = zkutil.getRawData(path, null);
+                StoreFamilyRevisionList list = new StoreFamilyRevisionList();
+                ZKUtil.deserialize(list, data);
+                assertEquals(0, list.getRevisionListSize());
+
+            }
+        } finally {
+            manager.close();
+        }
+    }
+
+    /**
+     * Begins a write transaction on three column families, checks that it is
+     * listed as running for each of them, aborts it, and then checks that the
+     * running list of each family is empty and that the abort information of
+     * each family contains exactly this transaction's revision.
+     *
+     * @throws IOException if the revision manager or the ZooKeeper helper fails
+     */
+    @Test
+    public void testAbortTransaction() throws IOException{
+
+        int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181);
+        String host = getHbaseConf().get("hbase.zookeeper.quorum");
+        Properties props = new Properties();
+        props.put(ZKBasedRevisionManager.HOSTLIST, host + ':' + port);
+        props.put(ZKBasedRevisionManager.DATADIR, "/rm_base");
+        ZKBasedRevisionManager  manager = new ZKBasedRevisionManager();
+        manager.initialize(props);
+        manager.open();
+        try {
+            ZKUtil zkutil = new ZKUtil(host + ':' + port, "/rm_base");
+
+            String tableName = newTableName("testTable");
+            List<String> columnFamilies = Arrays.asList("cf1", "cf2", "cf3");
+            Transaction txn = manager.beginWriteTransaction(tableName, columnFamilies);
+            List<String> cfs = zkutil.getColumnFamiliesOfTable(tableName);
+
+            // Same size plus containment in one direction: the registered
+            // families must match the requested ones.
+            assertEquals(columnFamilies.size(), cfs.size());
+            for (String cf : cfs){
+                assertTrue(columnFamilies.contains(cf));
+            }
+
+            // While the transaction is open, every family lists exactly it.
+            for(String columnFamily : columnFamilies){
+                String path = PathUtil.getRunningTxnInfoPath("/rm_base",tableName, columnFamily);
+                byte[] data = zkutil.getRawData(path, null);
+                StoreFamilyRevisionList list = new StoreFamilyRevisionList();
+                ZKUtil.deserialize(list, data);
+                assertEquals(1, list.getRevisionListSize());
+                StoreFamilyRevision lightTxn = list.getRevisionList().get(0);
+                assertEquals(txn.getTransactionExpireTimeStamp(), lightTxn.timestamp);
+                assertEquals(txn.getRevisionNumber(), lightTxn.revision);
+
+            }
+            manager.abortWriteTransaction(txn);
+
+            // After the abort the running list of every family is empty ...
+            for(String columnFamily : columnFamilies){
+                String path = PathUtil.getRunningTxnInfoPath("/rm_base",tableName, columnFamily);
+                byte[] data = zkutil.getRawData(path, null);
+                StoreFamilyRevisionList list = new StoreFamilyRevisionList();
+                ZKUtil.deserialize(list, data);
+                assertEquals(0, list.getRevisionListSize());
+
+            }
+
+            // ... and the abort information of every family holds the revision.
+            for(String columnFamily : columnFamilies){
+                String path = PathUtil.getAbortInformationPath("/rm_base",tableName, columnFamily);
+                byte[] data = zkutil.getRawData(path, null);
+                StoreFamilyRevisionList list = new StoreFamilyRevisionList();
+                ZKUtil.deserialize(list, data);
+                assertEquals(1, list.getRevisionListSize());
+                StoreFamilyRevision abortedTxn = list.getRevisionList().get(0);
+                assertEquals(txn.getRevisionNumber(), abortedTxn.getRevision());
+            }
+        } finally {
+            // Close the manager even when one of the assertions above fails.
+            manager.close();
+        }
+    }
+
+    /**
+     * Begins a write transaction with a timeout argument of 40, sleeps for
+     * 100 ms and then tries to commit it. If the commit throws, the exception
+     * must be an IOException carrying the "not found" message.
+     *
+     * NOTE(review): when the commit succeeds nothing is asserted, so the test
+     * passes vacuously - confirm whether a fail() after the commit is intended.
+     *
+     * @throws InterruptedException if the sleep is interrupted
+     * @throws IOException if the revision manager cannot be set up or closed
+     */
+    @Test
+    public void testKeepAliveTransaction() throws InterruptedException, IOException {
+
+        int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181);
+        String servers = getHbaseConf().get("hbase.zookeeper.quorum");
+
+        // Build "host1:port,host2:port,...". Without the comma separator a
+        // quorum of several servers was fused into one unusable host string.
+        // Presumably HOSTLIST is used as a ZooKeeper connect string - verify.
+        StringBuilder hostList = new StringBuilder();
+        for (String server : servers.split(",")) {
+            if (hostList.length() > 0) {
+                hostList.append(',');
+            }
+            hostList.append(server).append(':').append(port);
+        }
+
+        Properties props = new Properties();
+        props.put(ZKBasedRevisionManager.HOSTLIST, hostList.toString());
+        props.put(ZKBasedRevisionManager.DATADIR, "/rm_base");
+        ZKBasedRevisionManager  manager = new ZKBasedRevisionManager();
+        manager.initialize(props);
+        manager.open();
+        try {
+            String tableName = newTableName("testTable");
+            List<String> columnFamilies = Arrays.asList("cf1", "cf2");
+            // Third argument is the transaction timeout (presumably milliseconds).
+            Transaction txn = manager.beginWriteTransaction(tableName,
+                    columnFamilies, 40);
+            // Sleep longer than the timeout value passed above.
+            Thread.sleep(100);
+            try {
+                manager.commitWriteTransaction(txn);
+            } catch (Exception e) {
+                assertTrue(e instanceof IOException);
+                assertEquals("The transaction to be removed not found in the data.",
+                        e.getMessage());
+            }
+        } finally {
+            // The other tests of this class close the manager; do the same
+            // here so the connection is not leaked.
+            manager.close();
+        }
+
+    }
+
+    /**
+     * Checks the per-column-family revisions reported by createSnapshot while
+     * three overlapping write transactions are begun and then committed one
+     * after another.
+     *
+     * @throws IOException if the revision manager fails
+     */
+    @Test
+    public void testCreateSnapshot() throws IOException{
+        int port = getHbaseConf().getInt("hbase.zookeeper.property.clientPort", 2181);
+        String host = getHbaseConf().get("hbase.zookeeper.quorum");
+        Properties props = new Properties();
+        props.put(ZKBasedRevisionManager.HOSTLIST, host + ':' + port);
+        props.put(ZKBasedRevisionManager.DATADIR, "/rm_base");
+        ZKBasedRevisionManager  manager = new ZKBasedRevisionManager();
+        manager.initialize(props);
+        manager.open();
+        try {
+            String tableName = newTableName("testTable");
+            List<String> cfOne = Arrays.asList("cf1", "cf2");
+            List<String> cfTwo = Arrays.asList("cf2", "cf3");
+            Transaction tsx1 = manager.beginWriteTransaction(tableName, cfOne);
+            Transaction tsx2 = manager.beginWriteTransaction(tableName, cfTwo);
+
+            // Both transactions are still open at this point.
+            TableSnapshot snapshotOne = manager.createSnapshot(tableName);
+            assertEquals(0, snapshotOne.getRevision("cf1"));
+            assertEquals(0, snapshotOne.getRevision("cf2"));
+            assertEquals(1, snapshotOne.getRevision("cf3"));
+
+            // A third transaction is opened before the first one is committed.
+            List<String> cfThree = Arrays.asList("cf1", "cf3");
+            Transaction tsx3 = manager.beginWriteTransaction(tableName, cfThree);
+            manager.commitWriteTransaction(tsx1);
+            TableSnapshot snapshotTwo = manager.createSnapshot(tableName);
+            assertEquals(2, snapshotTwo.getRevision("cf1"));
+            assertEquals(1, snapshotTwo.getRevision("cf2"));
+            assertEquals(1, snapshotTwo.getRevision("cf3"));
+
+            manager.commitWriteTransaction(tsx2);
+            TableSnapshot snapshotThree = manager.createSnapshot(tableName);
+            assertEquals(2, snapshotThree.getRevision("cf1"));
+            assertEquals(3, snapshotThree.getRevision("cf2"));
+            assertEquals(2, snapshotThree.getRevision("cf3"));
+
+            // With all three transactions committed every family reports 3.
+            manager.commitWriteTransaction(tsx3);
+            TableSnapshot snapshotFour = manager.createSnapshot(tableName);
+            assertEquals(3, snapshotFour.getRevision("cf1"));
+            assertEquals(3, snapshotFour.getRevision("cf2"));
+            assertEquals(3, snapshotFour.getRevision("cf3"));
+        } finally {
+            // The other tests of this class close the manager; do the same
+            // here, also when an assertion fails.
+            manager.close();
+        }
+
+    }
+
+
+}

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestThriftSerialization.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestThriftSerialization.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestThriftSerialization.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestThriftSerialization.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.hcatalog.hbase.snapshot.transaction.thrift.*;
+import org.junit.Test;
+
+/**
+ * Round-trip tests for ZKUtil.serialize / ZKUtil.deserialize on the
+ * StoreFamilyRevision and StoreFamilyRevisionList types.
+ */
+public class TestThriftSerialization {
+
+    /**
+     * Serializes a single StoreFamilyRevision and checks that the deserialized
+     * copy has the same revision and timestamp.
+     *
+     * @throws IOException if serialization or deserialization fails; it is
+     *         propagated so that such a failure fails the test instead of
+     *         merely being printed
+     */
+    @Test
+    public void testLightWeightTransaction() throws IOException {
+        StoreFamilyRevision trxn = new StoreFamilyRevision(0, 1000);
+
+        byte[] data = ZKUtil.serialize(trxn);
+        StoreFamilyRevision newWtx = new StoreFamilyRevision();
+        ZKUtil.deserialize(newWtx, data);
+
+        assertTrue(newWtx.getRevision() == trxn.getRevision());
+        assertTrue(newWtx.getTimestamp() == trxn.getTimestamp());
+    }
+
+    /**
+     * Serializes a list of ten revisions (revision i, timestamp 1000 + i) and
+     * checks that the deserialized list has the same size and yields the same
+     * values in the same order.
+     *
+     * @throws IOException if serialization or deserialization fails; it is
+     *         propagated so that such a failure fails the test instead of
+     *         merely being printed
+     */
+    @Test
+    public void testWriteTransactionList() throws IOException {
+        List<StoreFamilyRevision> txnList = new ArrayList<StoreFamilyRevision>();
+        for (int i = 0; i < 10; i++) {
+            long version = i;
+            long timestamp = 1000 + i;
+            txnList.add(new StoreFamilyRevision(version, timestamp));
+        }
+
+        StoreFamilyRevisionList wList = new StoreFamilyRevisionList(txnList);
+
+        byte[] data = ZKUtil.serialize(wList);
+        StoreFamilyRevisionList newList = new StoreFamilyRevisionList();
+        ZKUtil.deserialize(newList, data);
+        assertTrue(newList.getRevisionListSize() == wList.getRevisionListSize());
+
+        // Element i must carry the values it was created with above.
+        Iterator<StoreFamilyRevision> itr = newList.getRevisionListIterator();
+        int i = 0;
+        while (itr.hasNext()) {
+            StoreFamilyRevision txn = itr.next();
+            assertTrue(txn.getRevision() == i);
+            assertTrue(txn.getTimestamp() == (i + 1000));
+            i++;
+        }
+    }
+
+}

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/WriteLockTest.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/WriteLockTest.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/WriteLockTest.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/WriteLockTest.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot.lock;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.test.ClientBase;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link WriteLock}.
+ * This class is taken from the zookeeper 3.4.0 as-is to test the zookeeper lock
+ * Recipe with a change in the package name.
+ *
+ * Several WriteLock instances compete for the same lock path; the test checks
+ * which of them reports ownership, optionally releases the first owner and
+ * optionally stops and restarts the server.
+ */
+public class WriteLockTest extends ClientBase {
+    // Not referenced anywhere in this class; presumably milliseconds.
+    protected int sessionTimeout = 10 * 1000;
+    // Lock path handed to every WriteLock; derived from this class's name.
+    protected String dir = "/" + getClass().getName();
+    // The locks created by runTest; closed again in tearDown.
+    protected WriteLock[] nodes;
+    // Counted down by LockCallback.lockAcquired(); replaced before the
+    // first owner is unlocked so that the next acquisition can be awaited.
+    protected CountDownLatch latch = new CountDownLatch(1);
+    // Enables the stop/start-server section of runTest.
+    private boolean restartServer = true;
+    // When set, tearDown does not close the ZooKeeper handle of the last node.
+    private boolean workAroundClosingLastZNodeFails = true;
+    // Enables the section of runTest that unlocks the first owner.
+    private boolean killLeader = true;
+
+    /** Runs the lock scenario with three competing locks. */
+    @Test
+    public void testRun() throws Exception {
+        runTest(3);
+    }
+
+    /** Listener that releases the shared latch when a lock is acquired. */
+    class LockCallback implements LockListener {
+        public void lockAcquired() {
+            latch.countDown();
+        }
+
+        public void lockReleased() {
+
+        }
+
+    }
+    /**
+     * Creates {@code count} clients, each with its own WriteLock on
+     * {@link #dir}, and verifies the ownership reported by the locks.
+     *
+     * @param count number of competing locks to create
+     * @throws Exception if any client, lock or server operation fails
+     */
+    protected void runTest(int count) throws Exception {
+        nodes = new WriteLock[count];
+        for (int i = 0; i < count; i++) {
+            ZooKeeper keeper = createClient();
+            WriteLock leader = new WriteLock(keeper, dir, null);
+            leader.setLockListener(new LockCallback());
+            nodes[i] = leader;
+
+            leader.lock();
+        }
+
+        // lets wait for any previous leaders to die and one of our new
+        // nodes to become the new leader
+        // (the result of await is ignored, so a timeout only shows up in
+        // the assertions below)
+        latch.await(30, TimeUnit.SECONDS);
+
+        WriteLock first = nodes[0];
+        dumpNodes(count);
+
+        // lets assert that the first election is the leader
+        Assert.assertTrue("The first znode should be the leader " + first.getId(), first.isOwner());
+
+        for (int i = 1; i < count; i++) {
+            WriteLock node = nodes[i];
+            Assert.assertFalse("Node should not be the leader " + node.getId(), node.isOwner());
+        }
+
+        if (count > 1) {
+            if (killLeader) {
+            System.out.println("Now killing the leader");
+            // now lets kill the leader
+            // (a fresh latch is installed first so the next lockAcquired
+            // callback can be awaited)
+            latch = new CountDownLatch(1);
+            first.unlock();
+            latch.await(30, TimeUnit.SECONDS);
+            //Thread.sleep(10000);
+            WriteLock second = nodes[1];
+            dumpNodes(count);
+            // lets assert that the second election is the leader
+            Assert.assertTrue("The second znode should be the leader " + second.getId(), second.isOwner());
+
+            for (int i = 2; i < count; i++) {
+                WriteLock node = nodes[i];
+                Assert.assertFalse("Node should not be the leader " + node.getId(), node.isOwner());
+            }
+            }
+
+
+            if (restartServer) {
+                // now lets stop the server
+                System.out.println("Now stopping the server");
+                stopServer();
+                Thread.sleep(10000);
+
+                // TODO lets assert that we are no longer the leader
+                dumpNodes(count);
+
+                System.out.println("Starting the server");
+                startServer();
+                Thread.sleep(10000);
+
+                // lock() is called again on every node except the last one
+                for (int i = 0; i < count - 1; i++) {
+                    System.out.println("Calling acquire for node: " + i);
+                    nodes[i].lock();
+                }
+                dumpNodes(count);
+                System.out.println("Now closing down...");
+            }
+        }
+    }
+
+    /**
+     * Prints the id and ownership state of the first {@code count} locks.
+     *
+     * @param count number of entries of {@link #nodes} to print
+     */
+    protected void dumpNodes(int count) {
+        for (int i = 0; i < count; i++) {
+            WriteLock node = nodes[i];
+            System.out.println("node: " + i + " id: " +
+                    node.getId() + " is leader: " + node.isOwner());
+        }
+    }
+
+    /**
+     * Closes every lock created by runTest and its ZooKeeper handle, except
+     * that the handle of the last node is left open while
+     * workAroundClosingLastZNodeFails is set, then delegates to the
+     * superclass tearDown.
+     */
+    @After
+    public void tearDown() throws Exception {
+        if (nodes != null) {
+            for (int i = 0; i < nodes.length; i++) {
+                WriteLock node = nodes[i];
+                if (node != null) {
+                    System.out.println("Closing node: " + i);
+                    node.close();
+                    if (workAroundClosingLastZNodeFails && i == nodes.length - 1) {
+                        System.out.println("Not closing zookeeper: " + i + " due to bug!");
+                    } else {
+                        System.out.println("Closing zookeeper: " + i);
+                        node.getZookeeper().close();
+                        System.out.println("Closed zookeeper: " + i);
+                    }
+                }
+            }
+        }
+        System.out.println("Now lets stop the server");
+        super.tearDown();
+
+    }
+}

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/ZNodeNameTest.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/ZNodeNameTest.java?rev=1221635&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/ZNodeNameTest.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/lock/ZNodeNameTest.java Wed Dec 21 07:43:04 2011
@@ -0,0 +1,60 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot.lock;
+
+import junit.framework.TestCase;
+
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.junit.Test;
+
+/**
+ * Tests the sort order of {@link ZNodeName}. This class is adapted from the
+ * zookeeper lock recipe test; the package name has been changed.
+ */
+public class ZNodeNameTest extends TestCase {
+    /** Names sharing one prefix must come out in the expected numeric order. */
+    @Test
+    public void testOrderWithSamePrefix() throws Exception {
+        String[] names = { "x-3", "x-5", "x-11", "x-1" };
+        String[] expected = { "x-1", "x-3", "x-5", "x-11" };
+        assertOrderedNodeNames(names, expected);
+    }
+
+    /** Names with different prefixes must come out in the expected order. */
+    @Test
+    public void testOrderWithDifferentPrefixes() throws Exception {
+        String[] names = { "r-3", "r-2", "r-1", "w-2", "w-1" };
+        String[] expected = { "r-1", "r-2", "r-3", "w-1", "w-2" };
+        assertOrderedNodeNames(names, expected);
+    }
+
+    /**
+     * Wraps every name in a ZNodeName, sorts them through a TreeSet and
+     * asserts that iteration yields exactly the expected names in order.
+     *
+     * @param names    node names in arbitrary order
+     * @param expected the same names in the order the sorted set must yield
+     */
+    protected void assertOrderedNodeNames(String[] names, String[] expected) {
+        assertEquals("The two arrays should be the same size!", expected.length, names.length);
+        SortedSet<ZNodeName> nodeNames = new TreeSet<ZNodeName>();
+        for (String name : names) {
+            nodeNames.add(new ZNodeName(name));
+        }
+        // A TreeSet silently drops elements that compare as equal; without
+        // this check the loop below would then just compare fewer names.
+        assertEquals("The sorted set should keep every node name", expected.length, nodeNames.size());
+
+        int index = 0;
+        for (ZNodeName nodeName : nodeNames) {
+            String name = nodeName.getName();
+            assertEquals("Node " + index, expected[index++], name);
+        }
+    }
+
+}



Mime
View raw message