hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hashut...@apache.org
Subject svn commit: r1522098 [7/30] - in /hive/branches/vectorization: ./ beeline/src/test/org/apache/hive/beeline/src/test/ bin/ bin/ext/ common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/conf/ conf/ contrib/src/java/org/ap...
Date Thu, 12 Sep 2013 01:21:29 GMT
Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java Thu Sep 12 01:21:10 2013
@@ -28,63 +28,63 @@ import java.util.Map;
  */
 public class TableSnapshot implements Serializable {
 
-    private String name;
+  private String name;
 
-    private Map<String, Long> cfRevisionMap;
+  private Map<String, Long> cfRevisionMap;
 
-    private long latestRevision;
+  private long latestRevision;
 
 
-    public TableSnapshot(String name, Map<String, Long> cfRevMap, long latestRevision) {
-        this.name = name;
-        if (cfRevMap == null) {
-            throw new IllegalArgumentException("revision map cannot be null");
-        }
-        this.cfRevisionMap = cfRevMap;
-        this.latestRevision = latestRevision;
-    }
-
-    /**
-     * Gets the table name.
-     *
-     * @return String The name of the table.
-     */
-    public String getTableName() {
-        return name;
-    }
-
-    /**
-     * Gets the column families.
-     *
-     * @return List<String> A list of column families associated with the snapshot.
-     */
-    public List<String> getColumnFamilies(){
-        return  new ArrayList<String>(this.cfRevisionMap.keySet());
-    }
-
-    /**
-     * Gets the revision.
-     *
-     * @param familyName The name of the column family.
-     * @return the revision
-     */
-    public long getRevision(String familyName){
-        if(cfRevisionMap.containsKey(familyName))
-            return cfRevisionMap.get(familyName);
-        return latestRevision;
-    }
-
-    /**
-     * @return the latest committed revision when this snapshot was taken
-     */
-    public long getLatestRevision() {
-        return latestRevision;
-    }
-
-    @Override
-    public String toString() {
-        String snapshot = "Table Name : " + name +" Latest Revision: " + latestRevision
-                + " Column Familiy revision : " + cfRevisionMap.toString();
-        return snapshot;
-    }
+  public TableSnapshot(String name, Map<String, Long> cfRevMap, long latestRevision) {
+    this.name = name;
+    if (cfRevMap == null) {
+      throw new IllegalArgumentException("revision map cannot be null");
+    }
+    this.cfRevisionMap = cfRevMap;
+    this.latestRevision = latestRevision;
+  }
+
+  /**
+   * Gets the table name.
+   *
+   * @return String The name of the table.
+   */
+  public String getTableName() {
+    return name;
+  }
+
+  /**
+   * Gets the column families.
+   *
+   * @return List<String> A list of column families associated with the snapshot.
+   */
+  public List<String> getColumnFamilies(){
+    return  new ArrayList<String>(this.cfRevisionMap.keySet());
+  }
+
+  /**
+   * Gets the revision.
+   *
+   * @param familyName The name of the column family.
+   * @return the revision
+   */
+  public long getRevision(String familyName){
+    if(cfRevisionMap.containsKey(familyName))
+      return cfRevisionMap.get(familyName);
+    return latestRevision;
+  }
+
+  /**
+   * @return the latest committed revision when this snapshot was taken
+   */
+  public long getLatestRevision() {
+    return latestRevision;
+  }
+
+  @Override
+  public String toString() {
+    String snapshot = "Table Name : " + name +" Latest Revision: " + latestRevision
+        + " Column Familiy revision : " + cfRevisionMap.toString();
+    return snapshot;
+  }
 }

Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java Thu Sep 12 01:21:10 2013
@@ -28,89 +28,89 @@ import java.util.List;
  */
 public class Transaction implements Serializable {
 
-    private String tableName;
-    private List<String> columnFamilies = new ArrayList<String>();
-    private long timeStamp;
-    private long keepAlive;
-    private long revision;
-
-
-    Transaction(String tableName, List<String> columnFamilies, long revision, long timestamp) {
-        this.tableName = tableName;
-        this.columnFamilies = columnFamilies;
-        this.timeStamp = timestamp;
-        this.revision = revision;
-    }
-
-    /**
-     * @return The revision number associated with a transaction.
-     */
-    public long getRevisionNumber() {
-        return this.revision;
-    }
-
-    /**
-     * @return The table name associated with a transaction.
-     */
-    public String getTableName() {
-        return tableName;
-    }
-
-    /**
-     * @return The column families associated with a transaction.
-     */
-    public List<String> getColumnFamilies() {
-        return columnFamilies;
-    }
-
-    /**
-     * @return The expire timestamp associated with a transaction.
-     */
-    long getTransactionExpireTimeStamp() {
-        return this.timeStamp + this.keepAlive;
-    }
-
-    void setKeepAlive(long seconds) {
-        this.keepAlive = seconds;
-    }
-
-    /**
-     * Gets the keep alive value.
-     *
-     * @return long  The keep alive value for the transaction.
-     */
-    public long getKeepAliveValue() {
-        return this.keepAlive;
-    }
-
-    /**
-     * Gets the family revision info.
-     *
-     * @return FamilyRevision An instance of FamilyRevision associated with the transaction.
-     */
-    FamilyRevision getFamilyRevisionInfo() {
-        return new FamilyRevision(revision, getTransactionExpireTimeStamp());
-    }
-
-    /**
-     * Keep alive transaction. This methods extends the expire timestamp of a
-     * transaction by the "keep alive" amount.
-     */
-    void keepAliveTransaction() {
-        this.timeStamp = this.timeStamp + this.keepAlive;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("Revision : ");
-        sb.append(this.getRevisionNumber());
-        sb.append(" Timestamp : ");
-        sb.append(this.getTransactionExpireTimeStamp());
-        sb.append("\n").append("Table : ");
-        sb.append(this.tableName).append("\n");
-        sb.append("Column Families : ");
-        sb.append(this.columnFamilies.toString());
-        return sb.toString();
-    }
+  private String tableName;
+  private List<String> columnFamilies = new ArrayList<String>();
+  private long timeStamp;
+  private long keepAlive;
+  private long revision;
+
+
+  Transaction(String tableName, List<String> columnFamilies, long revision, long timestamp) {
+    this.tableName = tableName;
+    this.columnFamilies = columnFamilies;
+    this.timeStamp = timestamp;
+    this.revision = revision;
+  }
+
+  /**
+   * @return The revision number associated with a transaction.
+   */
+  public long getRevisionNumber() {
+    return this.revision;
+  }
+
+  /**
+   * @return The table name associated with a transaction.
+   */
+  public String getTableName() {
+    return tableName;
+  }
+
+  /**
+   * @return The column families associated with a transaction.
+   */
+  public List<String> getColumnFamilies() {
+    return columnFamilies;
+  }
+
+  /**
+   * @return The expire timestamp associated with a transaction.
+   */
+  long getTransactionExpireTimeStamp() {
+    return this.timeStamp + this.keepAlive;
+  }
+
+  void setKeepAlive(long seconds) {
+    this.keepAlive = seconds;
+  }
+
+  /**
+   * Gets the keep alive value.
+   *
+   * @return long  The keep alive value for the transaction.
+   */
+  public long getKeepAliveValue() {
+    return this.keepAlive;
+  }
+
+  /**
+   * Gets the family revision info.
+   *
+   * @return FamilyRevision An instance of FamilyRevision associated with the transaction.
+   */
+  FamilyRevision getFamilyRevisionInfo() {
+    return new FamilyRevision(revision, getTransactionExpireTimeStamp());
+  }
+
+  /**
+   * Keep alive transaction. This methods extends the expire timestamp of a
+   * transaction by the "keep alive" amount.
+   */
+  void keepAliveTransaction() {
+    this.timeStamp = this.timeStamp + this.keepAlive;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("Revision : ");
+    sb.append(this.getRevisionNumber());
+    sb.append(" Timestamp : ");
+    sb.append(this.getTransactionExpireTimeStamp());
+    sb.append("\n").append("Table : ");
+    sb.append(this.tableName).append("\n");
+    sb.append("Column Families : ");
+    sb.append(this.columnFamilies.toString());
+    return sb.toString();
+  }
 }

Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java Thu Sep 12 01:21:10 2013
@@ -38,424 +38,424 @@ import org.slf4j.LoggerFactory;
  */
 public class ZKBasedRevisionManager implements RevisionManager {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ZKBasedRevisionManager.class);
-    private String zkHostList;
-    private String baseDir;
-    private ZKUtil zkUtil;
-    private long writeTxnTimeout;
-
-
-    /*
-     * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#initialize()
-     */
-    @Override
-    public void initialize(Configuration conf) {
-        conf = new Configuration(conf);
-        if (conf.get(RMConstants.ZOOKEEPER_HOSTLIST) == null) {
-            String zkHostList = conf.get(HConstants.ZOOKEEPER_QUORUM);
-            int port = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT,
-                HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
-            String[] splits = zkHostList.split(",");
-            StringBuffer sb = new StringBuffer();
-            for (String split : splits) {
-                sb.append(split);
-                sb.append(':');
-                sb.append(port);
-                sb.append(',');
-            }
-            sb.deleteCharAt(sb.length() - 1);
-            conf.set(RMConstants.ZOOKEEPER_HOSTLIST, sb.toString());
-        }
-        this.zkHostList = conf.get(RMConstants.ZOOKEEPER_HOSTLIST);
-        this.baseDir = conf.get(RMConstants.ZOOKEEPER_DATADIR);
-        this.writeTxnTimeout = Long.parseLong(conf.get(RMConstants.WRITE_TRANSACTION_TIMEOUT));
-    }
-
-    /**
-     * Open a ZooKeeper connection
-     * @throws java.io.IOException
-     */
-
-    public void open() throws IOException {
-        zkUtil = new ZKUtil(zkHostList, this.baseDir);
-        zkUtil.createRootZNodes();
-        LOG.info("Created root znodes for revision manager.");
-    }
-
-    /**
-     * Close Zookeeper connection
-     */
-    public void close() {
-        zkUtil.closeZKConnection();
-    }
-
-    private void checkInputParams(String table, List<String> families) {
-        if (table == null) {
-            throw new IllegalArgumentException(
-                "The table name must be specified for reading.");
-        }
-        if (families == null || families.isEmpty()) {
-            throw new IllegalArgumentException(
-                "At least one column family should be specified for reading.");
-        }
-    }
-
-    @Override
-    public void createTable(String table, List<String> columnFamilies) throws IOException {
-        zkUtil.createRootZNodes();
-        zkUtil.setUpZnodesForTable(table, columnFamilies);
-    }
-
-    @Override
-    public void dropTable(String table) throws IOException {
-        zkUtil.deleteZNodes(table);
-    }
-
-    /* @param table
-    /* @param families
-    /* @param keepAlive
-    /* @return
-    /* @throws IOException
-     * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#beginWriteTransaction(java.lang.String, java.util.List, long)
-     */
-    public Transaction beginWriteTransaction(String table,
-                                             List<String> families, long keepAlive) throws IOException {
-
-        checkInputParams(table, families);
-        zkUtil.setUpZnodesForTable(table, families);
-        long nextId = zkUtil.nextId(table);
-        long expireTimestamp = zkUtil.getTimeStamp();
-        Transaction transaction = new Transaction(table, families, nextId,
-            expireTimestamp);
-        if (keepAlive != -1) {
-            transaction.setKeepAlive(keepAlive);
-        } else {
-            transaction.setKeepAlive(writeTxnTimeout);
-        }
-
-        refreshTransactionList(transaction.getTableName());
-        String lockPath = prepareLockNode(table);
-        WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
-            Ids.OPEN_ACL_UNSAFE);
-        RMLockListener myLockListener = new RMLockListener();
-        wLock.setLockListener(myLockListener);
-        try {
-            boolean lockGrabbed = wLock.lock();
-            if (lockGrabbed == false) {
-                //TO DO : Let this request queue up and try obtaining lock.
-                throw new IOException(
-                    "Unable to obtain lock while beginning transaction. "
-                        + transaction.toString());
-            } else {
-                List<String> colFamilies = transaction.getColumnFamilies();
-                FamilyRevision revisionData = transaction.getFamilyRevisionInfo();
-                for (String cfamily : colFamilies) {
-                    String path = PathUtil.getRunningTxnInfoPath(
-                        baseDir, table, cfamily);
-                    zkUtil.updateData(path, revisionData,
-                        ZKUtil.UpdateMode.APPEND);
-                }
-            }
-        } catch (KeeperException e) {
-            throw new IOException("Exception while obtaining lock.", e);
-        } catch (InterruptedException e) {
-            throw new IOException("Exception while obtaining lock.", e);
-        } finally {
-            wLock.unlock();
-        }
-
-        return transaction;
-    }
-
-    /* @param table The table name.
-    /* @param families The column families involved in the transaction.
-    /* @return transaction The transaction which was started.
-    /* @throws IOException
-     * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#beginWriteTransaction(java.lang.String, java.util.List)
-     */
-    public Transaction beginWriteTransaction(String table, List<String> families)
-        throws IOException {
-        return beginWriteTransaction(table, families, -1);
-    }
-
-    /**
-     * This method commits a write transaction.
-     * @param transaction The revision information associated with transaction.
-     * @throws java.io.IOException
-     */
-    public void commitWriteTransaction(Transaction transaction) throws IOException {
-        refreshTransactionList(transaction.getTableName());
-
-        String lockPath = prepareLockNode(transaction.getTableName());
-        WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
-            Ids.OPEN_ACL_UNSAFE);
-        RMLockListener myLockListener = new RMLockListener();
-        wLock.setLockListener(myLockListener);
-        try {
-            boolean lockGrabbed = wLock.lock();
-            if (lockGrabbed == false) {
-                //TO DO : Let this request queue up and try obtaining lock.
-                throw new IOException(
-                    "Unable to obtain lock while commiting transaction. "
-                        + transaction.toString());
-            } else {
-                String tableName = transaction.getTableName();
-                List<String> colFamilies = transaction.getColumnFamilies();
-                FamilyRevision revisionData = transaction.getFamilyRevisionInfo();
-                for (String cfamily : colFamilies) {
-                    String path = PathUtil.getRunningTxnInfoPath(
-                        baseDir, tableName, cfamily);
-                    zkUtil.updateData(path, revisionData,
-                        ZKUtil.UpdateMode.REMOVE);
-                }
-
-            }
-        } catch (KeeperException e) {
-            throw new IOException("Exception while obtaining lock.", e);
-        } catch (InterruptedException e) {
-            throw new IOException("Exception while obtaining lock.", e);
-        } finally {
-            wLock.unlock();
-        }
-        LOG.info("Write Transaction committed: " + transaction.toString());
-    }
-
-    /**
-     * This method aborts a write transaction.
-     * @param transaction
-     * @throws java.io.IOException
-     */
-    public void abortWriteTransaction(Transaction transaction) throws IOException {
-
-        refreshTransactionList(transaction.getTableName());
-        String lockPath = prepareLockNode(transaction.getTableName());
-        WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
-            Ids.OPEN_ACL_UNSAFE);
-        RMLockListener myLockListener = new RMLockListener();
-        wLock.setLockListener(myLockListener);
-        try {
-            boolean lockGrabbed = wLock.lock();
-            if (lockGrabbed == false) {
-                //TO DO : Let this request queue up and try obtaining lock.
-                throw new IOException(
-                    "Unable to obtain lock while aborting transaction. "
-                        + transaction.toString());
-            } else {
-                String tableName = transaction.getTableName();
-                List<String> colFamilies = transaction.getColumnFamilies();
-                FamilyRevision revisionData = transaction
-                    .getFamilyRevisionInfo();
-                for (String cfamily : colFamilies) {
-                    String path = PathUtil.getRunningTxnInfoPath(
-                        baseDir, tableName, cfamily);
-                    zkUtil.updateData(path, revisionData,
-                        ZKUtil.UpdateMode.REMOVE);
-                    path = PathUtil.getAbortInformationPath(baseDir,
-                        tableName, cfamily);
-                    zkUtil.updateData(path, revisionData,
-                        ZKUtil.UpdateMode.APPEND);
-                }
-
-            }
-        } catch (KeeperException e) {
-            throw new IOException("Exception while obtaining lock.", e);
-        } catch (InterruptedException e) {
-            throw new IOException("Exception while obtaining lock.", e);
-        } finally {
-            wLock.unlock();
-        }
-        LOG.info("Write Transaction aborted: " + transaction.toString());
+  private static final Logger LOG = LoggerFactory.getLogger(ZKBasedRevisionManager.class);
+  private String zkHostList;
+  private String baseDir;
+  private ZKUtil zkUtil;
+  private long writeTxnTimeout;
+
+
+  /*
+   * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#initialize()
+   */
+  @Override
+  public void initialize(Configuration conf) {
+    conf = new Configuration(conf);
+    if (conf.get(RMConstants.ZOOKEEPER_HOSTLIST) == null) {
+      String zkHostList = conf.get(HConstants.ZOOKEEPER_QUORUM);
+      int port = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT,
+        HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+      String[] splits = zkHostList.split(",");
+      StringBuffer sb = new StringBuffer();
+      for (String split : splits) {
+        sb.append(split);
+        sb.append(':');
+        sb.append(port);
+        sb.append(',');
+      }
+      sb.deleteCharAt(sb.length() - 1);
+      conf.set(RMConstants.ZOOKEEPER_HOSTLIST, sb.toString());
+    }
+    this.zkHostList = conf.get(RMConstants.ZOOKEEPER_HOSTLIST);
+    this.baseDir = conf.get(RMConstants.ZOOKEEPER_DATADIR);
+    this.writeTxnTimeout = Long.parseLong(conf.get(RMConstants.WRITE_TRANSACTION_TIMEOUT));
+  }
+
+  /**
+   * Open a ZooKeeper connection
+   * @throws java.io.IOException
+   */
+
+  public void open() throws IOException {
+    zkUtil = new ZKUtil(zkHostList, this.baseDir);
+    zkUtil.createRootZNodes();
+    LOG.info("Created root znodes for revision manager.");
+  }
+
+  /**
+   * Close Zookeeper connection
+   */
+  public void close() {
+    zkUtil.closeZKConnection();
+  }
+
+  private void checkInputParams(String table, List<String> families) {
+    if (table == null) {
+      throw new IllegalArgumentException(
+        "The table name must be specified for reading.");
+    }
+    if (families == null || families.isEmpty()) {
+      throw new IllegalArgumentException(
+        "At least one column family should be specified for reading.");
+    }
+  }
+
+  @Override
+  public void createTable(String table, List<String> columnFamilies) throws IOException {
+    zkUtil.createRootZNodes();
+    zkUtil.setUpZnodesForTable(table, columnFamilies);
+  }
+
+  @Override
+  public void dropTable(String table) throws IOException {
+    zkUtil.deleteZNodes(table);
+  }
+
+  /* @param table
+  /* @param families
+  /* @param keepAlive
+  /* @return
+  /* @throws IOException
+   * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#beginWriteTransaction(java.lang.String, java.util.List, long)
+   */
+  public Transaction beginWriteTransaction(String table,
+                       List<String> families, long keepAlive) throws IOException {
+
+    checkInputParams(table, families);
+    zkUtil.setUpZnodesForTable(table, families);
+    long nextId = zkUtil.nextId(table);
+    long expireTimestamp = zkUtil.getTimeStamp();
+    Transaction transaction = new Transaction(table, families, nextId,
+      expireTimestamp);
+    if (keepAlive != -1) {
+      transaction.setKeepAlive(keepAlive);
+    } else {
+      transaction.setKeepAlive(writeTxnTimeout);
+    }
+
+    refreshTransactionList(transaction.getTableName());
+    String lockPath = prepareLockNode(table);
+    WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
+      Ids.OPEN_ACL_UNSAFE);
+    RMLockListener myLockListener = new RMLockListener();
+    wLock.setLockListener(myLockListener);
+    try {
+      boolean lockGrabbed = wLock.lock();
+      if (lockGrabbed == false) {
+        //TO DO : Let this request queue up and try obtaining lock.
+        throw new IOException(
+          "Unable to obtain lock while beginning transaction. "
+            + transaction.toString());
+      } else {
+        List<String> colFamilies = transaction.getColumnFamilies();
+        FamilyRevision revisionData = transaction.getFamilyRevisionInfo();
+        for (String cfamily : colFamilies) {
+          String path = PathUtil.getRunningTxnInfoPath(
+            baseDir, table, cfamily);
+          zkUtil.updateData(path, revisionData,
+            ZKUtil.UpdateMode.APPEND);
+        }
+      }
+    } catch (KeeperException e) {
+      throw new IOException("Exception while obtaining lock.", e);
+    } catch (InterruptedException e) {
+      throw new IOException("Exception while obtaining lock.", e);
+    } finally {
+      wLock.unlock();
+    }
+
+    return transaction;
+  }
+
+  /* @param table The table name.
+  /* @param families The column families involved in the transaction.
+  /* @return transaction The transaction which was started.
+  /* @throws IOException
+   * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#beginWriteTransaction(java.lang.String, java.util.List)
+   */
+  public Transaction beginWriteTransaction(String table, List<String> families)
+    throws IOException {
+    return beginWriteTransaction(table, families, -1);
+  }
+
+  /**
+   * This method commits a write transaction.
+   * @param transaction The revision information associated with transaction.
+   * @throws java.io.IOException
+   */
+  public void commitWriteTransaction(Transaction transaction) throws IOException {
+    refreshTransactionList(transaction.getTableName());
+
+    String lockPath = prepareLockNode(transaction.getTableName());
+    WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
+      Ids.OPEN_ACL_UNSAFE);
+    RMLockListener myLockListener = new RMLockListener();
+    wLock.setLockListener(myLockListener);
+    try {
+      boolean lockGrabbed = wLock.lock();
+      if (lockGrabbed == false) {
+        //TO DO : Let this request queue up and try obtaining lock.
+        throw new IOException(
+          "Unable to obtain lock while commiting transaction. "
+            + transaction.toString());
+      } else {
+        String tableName = transaction.getTableName();
+        List<String> colFamilies = transaction.getColumnFamilies();
+        FamilyRevision revisionData = transaction.getFamilyRevisionInfo();
+        for (String cfamily : colFamilies) {
+          String path = PathUtil.getRunningTxnInfoPath(
+            baseDir, tableName, cfamily);
+          zkUtil.updateData(path, revisionData,
+            ZKUtil.UpdateMode.REMOVE);
+        }
+
+      }
+    } catch (KeeperException e) {
+      throw new IOException("Exception while obtaining lock.", e);
+    } catch (InterruptedException e) {
+      throw new IOException("Exception while obtaining lock.", e);
+    } finally {
+      wLock.unlock();
+    }
+    LOG.info("Write Transaction committed: " + transaction.toString());
+  }
+
+  /**
+   * This method aborts a write transaction.
+   * @param transaction
+   * @throws java.io.IOException
+   */
+  public void abortWriteTransaction(Transaction transaction) throws IOException {
+
+    refreshTransactionList(transaction.getTableName());
+    String lockPath = prepareLockNode(transaction.getTableName());
+    WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
+      Ids.OPEN_ACL_UNSAFE);
+    RMLockListener myLockListener = new RMLockListener();
+    wLock.setLockListener(myLockListener);
+    try {
+      boolean lockGrabbed = wLock.lock();
+      if (lockGrabbed == false) {
+        //TO DO : Let this request queue up and try obtaining lock.
+        throw new IOException(
+          "Unable to obtain lock while aborting transaction. "
+            + transaction.toString());
+      } else {
+        String tableName = transaction.getTableName();
+        List<String> colFamilies = transaction.getColumnFamilies();
+        FamilyRevision revisionData = transaction
+          .getFamilyRevisionInfo();
+        for (String cfamily : colFamilies) {
+          String path = PathUtil.getRunningTxnInfoPath(
+            baseDir, tableName, cfamily);
+          zkUtil.updateData(path, revisionData,
+            ZKUtil.UpdateMode.REMOVE);
+          path = PathUtil.getAbortInformationPath(baseDir,
+            tableName, cfamily);
+          zkUtil.updateData(path, revisionData,
+            ZKUtil.UpdateMode.APPEND);
+        }
+
+      }
+    } catch (KeeperException e) {
+      throw new IOException("Exception while obtaining lock.", e);
+    } catch (InterruptedException e) {
+      throw new IOException("Exception while obtaining lock.", e);
+    } finally {
+      wLock.unlock();
     }
+    LOG.info("Write Transaction aborted: " + transaction.toString());
+  }
 
 
-    /* @param transaction
+  /* @param transaction
    /* @throws IOException
-    * @see org.apache.hcatalog.hbase.snapshot.RevsionManager#keepAlive(org.apache.hcatalog.hbase.snapshot.Transaction)
-    */
-    public void keepAlive(Transaction transaction)
-        throws IOException {
-
-        refreshTransactionList(transaction.getTableName());
-        transaction.keepAliveTransaction();
-        String lockPath = prepareLockNode(transaction.getTableName());
-        WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
-            Ids.OPEN_ACL_UNSAFE);
-        RMLockListener myLockListener = new RMLockListener();
-        wLock.setLockListener(myLockListener);
-        try {
-            boolean lockGrabbed = wLock.lock();
-            if (lockGrabbed == false) {
-                //TO DO : Let this request queue up and try obtaining lock.
-                throw new IOException(
-                    "Unable to obtain lock for keep alive of transaction. "
-                        + transaction.toString());
-            } else {
-                String tableName = transaction.getTableName();
-                List<String> colFamilies = transaction.getColumnFamilies();
-                FamilyRevision revisionData = transaction.getFamilyRevisionInfo();
-                for (String cfamily : colFamilies) {
-                    String path = PathUtil.getRunningTxnInfoPath(
-                        baseDir, tableName, cfamily);
-                    zkUtil.updateData(path, revisionData,
-                        ZKUtil.UpdateMode.KEEP_ALIVE);
-                }
-
-            }
-        } catch (KeeperException e) {
-            throw new IOException("Exception while obtaining lock.", e);
-        } catch (InterruptedException e) {
-            throw new IOException("Exception while obtaining lock.", e);
-        } finally {
-            wLock.unlock();
-        }
-
-    }
-
-    /* This method allows the user to create latest snapshot of a
-    /* table.
-    /* @param tableName The table whose snapshot is being created.
-    /* @return TableSnapshot An instance of TableSnaphot
-    /* @throws IOException
-     * @see org.apache.hcatalog.hbase.snapshot.RevsionManager#createSnapshot(java.lang.String)
-     */
-    public TableSnapshot createSnapshot(String tableName) throws IOException {
-        refreshTransactionList(tableName);
-        long latestID = zkUtil.currentID(tableName);
-        HashMap<String, Long> cfMap = new HashMap<String, Long>();
-        List<String> columnFamilyNames = zkUtil.getColumnFamiliesOfTable(tableName);
-
-        for (String cfName : columnFamilyNames) {
-            String cfPath = PathUtil.getRunningTxnInfoPath(baseDir, tableName, cfName);
-            List<FamilyRevision> tranxList = zkUtil.getTransactionList(cfPath);
-            long version;
-            if (!tranxList.isEmpty()) {
-                Collections.sort(tranxList);
-                // get the smallest running Transaction ID
-                long runningVersion = tranxList.get(0).getRevision();
-                version = runningVersion - 1;
-            } else {
-                version = latestID;
-            }
-            cfMap.put(cfName, version);
-        }
-
-        TableSnapshot snapshot = new TableSnapshot(tableName, cfMap, latestID);
-        LOG.debug("Created snapshot For table: " + tableName + " snapshot: " + snapshot);
-        return snapshot;
-    }
-
-    /* This method allows the user to create snapshot of a
-    /* table with a given revision number.
-    /* @param tableName
-    /* @param revision
-    /* @return TableSnapshot
-    /* @throws IOException
-     * @see org.apache.hcatalog.hbase.snapshot.RevsionManager#createSnapshot(java.lang.String, long)
-     */
-    public TableSnapshot createSnapshot(String tableName, long revision) throws IOException {
-
-        long currentID = zkUtil.currentID(tableName);
-        if (revision > currentID) {
-            throw new IOException(
-                "The revision specified in the snapshot is higher than the current revision of the table.");
-        }
-        refreshTransactionList(tableName);
-        HashMap<String, Long> cfMap = new HashMap<String, Long>();
-        List<String> columnFamilies = zkUtil.getColumnFamiliesOfTable(tableName);
+  * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#keepAlive(org.apache.hcatalog.hbase.snapshot.Transaction)
+  */
+  public void keepAlive(Transaction transaction)
+    throws IOException {
+
+    refreshTransactionList(transaction.getTableName());
+    transaction.keepAliveTransaction();
+    String lockPath = prepareLockNode(transaction.getTableName());
+    WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
+      Ids.OPEN_ACL_UNSAFE);
+    RMLockListener myLockListener = new RMLockListener();
+    wLock.setLockListener(myLockListener);
+    try {
+      boolean lockGrabbed = wLock.lock();
+      if (lockGrabbed == false) {
+        //TO DO : Let this request queue up and try obtaining lock.
+        throw new IOException(
+          "Unable to obtain lock for keep alive of transaction. "
+            + transaction.toString());
+      } else {
+        String tableName = transaction.getTableName();
+        List<String> colFamilies = transaction.getColumnFamilies();
+        FamilyRevision revisionData = transaction.getFamilyRevisionInfo();
+        for (String cfamily : colFamilies) {
+          String path = PathUtil.getRunningTxnInfoPath(
+            baseDir, tableName, cfamily);
+          zkUtil.updateData(path, revisionData,
+            ZKUtil.UpdateMode.KEEP_ALIVE);
+        }
+
+      }
+    } catch (KeeperException e) {
+      throw new IOException("Exception while obtaining lock.", e);
+    } catch (InterruptedException e) {
+      throw new IOException("Exception while obtaining lock.", e);
+    } finally {
+      wLock.unlock();
+    }
+
+  }
+
+  /* This method allows the user to create latest snapshot of a
+  /* table.
+  /* @param tableName The table whose snapshot is being created.
+  /* @return TableSnapshot An instance of TableSnapshot
+  /* @throws IOException
+   * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#createSnapshot(java.lang.String)
+   */
+  public TableSnapshot createSnapshot(String tableName) throws IOException {
+    refreshTransactionList(tableName);
+    long latestID = zkUtil.currentID(tableName);
+    HashMap<String, Long> cfMap = new HashMap<String, Long>();
+    List<String> columnFamilyNames = zkUtil.getColumnFamiliesOfTable(tableName);
+
+    for (String cfName : columnFamilyNames) {
+      String cfPath = PathUtil.getRunningTxnInfoPath(baseDir, tableName, cfName);
+      List<FamilyRevision> tranxList = zkUtil.getTransactionList(cfPath);
+      long version;
+      if (!tranxList.isEmpty()) {
+        Collections.sort(tranxList);
+        // get the smallest running Transaction ID
+        long runningVersion = tranxList.get(0).getRevision();
+        version = runningVersion - 1;
+      } else {
+        version = latestID;
+      }
+      cfMap.put(cfName, version);
+    }
+
+    TableSnapshot snapshot = new TableSnapshot(tableName, cfMap, latestID);
+    LOG.debug("Created snapshot For table: " + tableName + " snapshot: " + snapshot);
+    return snapshot;
+  }
+
+  /* This method allows the user to create snapshot of a
+  /* table with a given revision number.
+  /* @param tableName
+  /* @param revision
+  /* @return TableSnapshot
+  /* @throws IOException
+   * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#createSnapshot(java.lang.String, long)
+   */
+  public TableSnapshot createSnapshot(String tableName, long revision) throws IOException {
+
+    long currentID = zkUtil.currentID(tableName);
+    if (revision > currentID) {
+      throw new IOException(
+        "The revision specified in the snapshot is higher than the current revision of the table.");
+    }
+    refreshTransactionList(tableName);
+    HashMap<String, Long> cfMap = new HashMap<String, Long>();
+    List<String> columnFamilies = zkUtil.getColumnFamiliesOfTable(tableName);
+
+    for (String cf : columnFamilies) {
+      cfMap.put(cf, revision);
+    }
+
+    return new TableSnapshot(tableName, cfMap, revision);
+  }
+
+  /**
+   * Get the list of in-progress Transactions for a column family
+   * @param table the table name
+   * @param columnFamily the column family name
+   * @return a list of in-progress WriteTransactions
+   * @throws java.io.IOException
+   */
+  List<FamilyRevision> getRunningTransactions(String table,
+                        String columnFamily) throws IOException {
+    String path = PathUtil.getRunningTxnInfoPath(baseDir, table,
+      columnFamily);
+    return zkUtil.getTransactionList(path);
+  }
+
+  @Override
+  public List<FamilyRevision> getAbortedWriteTransactions(String table,
+                              String columnFamily) throws IOException {
+    String path = PathUtil.getAbortInformationPath(baseDir, table, columnFamily);
+    return zkUtil.getTransactionList(path);
+  }
+
+  private void refreshTransactionList(String tableName) throws IOException {
+    String lockPath = prepareLockNode(tableName);
+    WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
+      Ids.OPEN_ACL_UNSAFE);
+    RMLockListener myLockListener = new RMLockListener();
+    wLock.setLockListener(myLockListener);
+    try {
+      boolean lockGrabbed = wLock.lock();
+      if (lockGrabbed == false) {
+        //TO DO : Let this request queue up and try obtaining lock.
+        throw new IOException(
+          "Unable to obtain lock while refreshing transactions of table "
+            + tableName + ".");
+      } else {
+        List<String> cfPaths = zkUtil
+          .getColumnFamiliesOfTable(tableName);
+        for (String cf : cfPaths) {
+          String runningDataPath = PathUtil.getRunningTxnInfoPath(
+            baseDir, tableName, cf);
+          zkUtil.refreshTransactions(runningDataPath);
+        }
+
+      }
+    } catch (KeeperException e) {
+      throw new IOException("Exception while obtaining lock.", e);
+    } catch (InterruptedException e) {
+      throw new IOException("Exception while obtaining lock.", e);
+    } finally {
+      wLock.unlock();
+    }
+
+  }
+
+  private String prepareLockNode(String tableName) throws IOException {
+    String txnDataPath = PathUtil.getTxnDataPath(this.baseDir, tableName);
+    String lockPath = PathUtil.getLockManagementNode(txnDataPath);
+    zkUtil.ensurePathExists(lockPath, null, Ids.OPEN_ACL_UNSAFE,
+      CreateMode.PERSISTENT);
+    return lockPath;
+  }
+
+  /*
+   * This class is a listener class for the locks used in revision management.
+   * TBD: Use the following class to signal that the lock has actually
+   * been granted.
+   */
+  class RMLockListener implements LockListener {
 
-        for (String cf : columnFamilies) {
-            cfMap.put(cf, revision);
-        }
-
-        return new TableSnapshot(tableName, cfMap, revision);
-    }
-
-    /**
-     * Get the list of in-progress Transactions for a column family
-     * @param table the table name
-     * @param columnFamily the column family name
-     * @return a list of in-progress WriteTransactions
-     * @throws java.io.IOException
+    /*
+     * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockAcquired()
      */
-    List<FamilyRevision> getRunningTransactions(String table,
-                                                String columnFamily) throws IOException {
-        String path = PathUtil.getRunningTxnInfoPath(baseDir, table,
-            columnFamily);
-        return zkUtil.getTransactionList(path);
-    }
-
     @Override
-    public List<FamilyRevision> getAbortedWriteTransactions(String table,
-                                                            String columnFamily) throws IOException {
-        String path = PathUtil.getAbortInformationPath(baseDir, table, columnFamily);
-        return zkUtil.getTransactionList(path);
-    }
-
-    private void refreshTransactionList(String tableName) throws IOException {
-        String lockPath = prepareLockNode(tableName);
-        WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
-            Ids.OPEN_ACL_UNSAFE);
-        RMLockListener myLockListener = new RMLockListener();
-        wLock.setLockListener(myLockListener);
-        try {
-            boolean lockGrabbed = wLock.lock();
-            if (lockGrabbed == false) {
-                //TO DO : Let this request queue up and try obtaining lock.
-                throw new IOException(
-                    "Unable to obtain lock while refreshing transactions of table "
-                        + tableName + ".");
-            } else {
-                List<String> cfPaths = zkUtil
-                    .getColumnFamiliesOfTable(tableName);
-                for (String cf : cfPaths) {
-                    String runningDataPath = PathUtil.getRunningTxnInfoPath(
-                        baseDir, tableName, cf);
-                    zkUtil.refreshTransactions(runningDataPath);
-                }
-
-            }
-        } catch (KeeperException e) {
-            throw new IOException("Exception while obtaining lock.", e);
-        } catch (InterruptedException e) {
-            throw new IOException("Exception while obtaining lock.", e);
-        } finally {
-            wLock.unlock();
-        }
-
-    }
+    public void lockAcquired() {
 
-    private String prepareLockNode(String tableName) throws IOException {
-        String txnDataPath = PathUtil.getTxnDataPath(this.baseDir, tableName);
-        String lockPath = PathUtil.getLockManagementNode(txnDataPath);
-        zkUtil.ensurePathExists(lockPath, null, Ids.OPEN_ACL_UNSAFE,
-            CreateMode.PERSISTENT);
-        return lockPath;
     }
 
     /*
-     * This class is a listener class for the locks used in revision management.
-     * TBD: Use the following class to signal that that the lock is actually
-     * been granted.
+     * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockReleased()
      */
-    class RMLockListener implements LockListener {
-
-        /*
-         * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockAcquired()
-         */
-        @Override
-        public void lockAcquired() {
-
-        }
-
-        /*
-         * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockReleased()
-         */
-        @Override
-        public void lockReleased() {
-
-        }
+    @Override
+    public void lockReleased() {
 
     }
 
+  }
+
 
 }

Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java Thu Sep 12 01:21:10 2013
@@ -44,476 +44,482 @@ import org.slf4j.LoggerFactory;
 
 class ZKUtil {
 
-    private int DEFAULT_SESSION_TIMEOUT = 1000000;
-    private ZooKeeper zkSession;
-    private String baseDir;
-    private String connectString;
-    private static final Logger LOG = LoggerFactory.getLogger(ZKUtil.class);
-
-    static enum UpdateMode {
-        APPEND, REMOVE, KEEP_ALIVE
-    }
-
-    ;
-
-    ZKUtil(String connection, String baseDir) {
-        this.connectString = connection;
-        this.baseDir = baseDir;
-    }
-
-    /**
-     * This method creates znodes related to table.
-     *
-     * @param table The name of the table.
-     * @param families The list of column families of the table.
-     * @throws IOException
-     */
-    void setUpZnodesForTable(String table, List<String> families)
-        throws IOException {
-
-        String transactionDataTablePath = PathUtil.getTxnDataPath(baseDir, table);
-        ensurePathExists(transactionDataTablePath, null, Ids.OPEN_ACL_UNSAFE,
-            CreateMode.PERSISTENT);
-        for (String cf : families) {
-            String runningDataPath = PathUtil.getRunningTxnInfoPath(
-                this.baseDir, table, cf);
-            ensurePathExists(runningDataPath, null, Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT);
-            String abortDataPath = PathUtil.getAbortInformationPath(
-                this.baseDir, table, cf);
-            ensurePathExists(abortDataPath, null, Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT);
-        }
-
-    }
-
-    /**
-     * This method ensures that a given path exists in zookeeper. If the path
-     * does not exists, it creates one.
-     *
-     * @param path The path of znode that is required to exist.
-     * @param data The data to be associated with the znode.
-     * @param acl The ACLs required.
-     * @param flags The CreateMode for the znode.
-     * @throws IOException
-     */
-    void ensurePathExists(String path, byte[] data, List<ACL> acl,
-                          CreateMode flags) throws IOException {
-        String[] dirs = path.split("/");
-        String parentPath = "";
-        for (String subDir : dirs) {
-            if (subDir.equals("") == false) {
-                parentPath = parentPath + "/" + subDir;
-                try {
-                    Stat stat = getSession().exists(parentPath, false);
-                    if (stat == null) {
-                        getSession().create(parentPath, data, acl, flags);
-                    }
-                } catch (Exception e) {
-                    throw new IOException("Exception while creating path "
-                        + parentPath, e);
-                }
-            }
-        }
-
-    }
-
-    /**
-     * This method returns a list of columns of a table which were used in any
-     * of the transactions.
-     *
-     * @param tableName The name of table.
-     * @return List<String> The list of column families in table.
-     * @throws IOException
-     */
-    List<String> getColumnFamiliesOfTable(String tableName) throws IOException {
-        String path = PathUtil.getTxnDataPath(baseDir, tableName);
-        List<String> children = null;
-        List<String> columnFamlies = new ArrayList<String>();
-        try {
-            children = getSession().getChildren(path, false);
-        } catch (KeeperException e) {
-            LOG.warn("Caught: ", e);
-            throw new IOException("Exception while obtaining columns of table.", e);
-        } catch (InterruptedException e) {
-            LOG.warn("Caught: ", e);
-            throw new IOException("Exception while obtaining columns of table.", e);
-        }
-
-        for (String child : children) {
-            if ((child.contains("idgen") == false)
-                && (child.contains("_locknode_") == false)) {
-                columnFamlies.add(child);
-            }
-        }
-        return columnFamlies;
-    }
-
-    /**
-     * This method returns a time stamp for use by the transactions.
-     *
-     * @return long The current timestamp in zookeeper.
-     * @throws IOException
-     */
-    long getTimeStamp() throws IOException {
-        long timeStamp;
-        Stat stat;
-        String clockPath = PathUtil.getClockPath(this.baseDir);
-        ensurePathExists(clockPath, null, Ids.OPEN_ACL_UNSAFE,
-            CreateMode.PERSISTENT);
-        try {
-            getSession().exists(clockPath, false);
-            stat = getSession().setData(clockPath, null, -1);
-
-        } catch (KeeperException e) {
-            LOG.warn("Caught: ", e);
-            throw new IOException("Exception while obtaining timestamp ", e);
-        } catch (InterruptedException e) {
-            LOG.warn("Caught: ", e);
-            throw new IOException("Exception while obtaining timestamp ", e);
-        }
-        timeStamp = stat.getMtime();
-        return timeStamp;
-    }
-
-    /**
-     * This method returns the next revision number to be used for any
-     * transaction purposes.
-     *
-     * @param tableName The name of the table.
-     * @return revision number The revision number last used by any transaction.
-     * @throws IOException
-     */
-    long nextId(String tableName) throws IOException {
-        String idNode = PathUtil.getRevisionIDNode(this.baseDir, tableName);
-        ensurePathExists(idNode, Bytes.toBytes("0"), Ids.OPEN_ACL_UNSAFE,
-            CreateMode.PERSISTENT);
-        String lockNode = PathUtil.getLockManagementNode(idNode);
-        ensurePathExists(lockNode, null, Ids.OPEN_ACL_UNSAFE,
-            CreateMode.PERSISTENT);
-        IDGenerator idf = new IDGenerator(getSession(), tableName, idNode);
-        long id = idf.obtainID();
-        return id;
-    }
-
-    /**
-     * The latest used revision id of the table.
-     *
-     * @param tableName The name of the table.
-     * @return the long The revision number to use by any transaction.
-     * @throws IOException Signals that an I/O exception has occurred.
-     */
-    long currentID(String tableName) throws IOException {
-        String idNode = PathUtil.getRevisionIDNode(this.baseDir, tableName);
-        ensurePathExists(idNode, Bytes.toBytes("0"), Ids.OPEN_ACL_UNSAFE,
-            CreateMode.PERSISTENT);
-        String lockNode = PathUtil.getLockManagementNode(idNode);
-        ensurePathExists(lockNode, null, Ids.OPEN_ACL_UNSAFE,
-            CreateMode.PERSISTENT);
-        IDGenerator idf = new IDGenerator(getSession(), tableName, idNode);
-        long id = idf.readID();
-        return id;
-    }
-
-    /**
-     * This methods retrieves the list of transaction information associated
-     * with each column/column family of a table.
-     *
-     * @param path The znode path
-     * @return List of FamilyRevision The list of transactions in the given path.
-     * @throws IOException
-     */
-    List<FamilyRevision> getTransactionList(String path)
-        throws IOException {
-
-        byte[] data = getRawData(path, new Stat());
-        ArrayList<FamilyRevision> wtxnList = new ArrayList<FamilyRevision>();
-        if (data == null) {
-            return wtxnList;
-        }
-        StoreFamilyRevisionList txnList = new StoreFamilyRevisionList();
-        deserialize(txnList, data);
-        Iterator<StoreFamilyRevision> itr = txnList.getRevisionListIterator();
-
-        while (itr.hasNext()) {
-            StoreFamilyRevision wtxn = itr.next();
-            wtxnList.add(new FamilyRevision(wtxn.getRevision(), wtxn
-                .getTimestamp()));
-        }
-
-        return wtxnList;
-    }
-
-    /**
-     * This method returns the data associated with the path in zookeeper.
-     *
-     * @param path The znode path
-     * @param stat Zookeeper stat
-     * @return byte array The data stored in the znode.
-     * @throws IOException
-     */
-    byte[] getRawData(String path, Stat stat) throws IOException {
-        byte[] data = null;
+  private int DEFAULT_SESSION_TIMEOUT = 1000000;
+  private ZooKeeper zkSession;
+  private String baseDir;
+  private String connectString;
+  private static final Logger LOG = LoggerFactory.getLogger(ZKUtil.class);
+
+  static enum UpdateMode {
+    APPEND, REMOVE, KEEP_ALIVE
+  }
+
+  ;
+
+  ZKUtil(String connection, String baseDir) {
+    this.connectString = connection;
+    this.baseDir = baseDir;
+  }
+
+  /**
+   * This method creates znodes related to table.
+   *
+   * @param table The name of the table.
+   * @param families The list of column families of the table.
+   * @throws IOException
+   */
+  void setUpZnodesForTable(String table, List<String> families)
+    throws IOException {
+
+    String transactionDataTablePath = PathUtil.getTxnDataPath(baseDir, table);
+    ensurePathExists(transactionDataTablePath, null, Ids.OPEN_ACL_UNSAFE,
+      CreateMode.PERSISTENT);
+    for (String cf : families) {
+      String runningDataPath = PathUtil.getRunningTxnInfoPath(
+        this.baseDir, table, cf);
+      ensurePathExists(runningDataPath, null, Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+      String abortDataPath = PathUtil.getAbortInformationPath(
+        this.baseDir, table, cf);
+      ensurePathExists(abortDataPath, null, Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+    }
+
+  }
+
+  /**
+   * This method ensures that a given path exists in zookeeper. If the path
+   * does not exist, it creates one.
+   *
+   * @param path The path of znode that is required to exist.
+   * @param data The data to be associated with the znode.
+   * @param acl The ACLs required.
+   * @param flags The CreateMode for the znode.
+   * @throws IOException
+   */
+  void ensurePathExists(String path, byte[] data, List<ACL> acl,
+              CreateMode flags) throws IOException {
+    String[] dirs = path.split("/");
+    String parentPath = "";
+    for (String subDir : dirs) {
+      if (subDir.equals("") == false) {
+        parentPath = parentPath + "/" + subDir;
         try {
-            data = getSession().getData(path, false, stat);
+          Stat stat = getSession().exists(parentPath, false);
+          if (stat == null) {
+            getSession().create(parentPath, data, acl, flags);
+          }
         } catch (Exception e) {
-            throw new IOException(
-                "Exception while obtaining raw data from zookeeper path "
-                    + path, e);
+          throw new IOException("Exception while creating path "
+            + parentPath, e);
         }
-        return data;
+      }
     }
 
-    /**
-     * This method created the basic znodes in zookeeper for revision
-     * management.
-     *
-     * @throws IOException
-     */
-    void createRootZNodes() throws IOException {
-        String txnBaseNode = PathUtil.getTransactionBasePath(this.baseDir);
-        String clockNode = PathUtil.getClockPath(this.baseDir);
-        ensurePathExists(txnBaseNode, null, Ids.OPEN_ACL_UNSAFE,
-            CreateMode.PERSISTENT);
-        ensurePathExists(clockNode, null, Ids.OPEN_ACL_UNSAFE,
-            CreateMode.PERSISTENT);
-    }
-
-    /**
-     * This method closes the zookeeper session.
-     */
-    void closeZKConnection() {
-        if (zkSession != null) {
-            try {
-                zkSession.close();
-            } catch (InterruptedException e) {
-                LOG.warn("Close failed: ", e);
-            }
-            zkSession = null;
-            LOG.info("Disconnected to ZooKeeper");
-        }
-    }
+  }
 
-    /**
-     * This method returns a zookeeper session. If the current session is closed,
-     * then a new session is created.
-     *
-     * @return ZooKeeper An instance of zookeeper client.
-     * @throws IOException
-     */
-    ZooKeeper getSession() throws IOException {
+  /**
+   * This method returns a list of columns of a table which were used in any
+   * of the transactions.
+   *
+   * @param tableName The name of table.
+   * @return List<String> The list of column families in table.
+   * @throws IOException
+   */
+  List<String> getColumnFamiliesOfTable(String tableName) throws IOException {
+    String path = PathUtil.getTxnDataPath(baseDir, tableName);
+    List<String> children = null;
+    List<String> columnFamlies = new ArrayList<String>();
+    try {
+      children = getSession().getChildren(path, false);
+    } catch (KeeperException e) {
+      LOG.warn("Caught: ", e);
+      throw new IOException("Exception while obtaining columns of table.", e);
+    } catch (InterruptedException e) {
+      LOG.warn("Caught: ", e);
+      throw new IOException("Exception while obtaining columns of table.", e);
+    }
+
+    for (String child : children) {
+      if ((child.contains("idgen") == false)
+        && (child.contains("_locknode_") == false)) {
+        columnFamlies.add(child);
+      }
+    }
+    return columnFamlies;
+  }
+
+  /**
+   * This method returns a time stamp for use by the transactions.
+   *
+   * @return long The current timestamp in zookeeper.
+   * @throws IOException
+   */
+  long getTimeStamp() throws IOException {
+    long timeStamp;
+    Stat stat;
+    String clockPath = PathUtil.getClockPath(this.baseDir);
+    ensurePathExists(clockPath, null, Ids.OPEN_ACL_UNSAFE,
+      CreateMode.PERSISTENT);
+    try {
+      getSession().exists(clockPath, false);
+      stat = getSession().setData(clockPath, null, -1);
+
+    } catch (KeeperException e) {
+      LOG.warn("Caught: ", e);
+      throw new IOException("Exception while obtaining timestamp ", e);
+    } catch (InterruptedException e) {
+      LOG.warn("Caught: ", e);
+      throw new IOException("Exception while obtaining timestamp ", e);
+    }
+    timeStamp = stat.getMtime();
+    return timeStamp;
+  }
+
+  /**
+   * This method returns the next revision number to be used for any
+   * transaction purposes.
+   *
+   * @param tableName The name of the table.
+   * @return long The next revision number to be used by a transaction.
+   * @throws IOException
+   */
+  long nextId(String tableName) throws IOException {
+    String idNode = PathUtil.getRevisionIDNode(this.baseDir, tableName);
+    ensurePathExists(idNode, Bytes.toBytes("0"), Ids.OPEN_ACL_UNSAFE,
+      CreateMode.PERSISTENT);
+    String lockNode = PathUtil.getLockManagementNode(idNode);
+    ensurePathExists(lockNode, null, Ids.OPEN_ACL_UNSAFE,
+      CreateMode.PERSISTENT);
+    IDGenerator idf = new IDGenerator(getSession(), tableName, idNode);
+    long id = idf.obtainID();
+    return id;
+  }
+
+  /**
+   * The latest used revision id of the table.
+   *
+   * @param tableName The name of the table.
+   * @return long The revision number most recently used by any transaction.
+   * @throws IOException Signals that an I/O exception has occurred.
+   */
+  long currentID(String tableName) throws IOException {
+    String idNode = PathUtil.getRevisionIDNode(this.baseDir, tableName);
+    ensurePathExists(idNode, Bytes.toBytes("0"), Ids.OPEN_ACL_UNSAFE,
+      CreateMode.PERSISTENT);
+    String lockNode = PathUtil.getLockManagementNode(idNode);
+    ensurePathExists(lockNode, null, Ids.OPEN_ACL_UNSAFE,
+      CreateMode.PERSISTENT);
+    IDGenerator idf = new IDGenerator(getSession(), tableName, idNode);
+    long id = idf.readID();
+    return id;
+  }
+
+  /**
+   * This method retrieves the list of transaction information associated
+   * with each column/column family of a table.
+   *
+   * @param path The znode path
+   * @return List of FamilyRevision The list of transactions in the given path.
+   * @throws IOException
+   */
+  List<FamilyRevision> getTransactionList(String path)
+    throws IOException {
+
+    byte[] data = getRawData(path, new Stat());
+    ArrayList<FamilyRevision> wtxnList = new ArrayList<FamilyRevision>();
+    if (data == null) {
+      return wtxnList;
+    }
+    StoreFamilyRevisionList txnList = new StoreFamilyRevisionList();
+    deserialize(txnList, data);
+    Iterator<StoreFamilyRevision> itr = txnList.getRevisionListIterator();
+
+    while (itr.hasNext()) {
+      StoreFamilyRevision wtxn = itr.next();
+      wtxnList.add(new FamilyRevision(wtxn.getRevision(), wtxn
+        .getTimestamp()));
+    }
+
+    return wtxnList;
+  }
+
+  /**
+   * This method returns the data associated with the path in zookeeper.
+   *
+   * @param path The znode path
+   * @param stat Zookeeper stat
+   * @return byte array The data stored in the znode.
+   * @throws IOException
+   */
+  byte[] getRawData(String path, Stat stat) throws IOException {
+    byte[] data = null;
+    try {
+      data = getSession().getData(path, false, stat);
+    } catch (Exception e) {
+      throw new IOException(
+        "Exception while obtaining raw data from zookeeper path "
+          + path, e);
+    }
+    return data;
+  }
+
+  /**
+   * This method creates the basic znodes in zookeeper for revision
+   * management.
+   *
+   * @throws IOException
+   */
+  void createRootZNodes() throws IOException {
+    String txnBaseNode = PathUtil.getTransactionBasePath(this.baseDir);
+    String clockNode = PathUtil.getClockPath(this.baseDir);
+    ensurePathExists(txnBaseNode, null, Ids.OPEN_ACL_UNSAFE,
+      CreateMode.PERSISTENT);
+    ensurePathExists(clockNode, null, Ids.OPEN_ACL_UNSAFE,
+      CreateMode.PERSISTENT);
+  }
+
+  /**
+   * This method closes the zookeeper session.
+   */
+  void closeZKConnection() {
+    if (zkSession != null) {
+      try {
+        zkSession.close();
+      } catch (InterruptedException e) {
+        LOG.warn("Close failed: ", e);
+      }
+      zkSession = null;
+      LOG.info("Disconnected to ZooKeeper");
+    }
+  }
+
+  /**
+   * This method returns a zookeeper session. If the current session is closed,
+   * then a new session is created.
+   *
+   * @return ZooKeeper An instance of zookeeper client.
+   * @throws IOException
+   */
+  ZooKeeper getSession() throws IOException {
+    if (zkSession == null || zkSession.getState() == States.CLOSED) {
+      synchronized (this) {
         if (zkSession == null || zkSession.getState() == States.CLOSED) {
-            synchronized (this) {
-                if (zkSession == null || zkSession.getState() == States.CLOSED) {
-                    zkSession = new ZooKeeper(this.connectString,
-                        this.DEFAULT_SESSION_TIMEOUT, new ZKWatcher());
-                }
-            }
-        }
-        return zkSession;
-    }
-
-    /**
-     * This method updates the transaction data related to a znode.
-     *
-     * @param path The path to the transaction data.
-     * @param updateTx The FamilyRevision to be updated.
-     * @param mode The mode to update like append, update, remove.
-     * @throws IOException
-     */
-    void updateData(String path, FamilyRevision updateTx, UpdateMode mode)
-        throws IOException {
-
-        if (updateTx == null) {
-            throw new IOException(
-                "The transaction to be updated found to be null.");
-        }
-        List<FamilyRevision> currentData = getTransactionList(path);
-        List<FamilyRevision> newData = new ArrayList<FamilyRevision>();
-        boolean dataFound = false;
-        long updateVersion = updateTx.getRevision();
-        for (FamilyRevision tranx : currentData) {
-            if (tranx.getRevision() != updateVersion) {
-                newData.add(tranx);
-            } else {
-                dataFound = true;
-            }
-        }
-        switch (mode) {
-        case REMOVE:
-            if (dataFound == false) {
-                throw new IOException(
-                    "The transaction to be removed not found in the data.");
-            }
-            LOG.info("Removed trasaction : " + updateTx.toString());
-            break;
-        case KEEP_ALIVE:
-            if (dataFound == false) {
-                throw new IOException(
-                    "The transaction to be kept alove not found in the data. It might have been expired.");
-            }
-            newData.add(updateTx);
-            LOG.info("keep alive of transaction : " + updateTx.toString());
-            break;
-        case APPEND:
-            if (dataFound == true) {
-                throw new IOException(
-                    "The data to be appended already exists.");
-            }
-            newData.add(updateTx);
-            LOG.info("Added transaction : " + updateTx.toString());
-            break;
-        }
-
-        // For serialization purposes.
-        List<StoreFamilyRevision> newTxnList = new ArrayList<StoreFamilyRevision>();
-        for (FamilyRevision wtxn : newData) {
-            StoreFamilyRevision newTxn = new StoreFamilyRevision(wtxn.getRevision(),
-                wtxn.getExpireTimestamp());
-            newTxnList.add(newTxn);
-        }
-        StoreFamilyRevisionList wtxnList = new StoreFamilyRevisionList(newTxnList);
-        byte[] newByteData = serialize(wtxnList);
-
-        Stat stat = null;
-        try {
-            stat = zkSession.setData(path, newByteData, -1);
-        } catch (KeeperException e) {
-            throw new IOException(
-                "Exception while updating trasactional data. ", e);
-        } catch (InterruptedException e) {
-            throw new IOException(
-                "Exception while updating trasactional data. ", e);
-        }
-
-        if (stat != null) {
-            LOG.info("Transaction list stored at " + path + ".");
-        }
-
-    }
-
-    /**
-     * Refresh transactions on a given transaction data path.
-     *
-     * @param path The path to the transaction data.
-     * @throws IOException Signals that an I/O exception has occurred.
-     */
-    void refreshTransactions(String path) throws IOException {
-        List<FamilyRevision> currentData = getTransactionList(path);
-        List<FamilyRevision> newData = new ArrayList<FamilyRevision>();
-
-        for (FamilyRevision tranx : currentData) {
-            if (tranx.getExpireTimestamp() > getTimeStamp()) {
-                newData.add(tranx);
-            }
-        }
-
-        if (newData.equals(currentData) == false) {
-            List<StoreFamilyRevision> newTxnList = new ArrayList<StoreFamilyRevision>();
-            for (FamilyRevision wtxn : newData) {
-                StoreFamilyRevision newTxn = new StoreFamilyRevision(wtxn.getRevision(),
-                    wtxn.getExpireTimestamp());
-                newTxnList.add(newTxn);
-            }
-            StoreFamilyRevisionList wtxnList = new StoreFamilyRevisionList(newTxnList);
-            byte[] newByteData = serialize(wtxnList);
-
+          zkSession = new ZooKeeper(this.connectString,
+            this.DEFAULT_SESSION_TIMEOUT, new ZKWatcher());
+          while (zkSession.getState() == States.CONNECTING) {
             try {
-                zkSession.setData(path, newByteData, -1);
-            } catch (KeeperException e) {
-                throw new IOException(
-                    "Exception while updating trasactional data. ", e);
+              Thread.sleep(1000);
             } catch (InterruptedException e) {
-                throw new IOException(
-                    "Exception while updating trasactional data. ", e);
-            }
-
-        }
-
-    }
-
-    /**
-     * Delete table znodes.
-     *
-     * @param tableName the hbase table name
-     * @throws IOException Signals that an I/O exception has occurred.
-     */
-    void deleteZNodes(String tableName) throws IOException {
-        String transactionDataTablePath = PathUtil.getTxnDataPath(baseDir,
-            tableName);
-        deleteRecursively(transactionDataTablePath);
-    }
-
-    void deleteRecursively(String path) throws IOException {
-        try {
-            List<String> children = getSession().getChildren(path, false);
-            if (children.size() != 0) {
-                for (String child : children) {
-                    deleteRecursively(path + "/" + child);
-                }
             }
-            getSession().delete(path, -1);
-        } catch (KeeperException e) {
-            throw new IOException(
-                "Exception while deleting path " + path + ".", e);
-        } catch (InterruptedException e) {
-            throw new IOException(
-                "Exception while deleting path " + path + ".", e);
-        }
-    }
-
-    /**
-     * This method serializes a given instance of TBase object.
-     *
-     * @param obj An instance of TBase
-     * @return byte array The serialized data.
-     * @throws IOException
-     */
-    static byte[] serialize(TBase obj) throws IOException {
-        if (obj == null)
-            return new byte[0];
-        try {
-            TSerializer serializer = new TSerializer(
-                new TBinaryProtocol.Factory());
-            byte[] bytes = serializer.serialize(obj);
-            return bytes;
-        } catch (Exception e) {
-            throw new IOException("Serialization error: ", e);
+          }
         }
+      }
     }
+    return zkSession;
+  }
 
-
-    /**
-     * This method deserializes the given byte array into the TBase object.
-     *
-     * @param obj An instance of TBase
-     * @param data Output of deserialization.
-     * @throws IOException
-     */
-    static void deserialize(TBase obj, byte[] data) throws IOException {
-        if (data == null || data.length == 0)
-            return;
-        try {
-            TDeserializer deserializer = new TDeserializer(
-                new TBinaryProtocol.Factory());
-            deserializer.deserialize(obj, data);
-        } catch (Exception e) {
-            throw new IOException("Deserialization error: " + e.getMessage(), e);
+  /**
+   * This method updates the transaction data related to a znode.
+   *
+   * @param path The path to the transaction data.
+   * @param updateTx The FamilyRevision to be updated.
+   * @param mode The mode to update like append, update, remove.
+   * @throws IOException
+   */
+  void updateData(String path, FamilyRevision updateTx, UpdateMode mode)
+    throws IOException {
+
+    if (updateTx == null) {
+      throw new IOException(
+        "The transaction to be updated found to be null.");
+    }
+    List<FamilyRevision> currentData = getTransactionList(path);
+    List<FamilyRevision> newData = new ArrayList<FamilyRevision>();
+    boolean dataFound = false;
+    long updateVersion = updateTx.getRevision();
+    for (FamilyRevision tranx : currentData) {
+      if (tranx.getRevision() != updateVersion) {
+        newData.add(tranx);
+      } else {
+        dataFound = true;
+      }
+    }
+    switch (mode) {
+    case REMOVE:
+      if (dataFound == false) {
+        throw new IOException(
+          "The transaction to be removed not found in the data.");
+      }
+      LOG.info("Removed trasaction : " + updateTx.toString());
+      break;
+    case KEEP_ALIVE:
+      if (dataFound == false) {
+        throw new IOException(
+          "The transaction to be kept alove not found in the data. It might have been expired.");
+      }
+      newData.add(updateTx);
+      LOG.info("keep alive of transaction : " + updateTx.toString());
+      break;
+    case APPEND:
+      if (dataFound == true) {
+        throw new IOException(
+          "The data to be appended already exists.");
+      }
+      newData.add(updateTx);
+      LOG.info("Added transaction : " + updateTx.toString());
+      break;
+    }
+
+    // For serialization purposes.
+    List<StoreFamilyRevision> newTxnList = new ArrayList<StoreFamilyRevision>();
+    for (FamilyRevision wtxn : newData) {
+      StoreFamilyRevision newTxn = new StoreFamilyRevision(wtxn.getRevision(),
+        wtxn.getExpireTimestamp());
+      newTxnList.add(newTxn);
+    }
+    StoreFamilyRevisionList wtxnList = new StoreFamilyRevisionList(newTxnList);
+    byte[] newByteData = serialize(wtxnList);
+
+    Stat stat = null;
+    try {
+      stat = zkSession.setData(path, newByteData, -1);
+    } catch (KeeperException e) {
+      throw new IOException(
+        "Exception while updating trasactional data. ", e);
+    } catch (InterruptedException e) {
+      throw new IOException(
+        "Exception while updating trasactional data. ", e);
+    }
+
+    if (stat != null) {
+      LOG.info("Transaction list stored at " + path + ".");
+    }
+
+  }
+
+  /**
+   * Refresh transactions on a given transaction data path.
+   *
+   * @param path The path to the transaction data.
+   * @throws IOException Signals that an I/O exception has occurred.
+   */
+  void refreshTransactions(String path) throws IOException {
+    List<FamilyRevision> currentData = getTransactionList(path);
+    List<FamilyRevision> newData = new ArrayList<FamilyRevision>();
+
+    for (FamilyRevision tranx : currentData) {
+      if (tranx.getExpireTimestamp() > getTimeStamp()) {
+        newData.add(tranx);
+      }
+    }
+
+    if (newData.equals(currentData) == false) {
+      List<StoreFamilyRevision> newTxnList = new ArrayList<StoreFamilyRevision>();
+      for (FamilyRevision wtxn : newData) {
+        StoreFamilyRevision newTxn = new StoreFamilyRevision(wtxn.getRevision(),
+          wtxn.getExpireTimestamp());
+        newTxnList.add(newTxn);
+      }
+      StoreFamilyRevisionList wtxnList = new StoreFamilyRevisionList(newTxnList);
+      byte[] newByteData = serialize(wtxnList);
+
+      try {
+        zkSession.setData(path, newByteData, -1);
+      } catch (KeeperException e) {
+        throw new IOException(
+          "Exception while updating trasactional data. ", e);
+      } catch (InterruptedException e) {
+        throw new IOException(
+          "Exception while updating trasactional data. ", e);
+      }
+
+    }
+
+  }
+
+  /**
+   * Delete table znodes.
+   *
+   * @param tableName the hbase table name
+   * @throws IOException Signals that an I/O exception has occurred.
+   */
+  void deleteZNodes(String tableName) throws IOException {
+    String transactionDataTablePath = PathUtil.getTxnDataPath(baseDir,
+      tableName);
+    deleteRecursively(transactionDataTablePath);
+  }
+
+  void deleteRecursively(String path) throws IOException {
+    try {
+      List<String> children = getSession().getChildren(path, false);
+      if (children.size() != 0) {
+        for (String child : children) {
+          deleteRecursively(path + "/" + child);
         }
-    }
-
-    private class ZKWatcher implements Watcher {
-        public void process(WatchedEvent event) {
-            switch (event.getState()) {
-            case Expired:
-                LOG.info("The client session has expired. Try opening a new "
-                    + "session and connecting again.");
-                zkSession = null;
-                break;
-            default:
+      }
+      getSession().delete(path, -1);
+    } catch (KeeperException e) {
+      throw new IOException(
+        "Exception while deleting path " + path + ".", e);
+    } catch (InterruptedException e) {
+      throw new IOException(
+        "Exception while deleting path " + path + ".", e);
+    }
+  }
+
+  /**
+   * This method serializes a given instance of TBase object.
+   *
+   * @param obj An instance of TBase
+   * @return byte array The serialized data.
+   * @throws IOException
+   */
+  static byte[] serialize(TBase obj) throws IOException {
+    if (obj == null)
+      return new byte[0];
+    try {
+      TSerializer serializer = new TSerializer(
+        new TBinaryProtocol.Factory());
+      byte[] bytes = serializer.serialize(obj);
+      return bytes;
+    } catch (Exception e) {
+      throw new IOException("Serialization error: ", e);
+    }
+  }
+
+
+  /**
+   * This method deserializes the given byte array into the TBase object.
+   *
+   * @param obj An instance of TBase
+   * @param data Output of deserialization.
+   * @throws IOException
+   */
+  static void deserialize(TBase obj, byte[] data) throws IOException {
+    if (data == null || data.length == 0)
+      return;
+    try {
+      TDeserializer deserializer = new TDeserializer(
+        new TBinaryProtocol.Factory());
+      deserializer.deserialize(obj, data);
+    } catch (Exception e) {
+      throw new IOException("Deserialization error: " + e.getMessage(), e);
+    }
+  }
+
+  private class ZKWatcher implements Watcher {
+    public void process(WatchedEvent event) {
+      switch (event.getState()) {
+      case Expired:
+        LOG.info("The client session has expired. Try opening a new "
+          + "session and connecting again.");
+        zkSession = null;
+        break;
+      default:
 
-            }
-        }
+      }
     }
+  }
 
 }

Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/LockListener.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/LockListener.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/LockListener.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/LockListener.java Thu Sep 12 01:21:10 2013
@@ -27,15 +27,15 @@ package org.apache.hcatalog.hbase.snapsh
  *  in the package name.
  */
 public interface LockListener {
-    /**
-     * call back called when the lock
-     * is acquired
-     */
-    public void lockAcquired();
+  /**
+   * call back called when the lock
+   * is acquired
+   */
+  public void lockAcquired();
 
-    /**
-     * call back called when the lock is
-     * released.
-     */
-    public void lockReleased();
+  /**
+   * call back called when the lock is
+   * released.
+   */
+  public void lockReleased();
 }

Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ProtocolSupport.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ProtocolSupport.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ProtocolSupport.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/lock/ProtocolSupport.java Thu Sep 12 01:21:10 2013
@@ -40,156 +40,156 @@ import java.util.concurrent.atomic.Atomi
  *  changes in the retry delay, retry count values and package name.
  */
 class ProtocolSupport {
-    private static final Logger LOG = LoggerFactory.getLogger(ProtocolSupport.class);
+  private static final Logger LOG = LoggerFactory.getLogger(ProtocolSupport.class);
 
-    protected final ZooKeeper zookeeper;
-    private AtomicBoolean closed = new AtomicBoolean(false);
-    private long retryDelay = 500L;
-    private int retryCount = 3;
-    private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
-
-    public ProtocolSupport(ZooKeeper zookeeper) {
-        this.zookeeper = zookeeper;
-    }
-
-    /**
-     * Closes this strategy and releases any ZooKeeper resources; but keeps the
-     *  ZooKeeper instance open
-     */
-    public void close() {
-        if (closed.compareAndSet(false, true)) {
-            doClose();
-        }
-    }
-
-    /**
-     * return zookeeper client instance
-     * @return zookeeper client instance
-     */
-    public ZooKeeper getZookeeper() {
-        return zookeeper;
-    }
-
-    /**
-     * return the acl its using
-     * @return the acl.
-     */
-    public List<ACL> getAcl() {
-        return acl;
-    }
-
-    /**
-     * set the acl
-     * @param acl the acl to set to
-     */
-    public void setAcl(List<ACL> acl) {
-        this.acl = acl;
-    }
-
-    /**
-     * get the retry delay in milliseconds
-     * @return the retry delay
-     */
-    public long getRetryDelay() {
-        return retryDelay;
-    }
-
-    /**
-     * Sets the time waited between retry delays
-     * @param retryDelay the retry delay
-     */
-    public void setRetryDelay(long retryDelay) {
-        this.retryDelay = retryDelay;
-    }
-
-    /**
-     * Allow derived classes to perform
-     * some custom closing operations to release resources
-     */
-    protected void doClose() {
-    }
-
-
-    /**
-     * Perform the given operation, retrying if the connection fails
-     * @return object. it needs to be cast to the callee's expected
-     * return type.
-     */
-    protected Object retryOperation(ZooKeeperOperation operation)
-        throws KeeperException, InterruptedException {
-        KeeperException exception = null;
-        for (int i = 0; i < retryCount; i++) {
-            try {
-                return operation.execute();
-            } catch (KeeperException.SessionExpiredException e) {
-                LOG.warn("Session expired for: " + zookeeper + " so reconnecting due to: " + e, e);
-                throw e;
-            } catch (KeeperException.ConnectionLossException e) {
-                if (exception == null) {
-                    exception = e;
-                }
-                LOG.debug("Attempt " + i + " failed with connection loss so " +
-                		"attempting to reconnect: " + e, e);
-                retryDelay(i);
-            }
-        }
-        throw exception;
-    }
-
-    /**
-     * Ensures that the given path exists with no data, the current
-     * ACL and no flags
-     * @param path
-     */
-    protected void ensurePathExists(String path) {
-        ensureExists(path, null, acl, CreateMode.PERSISTENT);
-    }
-
-    /**
-     * Ensures that the given path exists with the given data, ACL and flags
-     * @param path
-     * @param acl
-     * @param flags
-     */
-    protected void ensureExists(final String path, final byte[] data,
-            final List<ACL> acl, final CreateMode flags) {
-        try {
-            retryOperation(new ZooKeeperOperation() {
-                public boolean execute() throws KeeperException, InterruptedException {
-                    Stat stat = zookeeper.exists(path, false);
-                    if (stat != null) {
-                        return true;
-                    }
-                    zookeeper.create(path, data, acl, flags);
-                    return true;
-                }
-            });
-        } catch (KeeperException e) {
-            LOG.warn("Caught: " + e, e);
-        } catch (InterruptedException e) {
-            LOG.warn("Caught: " + e, e);
+  protected final ZooKeeper zookeeper;
+  private AtomicBoolean closed = new AtomicBoolean(false);
+  private long retryDelay = 500L;
+  private int retryCount = 3;
+  private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+  public ProtocolSupport(ZooKeeper zookeeper) {
+    this.zookeeper = zookeeper;
+  }
+
+  /**
+   * Closes this strategy and releases any ZooKeeper resources; but keeps the
+   *  ZooKeeper instance open
+   */
+  public void close() {
+    if (closed.compareAndSet(false, true)) {
+      doClose();
+    }
+  }
+
+  /**
+   * return zookeeper client instance
+   * @return zookeeper client instance
+   */
+  public ZooKeeper getZookeeper() {
+    return zookeeper;
+  }
+
+  /**
+   * return the acl its using
+   * @return the acl.
+   */
+  public List<ACL> getAcl() {
+    return acl;
+  }
+
+  /**
+   * set the acl
+   * @param acl the acl to set to
+   */
+  public void setAcl(List<ACL> acl) {
+    this.acl = acl;
+  }
+
+  /**
+   * get the retry delay in milliseconds
+   * @return the retry delay
+   */
+  public long getRetryDelay() {
+    return retryDelay;
+  }
+
+  /**
+   * Sets the time waited between retry delays
+   * @param retryDelay the retry delay
+   */
+  public void setRetryDelay(long retryDelay) {
+    this.retryDelay = retryDelay;
+  }
+
+  /**
+   * Allow derived classes to perform
+   * some custom closing operations to release resources
+   */
+  protected void doClose() {
+  }
+
+
+  /**
+   * Perform the given operation, retrying if the connection fails
+   * @return object. it needs to be cast to the callee's expected
+   * return type.
+   */
+  protected Object retryOperation(ZooKeeperOperation operation)
+    throws KeeperException, InterruptedException {
+    KeeperException exception = null;
+    for (int i = 0; i < retryCount; i++) {
+      try {
+        return operation.execute();
+      } catch (KeeperException.SessionExpiredException e) {
+        LOG.warn("Session expired for: " + zookeeper + " so reconnecting due to: " + e, e);
+        throw e;
+      } catch (KeeperException.ConnectionLossException e) {
+        if (exception == null) {
+          exception = e;
         }
-    }
-
-    /**
-     * Returns true if this protocol has been closed
-     * @return true if this protocol is closed
-     */
-    protected boolean isClosed() {
-        return closed.get();
-    }
-
-    /**
-     * Performs a retry delay if this is not the first attempt
-     * @param attemptCount the number of the attempts performed so far
-     */
-    protected void retryDelay(int attemptCount) {
-        if (attemptCount > 0) {
-            try {
-                Thread.sleep(attemptCount * retryDelay);
-            } catch (InterruptedException e) {
-                LOG.debug("Failed to sleep: " + e, e);
-            }
+        LOG.debug("Attempt " + i + " failed with connection loss so " +
+                "attempting to reconnect: " + e, e);
+        retryDelay(i);
+      }
+    }
+    throw exception;
+  }
+
+  /**
+   * Ensures that the given path exists with no data, the current
+   * ACL and no flags
+   * @param path
+   */
+  protected void ensurePathExists(String path) {
+    ensureExists(path, null, acl, CreateMode.PERSISTENT);
+  }
+
+  /**
+   * Ensures that the given path exists with the given data, ACL and flags
+   * @param path
+   * @param acl
+   * @param flags
+   */
+  protected void ensureExists(final String path, final byte[] data,
+      final List<ACL> acl, final CreateMode flags) {
+    try {
+      retryOperation(new ZooKeeperOperation() {
+        public boolean execute() throws KeeperException, InterruptedException {
+          Stat stat = zookeeper.exists(path, false);
+          if (stat != null) {
+            return true;
+          }
+          zookeeper.create(path, data, acl, flags);
+          return true;
         }
+      });
+    } catch (KeeperException e) {
+      LOG.warn("Caught: " + e, e);
+    } catch (InterruptedException e) {
+      LOG.warn("Caught: " + e, e);
+    }
+  }
+
+  /**
+   * Returns true if this protocol has been closed
+   * @return true if this protocol is closed
+   */
+  protected boolean isClosed() {
+    return closed.get();
+  }
+
+  /**
+   * Performs a retry delay if this is not the first attempt
+   * @param attemptCount the number of the attempts performed so far
+   */
+  protected void retryDelay(int attemptCount) {
+    if (attemptCount > 0) {
+      try {
+        Thread.sleep(attemptCount * retryDelay);
+      } catch (InterruptedException e) {
+        LOG.debug("Failed to sleep: " + e, e);
+      }
     }
+  }
 }



Mime
View raw message