incubator-hcatalog-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tra...@apache.org
Subject svn commit: r1383152 [22/27] - in /incubator/hcatalog/trunk: ./ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/ hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/ ...
Date Mon, 10 Sep 2012 23:29:03 GMT
Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java (original)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java Mon Sep 10 23:28:55 2012
@@ -64,12 +64,12 @@ class ImportSequenceFile {
     private final static String IMPORTER_WORK_DIR = "_IMPORTER_MR_WORK_DIR";
 
 
-    private static class SequenceFileImporter  extends Mapper<ImmutableBytesWritable, Put, ImmutableBytesWritable, Put> {
+    private static class SequenceFileImporter extends Mapper<ImmutableBytesWritable, Put, ImmutableBytesWritable, Put> {
 
         @Override
         public void map(ImmutableBytesWritable rowKey, Put value,
                         Context context)
-                throws IOException {
+            throws IOException {
             try {
                 context.write(new ImmutableBytesWritable(value.getRow()), value);
             } catch (InterruptedException e) {
@@ -112,7 +112,7 @@ class ImportSequenceFile {
                 @Override
                 public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
                     try {
-                        baseOutputCommitter.abortJob(jobContext,state);
+                        baseOutputCommitter.abortJob(jobContext, state);
                     } finally {
                         cleanupScratch(jobContext);
                     }
@@ -124,13 +124,13 @@ class ImportSequenceFile {
                         baseOutputCommitter.commitJob(jobContext);
                         Configuration conf = jobContext.getConfiguration();
                         try {
-                        //import hfiles
-                        new LoadIncrementalHFiles(conf)
+                            //import hfiles
+                            new LoadIncrementalHFiles(conf)
                                 .doBulkLoad(HFileOutputFormat.getOutputPath(jobContext),
-                                                   new HTable(conf,
-                                                                      conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY)));
+                                    new HTable(conf,
+                                        conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY)));
                         } catch (Exception e) {
-                        	throw new IOException("BulkLoad failed.", e);
+                            throw new IOException("BulkLoad failed.", e);
                         }
                     } finally {
                         cleanupScratch(jobContext);
@@ -146,16 +146,16 @@ class ImportSequenceFile {
                     }
                 }
 
-                private void cleanupScratch(JobContext context) throws IOException{
+                private void cleanupScratch(JobContext context) throws IOException {
                     FileSystem fs = FileSystem.get(context.getConfiguration());
-                    fs.delete(HFileOutputFormat.getOutputPath(context),true);
+                    fs.delete(HFileOutputFormat.getOutputPath(context), true);
                 }
             };
         }
     }
 
     private static Job createSubmittableJob(Configuration conf, String tableName, Path inputDir, Path scratchDir, boolean localMode)
-            throws IOException {
+        throws IOException {
         Job job = new Job(conf, NAME + "_" + tableName);
         job.setJarByClass(SequenceFileImporter.class);
         FileInputFormat.setInputPaths(job, inputDir);
@@ -172,16 +172,16 @@ class ImportSequenceFile {
         job.setOutputFormatClass(ImporterOutputFormat.class);
 
         //local mode doesn't support symbolic links so we have to manually set the actual path
-        if(localMode) {
+        if (localMode) {
             String partitionFile = null;
-            for(URI uri: DistributedCache.getCacheFiles(job.getConfiguration())) {
-                if(DEFAULT_PATH.equals(uri.getFragment())) {
+            for (URI uri : DistributedCache.getCacheFiles(job.getConfiguration())) {
+                if (DEFAULT_PATH.equals(uri.getFragment())) {
                     partitionFile = uri.toString();
                     break;
                 }
             }
-            partitionFile = partitionFile.substring(0,partitionFile.lastIndexOf("#"));
-            job.getConfiguration().set(TotalOrderPartitioner.PARTITIONER_PATH,partitionFile.toString());
+            partitionFile = partitionFile.substring(0, partitionFile.lastIndexOf("#"));
+            job.getConfiguration().set(TotalOrderPartitioner.PARTITIONER_PATH, partitionFile.toString());
         }
 
         return job;
@@ -190,7 +190,7 @@ class ImportSequenceFile {
     /**
      * Method to run the Importer MapReduce Job. Normally will be called by another MR job
      * during OutputCommitter.commitJob().
-      * @param parentContext JobContext of the parent job
+     * @param parentContext JobContext of the parent job
      * @param tableName name of table to bulk load data into
      * @param InputDir path of SequenceFile formatted data to read
      * @param scratchDir temporary path for the Importer MR job to build the HFiles which will be imported
@@ -199,21 +199,21 @@ class ImportSequenceFile {
     static boolean runJob(JobContext parentContext, String tableName, Path InputDir, Path scratchDir) {
         Configuration parentConf = parentContext.getConfiguration();
         Configuration conf = new Configuration();
-        for(Map.Entry<String,String> el: parentConf) {
-            if(el.getKey().startsWith("hbase."))
-                conf.set(el.getKey(),el.getValue());
-            if(el.getKey().startsWith("mapred.cache.archives"))
-                conf.set(el.getKey(),el.getValue());
+        for (Map.Entry<String, String> el : parentConf) {
+            if (el.getKey().startsWith("hbase."))
+                conf.set(el.getKey(), el.getValue());
+            if (el.getKey().startsWith("mapred.cache.archives"))
+                conf.set(el.getKey(), el.getValue());
         }
 
         //Inherit jar dependencies added to distributed cache loaded by parent job
-        conf.set("mapred.job.classpath.archives",parentConf.get("mapred.job.classpath.archives", ""));
-        conf.set("mapreduce.job.cache.archives.visibilities",parentConf.get("mapreduce.job.cache.archives.visibilities",""));
+        conf.set("mapred.job.classpath.archives", parentConf.get("mapred.job.classpath.archives", ""));
+        conf.set("mapreduce.job.cache.archives.visibilities", parentConf.get("mapreduce.job.cache.archives.visibilities", ""));
 
         //Temporary fix until hbase security is ready
         //We need the written HFile to be world readable so
         //hbase regionserver user has the privileges to perform a hdfs move
-        if(parentConf.getBoolean("hadoop.security.authorization", false)) {
+        if (parentConf.getBoolean("hadoop.security.authorization", false)) {
             FsPermission.setUMask(conf, FsPermission.valueOf("----------"));
         }
 
@@ -225,25 +225,24 @@ class ImportSequenceFile {
         boolean success = false;
         try {
             FileSystem fs = FileSystem.get(parentConf);
-            Path workDir = new Path(new Job(parentConf).getWorkingDirectory(),IMPORTER_WORK_DIR);
-            if(!fs.mkdirs(workDir))
-                throw new IOException("Importer work directory already exists: "+workDir);
+            Path workDir = new Path(new Job(parentConf).getWorkingDirectory(), IMPORTER_WORK_DIR);
+            if (!fs.mkdirs(workDir))
+                throw new IOException("Importer work directory already exists: " + workDir);
             Job job = createSubmittableJob(conf, tableName, InputDir, scratchDir, localMode);
             job.setWorkingDirectory(workDir);
             job.getCredentials().addAll(parentContext.getCredentials());
             success = job.waitForCompletion(true);
             fs.delete(workDir, true);
             //We only cleanup on success because failure might've been caused by existence of target directory
-            if(localMode && success)
-            {
-                new ImporterOutputFormat().getOutputCommitter(org.apache.hadoop.mapred.HCatMapRedUtil.createTaskAttemptContext(conf,new TaskAttemptID())).commitJob(job);
+            if (localMode && success) {
+                new ImporterOutputFormat().getOutputCommitter(org.apache.hadoop.mapred.HCatMapRedUtil.createTaskAttemptContext(conf, new TaskAttemptID())).commitJob(job);
             }
         } catch (InterruptedException e) {
             LOG.error("ImportSequenceFile Failed", e);
         } catch (ClassNotFoundException e) {
-            LOG.error("ImportSequenceFile Failed",e);
+            LOG.error("ImportSequenceFile Failed", e);
         } catch (IOException e) {
-            LOG.error("ImportSequenceFile Failed",e);
+            LOG.error("ImportSequenceFile Failed", e);
         }
         return success;
     }

Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/IDGenerator.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/IDGenerator.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/IDGenerator.java (original)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/IDGenerator.java Mon Sep 10 23:28:55 2012
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 package org.apache.hcatalog.hbase.snapshot;
+
 import java.io.IOException;
 import java.nio.charset.Charset;
 
@@ -32,7 +33,7 @@ import org.slf4j.LoggerFactory;
 /**
  * This class generates revision id's for transactions.
  */
-class IDGenerator implements LockListener{
+class IDGenerator implements LockListener {
 
     private ZooKeeper zookeeper;
     private String zNodeDataLoc;
@@ -41,7 +42,7 @@ class IDGenerator implements LockListene
     private static final Logger LOG = LoggerFactory.getLogger(IDGenerator.class);
 
     IDGenerator(ZooKeeper zookeeper, String tableName, String idGenNode)
-            throws IOException {
+        throws IOException {
         this.zookeeper = zookeeper;
         this.zNodeDataLoc = idGenNode;
         this.zNodeLockBasePath = PathUtil.getLockManagementNode(idGenNode);
@@ -53,7 +54,7 @@ class IDGenerator implements LockListene
      * @return revision ID
      * @throws IOException
      */
-    public long obtainID() throws IOException{
+    public long obtainID() throws IOException {
         WriteLock wLock = new WriteLock(zookeeper, zNodeLockBasePath, Ids.OPEN_ACL_UNSAFE);
         wLock.setLockListener(this);
         try {
@@ -62,7 +63,7 @@ class IDGenerator implements LockListene
                 //TO DO : Let this request queue up and try obtaining lock.
                 throw new IOException("Unable to obtain lock to obtain id.");
             } else {
-                    id = incrementAndReadCounter();
+                id = incrementAndReadCounter();
             }
         } catch (KeeperException e) {
             LOG.warn("Exception while obtaining lock for ID.", e);
@@ -82,34 +83,34 @@ class IDGenerator implements LockListene
      * @return revision ID
      * @throws IOException
      */
-    public long readID() throws IOException{
+    public long readID() throws IOException {
         long curId;
         try {
             Stat stat = new Stat();
             byte[] data = zookeeper.getData(this.zNodeDataLoc, false, stat);
-            curId = Long.parseLong(new String(data,Charset.forName("UTF-8")));
+            curId = Long.parseLong(new String(data, Charset.forName("UTF-8")));
         } catch (KeeperException e) {
             LOG.warn("Exception while reading current revision id.", e);
             throw new IOException("Exception while reading current revision id.", e);
         } catch (InterruptedException e) {
             LOG.warn("Exception while reading current revision id.", e);
-            throw new IOException("Exception while reading current revision id.",e);
+            throw new IOException("Exception while reading current revision id.", e);
         }
 
         return curId;
     }
 
 
-    private long incrementAndReadCounter() throws IOException{
+    private long incrementAndReadCounter() throws IOException {
 
         long curId, usedId;
         try {
             Stat stat = new Stat();
             byte[] data = zookeeper.getData(this.zNodeDataLoc, false, stat);
-            usedId = Long.parseLong((new String(data,Charset.forName("UTF-8"))));
-            curId = usedId +1;
+            usedId = Long.parseLong((new String(data, Charset.forName("UTF-8"))));
+            curId = usedId + 1;
             String lastUsedID = String.valueOf(curId);
-            zookeeper.setData(this.zNodeDataLoc, lastUsedID.getBytes(Charset.forName("UTF-8")), -1 );
+            zookeeper.setData(this.zNodeDataLoc, lastUsedID.getBytes(Charset.forName("UTF-8")), -1);
 
         } catch (KeeperException e) {
             LOG.warn("Exception while incrementing revision id.", e);

Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/PathUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/PathUtil.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/PathUtil.java (original)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/PathUtil.java Mon Sep 10 23:28:55 2012
@@ -33,10 +33,10 @@ package org.apache.hcatalog.hbase.snapsh
  * baseDir/TrasactionBasePath/TableB/columnFamily-1/abortedTxns
 
  */
-public class PathUtil{
+public class PathUtil {
 
-    static final String      DATA_DIR = "/data";
-    static final String      CLOCK_NODE   = "/clock";
+    static final String DATA_DIR = "/data";
+    static final String CLOCK_NODE = "/clock";
 
     /**
      * This method returns the data path associated with the currently
@@ -47,10 +47,10 @@ public class PathUtil{
      * @return The path of the running transactions data.
      */
     static String getRunningTxnInfoPath(String baseDir, String tableName,
-            String columnFamily) {
+                                        String columnFamily) {
         String txnBasePath = getTransactionBasePath(baseDir);
         String path = txnBasePath + "/" + tableName + "/" + columnFamily
-                + "/runningTxns";
+            + "/runningTxns";
         return path;
     }
 
@@ -63,10 +63,10 @@ public class PathUtil{
      * @return The path of the aborted transactions data.
      */
     static String getAbortInformationPath(String baseDir, String tableName,
-            String columnFamily) {
+                                          String columnFamily) {
         String txnBasePath = getTransactionBasePath(baseDir);
         String path = txnBasePath + "/" + tableName + "/" + columnFamily
-                + "/abortData";
+            + "/abortData";
         return path;
     }
 
@@ -83,13 +83,13 @@ public class PathUtil{
         return revisionIDNode;
     }
 
-   /**
-    * Gets the lock management node for any znode that needs to be locked.
-    *
-    * @param path the path of the znode.
-    * @return the lock management node path.
-    */
-   static String getLockManagementNode(String path) {
+    /**
+     * Gets the lock management node for any znode that needs to be locked.
+     *
+     * @param path the path of the znode.
+     * @return the lock management node path.
+     */
+    static String getLockManagementNode(String path) {
         String lockNode = path + "_locknode_";
         return lockNode;
     }
@@ -112,7 +112,7 @@ public class PathUtil{
      * @param tableName the table name
      * @return the txn data path for the table.
      */
-    static String getTxnDataPath(String baseDir, String tableName){
+    static String getTxnDataPath(String baseDir, String tableName) {
         String txnBasePath = getTransactionBasePath(baseDir);
         String path = txnBasePath + "/" + tableName;
         return path;

Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RMConstants.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RMConstants.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RMConstants.java (original)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RMConstants.java Mon Sep 10 23:28:55 2012
@@ -19,11 +19,11 @@
 package org.apache.hcatalog.hbase.snapshot;
 
 public class RMConstants {
-  public static final String REVISION_MGR_ENDPOINT_IMPL_CLASS = "revision.manager.endpoint.impl.class";
+    public static final String REVISION_MGR_ENDPOINT_IMPL_CLASS = "revision.manager.endpoint.impl.class";
 
-  public static final String WRITE_TRANSACTION_TIMEOUT = "revision.manager.writeTxn.timeout";
+    public static final String WRITE_TRANSACTION_TIMEOUT = "revision.manager.writeTxn.timeout";
 
-  public static final String ZOOKEEPER_HOSTLIST = "revision.manager.zk.hostList";
+    public static final String ZOOKEEPER_HOSTLIST = "revision.manager.zk.hostList";
 
-  public static final String ZOOKEEPER_DATADIR = "revision.manager.zk.dataDir";
+    public static final String ZOOKEEPER_DATADIR = "revision.manager.zk.dataDir";
 }

Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java (original)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java Mon Sep 10 23:28:55 2012
@@ -73,7 +73,7 @@ public interface RevisionManager {
      * @throws IOException
      */
     public Transaction beginWriteTransaction(String table, List<String> families)
-            throws IOException;
+        throws IOException;
 
     /**
      * Start the write transaction.
@@ -85,7 +85,7 @@ public interface RevisionManager {
      * @throws IOException
      */
     public Transaction beginWriteTransaction(String table,
-            List<String> families, long keepAlive) throws IOException;
+                                             List<String> families, long keepAlive) throws IOException;
 
     /**
      * Commit the write transaction.
@@ -94,7 +94,7 @@ public interface RevisionManager {
      * @throws IOException
      */
     public void commitWriteTransaction(Transaction transaction)
-            throws IOException;
+        throws IOException;
 
     /**
      * Abort the write transaction.
@@ -103,7 +103,7 @@ public interface RevisionManager {
      * @throws IOException
      */
     public void abortWriteTransaction(Transaction transaction)
-            throws IOException;
+        throws IOException;
 
     /**
      * Get the list of aborted Transactions for a column family
@@ -114,7 +114,7 @@ public interface RevisionManager {
      * @throws java.io.IOException
      */
     public List<FamilyRevision> getAbortedWriteTransactions(String table,
-        String columnFamily) throws IOException;
+                                                            String columnFamily) throws IOException;
 
     /**
      * Create the latest snapshot of the table.
@@ -134,7 +134,7 @@ public interface RevisionManager {
      * @throws IOException
      */
     public TableSnapshot createSnapshot(String tableName, long revision)
-            throws IOException;
+        throws IOException;
 
     /**
      * Extends the expiration of a transaction by the time indicated by keep alive.

Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerConfiguration.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerConfiguration.java (original)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerConfiguration.java Mon Sep 10 23:28:55 2012
@@ -24,36 +24,35 @@ import org.apache.hadoop.hbase.HBaseConf
 public class RevisionManagerConfiguration {
 
 
+    public static Configuration addResources(Configuration conf) {
+        conf.addDefaultResource("revision-manager-default.xml");
+        conf.addResource("revision-manager-site.xml");
+        return conf;
+    }
 
-  public static Configuration addResources(Configuration conf) {
-    conf.addDefaultResource("revision-manager-default.xml");
-    conf.addResource("revision-manager-site.xml");
-    return conf;
-  }
+    /**
+     * Creates a Configuration with Revision Manager resources
+     * @return a Configuration with Revision Manager resources
+     */
+    public static Configuration create() {
+        Configuration conf = new Configuration();
+        return addResources(conf);
+    }
 
-  /**
-   * Creates a Configuration with Revision Manager resources
-   * @return a Configuration with Revision Manager resources
-   */
-  public static Configuration create() {
-    Configuration conf = new Configuration();
-    return addResources(conf);
-  }
-
-  /**
-   * Creates a clone of passed configuration.
-   * @param that Configuration to clone.
-   * @return a Configuration created with the revision-manager-*.xml files plus
-   * the given configuration.
-   */
-  public static Configuration create(final Configuration that) {
-    Configuration conf = create();
-    //we need to merge things instead of doing new Configuration(that)
-    //because of a bug in Configuration wherein the config
-    //set on the MR fronted will get loaded on the backend as resouce called job.xml
-    //hence adding resources on the backed could potentially overwrite properties
-    //set on the frontend which we shouldn't be doing here
-    HBaseConfiguration.merge(conf, that);
-    return conf;
-  }
+    /**
+     * Creates a clone of passed configuration.
+     * @param that Configuration to clone.
+     * @return a Configuration created with the revision-manager-*.xml files plus
+     * the given configuration.
+     */
+    public static Configuration create(final Configuration that) {
+        Configuration conf = create();
+        //we need to merge things instead of doing new Configuration(that)
+        //because of a bug in Configuration wherein the config
+        //set on the MR frontend will get loaded on the backend as a resource called job.xml
+        //hence adding resources on the backend could potentially overwrite properties
+        //set on the frontend which we shouldn't be doing here
+        HBaseConfiguration.merge(conf, that);
+        return conf;
+    }
 }

Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java (original)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java Mon Sep 10 23:28:55 2012
@@ -35,106 +35,106 @@ import org.slf4j.LoggerFactory;
  */
 public class RevisionManagerEndpoint extends BaseEndpointCoprocessor implements RevisionManagerProtocol {
 
-  private static final Logger LOGGER =
-		      LoggerFactory.getLogger(RevisionManagerEndpoint.class.getName());
-  
-  private RevisionManager rmImpl = null;
-
-  @Override
-  public void start(CoprocessorEnvironment env) {
-    super.start(env);
-    try {
-      Configuration conf = RevisionManagerConfiguration.create(env.getConfiguration());
-      String className = conf.get(RMConstants.REVISION_MGR_ENDPOINT_IMPL_CLASS,
-          ZKBasedRevisionManager.class.getName());
-      LOGGER.debug("Using Revision Manager implementation: {}",className);
-      rmImpl = RevisionManagerFactory.getOpenedRevisionManager(className, conf);
-    } catch (IOException e) {
-      LOGGER.error("Failed to initialize revision manager", e);
-    }
-  }
-
-  @Override
-  public void stop(CoprocessorEnvironment env) {
-    if (rmImpl != null) {
-      try {
-        rmImpl.close();
-      } catch (IOException e) {
-        LOGGER.warn("Error closing revision manager.", e);
-      }
-    }
-    super.stop(env);
-  }
-
-  @Override
-  public void initialize(Configuration conf) {
-    // do nothing, HBase controls life cycle
-  }
-
-  @Override
-  public void open() throws IOException {
-    // do nothing, HBase controls life cycle
-  }
-
-  @Override
-  public void close() throws IOException {
-    // do nothing, HBase controls life cycle
-  }
-
-  @Override
-  public void createTable(String table, List<String> columnFamilies) throws IOException {
-    rmImpl.createTable(table, columnFamilies);
-  }
-
-  @Override
-  public void dropTable(String table) throws IOException {
-    rmImpl.dropTable(table);
-  }
-
-  @Override
-  public Transaction beginWriteTransaction(String table, List<String> families)
-      throws IOException {
-    return rmImpl.beginWriteTransaction(table, families);
-  }
-
-  @Override
-  public Transaction beginWriteTransaction(String table,
-      List<String> families, long keepAlive) throws IOException {
-    return rmImpl.beginWriteTransaction(table, families, keepAlive);
-  }
-
-  @Override
-  public void commitWriteTransaction(Transaction transaction)
-      throws IOException {
-    rmImpl.commitWriteTransaction(transaction);
-  }
-
-  @Override
-  public void abortWriteTransaction(Transaction transaction)
-      throws IOException {
-    rmImpl.abortWriteTransaction(transaction);
-  }
-
-  @Override
-  public TableSnapshot createSnapshot(String tableName) throws IOException {
-    return rmImpl.createSnapshot(tableName);
-  }
-
-  @Override
-  public TableSnapshot createSnapshot(String tableName, long revision)
-      throws IOException {
-    return rmImpl.createSnapshot(tableName, revision);
-  }
-
-  @Override
-  public void keepAlive(Transaction transaction) throws IOException {
-    rmImpl.keepAlive(transaction);
-  }
-
-  @Override
-  public List<FamilyRevision> getAbortedWriteTransactions(String table,
-      String columnFamily) throws IOException {
-    return rmImpl.getAbortedWriteTransactions(table, columnFamily);
-  }
+    private static final Logger LOGGER =
+        LoggerFactory.getLogger(RevisionManagerEndpoint.class.getName());
+
+    private RevisionManager rmImpl = null;
+
+    @Override
+    public void start(CoprocessorEnvironment env) {
+        super.start(env);
+        try {
+            Configuration conf = RevisionManagerConfiguration.create(env.getConfiguration());
+            String className = conf.get(RMConstants.REVISION_MGR_ENDPOINT_IMPL_CLASS,
+                ZKBasedRevisionManager.class.getName());
+            LOGGER.debug("Using Revision Manager implementation: {}", className);
+            rmImpl = RevisionManagerFactory.getOpenedRevisionManager(className, conf);
+        } catch (IOException e) {
+            LOGGER.error("Failed to initialize revision manager", e);
+        }
+    }
+
+    @Override
+    public void stop(CoprocessorEnvironment env) {
+        if (rmImpl != null) {
+            try {
+                rmImpl.close();
+            } catch (IOException e) {
+                LOGGER.warn("Error closing revision manager.", e);
+            }
+        }
+        super.stop(env);
+    }
+
+    @Override
+    public void initialize(Configuration conf) {
+        // do nothing, HBase controls life cycle
+    }
+
+    @Override
+    public void open() throws IOException {
+        // do nothing, HBase controls life cycle
+    }
+
+    @Override
+    public void close() throws IOException {
+        // do nothing, HBase controls life cycle
+    }
+
+    @Override
+    public void createTable(String table, List<String> columnFamilies) throws IOException {
+        rmImpl.createTable(table, columnFamilies);
+    }
+
+    @Override
+    public void dropTable(String table) throws IOException {
+        rmImpl.dropTable(table);
+    }
+
+    @Override
+    public Transaction beginWriteTransaction(String table, List<String> families)
+        throws IOException {
+        return rmImpl.beginWriteTransaction(table, families);
+    }
+
+    @Override
+    public Transaction beginWriteTransaction(String table,
+                                             List<String> families, long keepAlive) throws IOException {
+        return rmImpl.beginWriteTransaction(table, families, keepAlive);
+    }
+
+    @Override
+    public void commitWriteTransaction(Transaction transaction)
+        throws IOException {
+        rmImpl.commitWriteTransaction(transaction);
+    }
+
+    @Override
+    public void abortWriteTransaction(Transaction transaction)
+        throws IOException {
+        rmImpl.abortWriteTransaction(transaction);
+    }
+
+    @Override
+    public TableSnapshot createSnapshot(String tableName) throws IOException {
+        return rmImpl.createSnapshot(tableName);
+    }
+
+    @Override
+    public TableSnapshot createSnapshot(String tableName, long revision)
+        throws IOException {
+        return rmImpl.createSnapshot(tableName, revision);
+    }
+
+    @Override
+    public void keepAlive(Transaction transaction) throws IOException {
+        rmImpl.keepAlive(transaction);
+    }
+
+    @Override
+    public List<FamilyRevision> getAbortedWriteTransactions(String table,
+                                                            String columnFamily) throws IOException {
+        return rmImpl.getAbortedWriteTransactions(table, columnFamily);
+    }
 
 }

Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java (original)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java Mon Sep 10 23:28:55 2012
@@ -33,92 +33,92 @@ import org.apache.hadoop.hbase.util.Byte
  */
 public class RevisionManagerEndpointClient implements RevisionManager, Configurable {
 
-  private Configuration conf = null;
-  private RevisionManager rmProxy;
+    private Configuration conf = null;
+    private RevisionManager rmProxy;
 
-  @Override
-  public Configuration getConf() {
-    return this.conf;
-  }
-
-  @Override
-  public void setConf(Configuration arg0) {
-    this.conf = arg0;
-  }
-
-  @Override
-  public void initialize(Configuration conf) {
-    // do nothing
-  }
-
-  @Override
-  public void open() throws IOException {
-    // clone to adjust RPC settings unique to proxy
-    Configuration clonedConf = new Configuration(conf);
-    // conf.set("hbase.ipc.client.connect.max.retries", "0");
-    // conf.setInt(HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS, 1);
-    clonedConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // do not retry RPC
-    HTable table = new HTable(clonedConf, HConstants.ROOT_TABLE_NAME);
-    rmProxy = table.coprocessorProxy(RevisionManagerProtocol.class,
-        Bytes.toBytes("anyRow"));
-    rmProxy.open();
-  }
-
-  @Override
-  public void close() throws IOException {
-    rmProxy.close();
-  }
-
-  @Override
-  public void createTable(String table, List<String> columnFamilies) throws IOException {
-    rmProxy.createTable(table, columnFamilies);
-  }
-
-  @Override
-  public void dropTable(String table) throws IOException {
-    rmProxy.dropTable(table);
-  }
-
-  @Override
-  public Transaction beginWriteTransaction(String table, List<String> families) throws IOException {
-    return rmProxy.beginWriteTransaction(table, families);
-  }
-
-  @Override
-  public Transaction beginWriteTransaction(String table, List<String> families, long keepAlive)
-      throws IOException {
-    return rmProxy.beginWriteTransaction(table, families, keepAlive);
-  }
-
-  @Override
-  public void commitWriteTransaction(Transaction transaction) throws IOException {
-    rmProxy.commitWriteTransaction(transaction);
-  }
-
-  @Override
-  public void abortWriteTransaction(Transaction transaction) throws IOException {
-    rmProxy.abortWriteTransaction(transaction);
-  }
-
-  @Override
-  public List<FamilyRevision> getAbortedWriteTransactions(String table, String columnFamily)
-      throws IOException {
-    return rmProxy.getAbortedWriteTransactions(table, columnFamily);
-  }
-
-  @Override
-  public TableSnapshot createSnapshot(String tableName) throws IOException {
-    return rmProxy.createSnapshot(tableName);
-  }
-
-  @Override
-  public TableSnapshot createSnapshot(String tableName, long revision) throws IOException {
-    return rmProxy.createSnapshot(tableName, revision);
-  }
-
-  @Override
-  public void keepAlive(Transaction transaction) throws IOException {
-    rmProxy.keepAlive(transaction);
-  }
+    @Override
+    public Configuration getConf() {
+        return this.conf;
+    }
+
+    @Override
+    public void setConf(Configuration arg0) {
+        this.conf = arg0;
+    }
+
+    @Override
+    public void initialize(Configuration conf) {
+        // do nothing
+    }
+
+    @Override
+    public void open() throws IOException {
+        // clone to adjust RPC settings unique to proxy
+        Configuration clonedConf = new Configuration(conf);
+        // conf.set("hbase.ipc.client.connect.max.retries", "0");
+        // conf.setInt(HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS, 1);
+        clonedConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // do not retry RPC
+        HTable table = new HTable(clonedConf, HConstants.ROOT_TABLE_NAME);
+        rmProxy = table.coprocessorProxy(RevisionManagerProtocol.class,
+            Bytes.toBytes("anyRow"));
+        rmProxy.open();
+    }
+
+    @Override
+    public void close() throws IOException {
+        rmProxy.close();
+    }
+
+    @Override
+    public void createTable(String table, List<String> columnFamilies) throws IOException {
+        rmProxy.createTable(table, columnFamilies);
+    }
+
+    @Override
+    public void dropTable(String table) throws IOException {
+        rmProxy.dropTable(table);
+    }
+
+    @Override
+    public Transaction beginWriteTransaction(String table, List<String> families) throws IOException {
+        return rmProxy.beginWriteTransaction(table, families);
+    }
+
+    @Override
+    public Transaction beginWriteTransaction(String table, List<String> families, long keepAlive)
+        throws IOException {
+        return rmProxy.beginWriteTransaction(table, families, keepAlive);
+    }
+
+    @Override
+    public void commitWriteTransaction(Transaction transaction) throws IOException {
+        rmProxy.commitWriteTransaction(transaction);
+    }
+
+    @Override
+    public void abortWriteTransaction(Transaction transaction) throws IOException {
+        rmProxy.abortWriteTransaction(transaction);
+    }
+
+    @Override
+    public List<FamilyRevision> getAbortedWriteTransactions(String table, String columnFamily)
+        throws IOException {
+        return rmProxy.getAbortedWriteTransactions(table, columnFamily);
+    }
+
+    @Override
+    public TableSnapshot createSnapshot(String tableName) throws IOException {
+        return rmProxy.createSnapshot(tableName);
+    }
+
+    @Override
+    public TableSnapshot createSnapshot(String tableName, long revision) throws IOException {
+        return rmProxy.createSnapshot(tableName, revision);
+    }
+
+    @Override
+    public void keepAlive(Transaction transaction) throws IOException {
+        rmProxy.keepAlive(transaction);
+    }
 
 }

Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java (original)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java Mon Sep 10 23:28:55 2012
@@ -28,77 +28,77 @@ import org.apache.hadoop.conf.Configurat
  */
 public class RevisionManagerFactory {
 
-  public static final String REVISION_MGR_IMPL_CLASS = "revision.manager.impl.class";
+    public static final String REVISION_MGR_IMPL_CLASS = "revision.manager.impl.class";
 
-  /**
-   * Gets an instance of revision manager.
-   *
-   * @param conf The configuration required to created the revision manager.
-   * @return the revision manager An instance of revision manager.
-   * @throws IOException Signals that an I/O exception has occurred.
-   */
-   private static RevisionManager getRevisionManager(String className, Configuration conf) throws IOException{
+    /**
+     * Gets an instance of revision manager.
+     *
+     * @param conf The configuration required to create the revision manager.
+     * @return An instance of revision manager.
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    private static RevisionManager getRevisionManager(String className, Configuration conf) throws IOException {
 
         RevisionManager revisionMgr;
         ClassLoader classLoader = Thread.currentThread()
-                .getContextClassLoader();
+            .getContextClassLoader();
         if (classLoader == null) {
             classLoader = RevisionManagerFactory.class.getClassLoader();
         }
         try {
             Class<? extends RevisionManager> revisionMgrClass = Class
-                    .forName(className, true , classLoader).asSubclass(RevisionManager.class);
+                .forName(className, true, classLoader).asSubclass(RevisionManager.class);
             revisionMgr = (RevisionManager) revisionMgrClass.newInstance();
             revisionMgr.initialize(conf);
         } catch (ClassNotFoundException e) {
             throw new IOException(
-                    "The implementation class of revision manager not found.",
-                    e);
+                "The implementation class of revision manager not found.",
+                e);
         } catch (InstantiationException e) {
             throw new IOException(
-                    "Exception encountered during instantiating revision manager implementation.",
-                    e);
+                "Exception encountered during instantiating revision manager implementation.",
+                e);
         } catch (IllegalAccessException e) {
             throw new IOException(
-                    "IllegalAccessException encountered during instantiating revision manager implementation.",
-                    e);
+                "IllegalAccessException encountered during instantiating revision manager implementation.",
+                e);
         } catch (IllegalArgumentException e) {
             throw new IOException(
-                    "IllegalArgumentException encountered during instantiating revision manager implementation.",
-                    e);
+                "IllegalArgumentException encountered during instantiating revision manager implementation.",
+                e);
         }
         return revisionMgr;
     }
 
-   /**
-    * Internally used by endpoint implementation to instantiate from different configuration setting.
-    * @param className
-    * @param conf
-    * @return
-    * @throws IOException
-    */
-   static RevisionManager getOpenedRevisionManager(String className, Configuration conf) throws IOException {
-
-       RevisionManager revisionMgr = RevisionManagerFactory.getRevisionManager(className, conf);
-       if (revisionMgr instanceof Configurable) {
-         ((Configurable)revisionMgr).setConf(conf);
-       }
-       revisionMgr.open();
-       return revisionMgr;
-   }
-
-   /**
-    * Gets an instance of revision manager which is opened.
-    * The revision manager implementation can be specified as {@link #REVISION_MGR_IMPL_CLASS},
-    * default is {@link ZKBasedRevisionManager}.
-    * @param conf revision manager configuration
-    * @return RevisionManager An instance of revision manager.
-    * @throws IOException
-    */
-   public static RevisionManager getOpenedRevisionManager(Configuration conf) throws IOException {
-     String className = conf.get(RevisionManagerFactory.REVISION_MGR_IMPL_CLASS,
-         ZKBasedRevisionManager.class.getName());
-     return getOpenedRevisionManager(className, conf);
-   }
+    /**
+     * Internally used by endpoint implementation to instantiate from different configuration setting.
+     * @param className
+     * @param conf
+     * @return
+     * @throws IOException
+     */
+    static RevisionManager getOpenedRevisionManager(String className, Configuration conf) throws IOException {
+
+        RevisionManager revisionMgr = RevisionManagerFactory.getRevisionManager(className, conf);
+        if (revisionMgr instanceof Configurable) {
+            ((Configurable) revisionMgr).setConf(conf);
+        }
+        revisionMgr.open();
+        return revisionMgr;
+    }
+
+    /**
+     * Gets an instance of revision manager which is opened.
+     * The revision manager implementation can be specified as {@link #REVISION_MGR_IMPL_CLASS},
+     * default is {@link ZKBasedRevisionManager}.
+     * @param conf revision manager configuration
+     * @return RevisionManager An instance of revision manager.
+     * @throws IOException
+     */
+    public static RevisionManager getOpenedRevisionManager(Configuration conf) throws IOException {
+        String className = conf.get(RevisionManagerFactory.REVISION_MGR_IMPL_CLASS,
+            ZKBasedRevisionManager.class.getName());
+        return getOpenedRevisionManager(className, conf);
+    }
 
 }

Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java (original)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java Mon Sep 10 23:28:55 2012
@@ -37,7 +37,7 @@ public class TableSnapshot implements Se
     public TableSnapshot(String name, Map<String, Long> cfRevMap, long latestRevision) {
         this.name = name;
         if (cfRevMap == null) {
-          throw new IllegalArgumentException("revision map cannot be null");
+            throw new IllegalArgumentException("revision map cannot be null");
         }
         this.cfRevisionMap = cfRevMap;
         this.latestRevision = latestRevision;

Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java (original)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java Mon Sep 10 23:28:55 2012
@@ -41,35 +41,35 @@ public class Transaction implements Seri
         this.revision = revision;
     }
 
-   /**
+    /**
      * @return The revision number associated with a transaction.
      */
-   public long getRevisionNumber(){
-       return this.revision;
-   }
+    public long getRevisionNumber() {
+        return this.revision;
+    }
 
     /**
      * @return The table name associated with a transaction.
      */
-   public String getTableName() {
+    public String getTableName() {
         return tableName;
     }
 
     /**
      * @return The column families associated with a transaction.
      */
-   public List<String> getColumnFamilies() {
+    public List<String> getColumnFamilies() {
         return columnFamilies;
     }
 
     /**
      * @return The expire timestamp associated with a transaction.
      */
-   long getTransactionExpireTimeStamp(){
+    long getTransactionExpireTimeStamp() {
         return this.timeStamp + this.keepAlive;
     }
 
-    void setKeepAlive(long seconds){
+    void setKeepAlive(long seconds) {
         this.keepAlive = seconds;
     }
 
@@ -78,7 +78,7 @@ public class Transaction implements Seri
      *
      * @return long  The keep alive value for the transaction.
      */
-    public long getKeepAliveValue(){
+    public long getKeepAliveValue() {
         return this.keepAlive;
     }
 
@@ -87,15 +87,15 @@ public class Transaction implements Seri
      *
      * @return FamilyRevision An instance of FamilyRevision associated with the transaction.
      */
-    FamilyRevision getFamilyRevisionInfo(){
+    FamilyRevision getFamilyRevisionInfo() {
         return new FamilyRevision(revision, getTransactionExpireTimeStamp());
     }
 
-   /**
-    * Keep alive transaction. This methods extends the expire timestamp of a
-    * transaction by the "keep alive" amount.
-    */
-   void keepAliveTransaction(){
+    /**
+     * Keep alive transaction. This method extends the expire timestamp of a
+     * transaction by the "keep alive" amount.
+     */
+    void keepAliveTransaction() {
         this.timeStamp = this.timeStamp + this.keepAlive;
     }
 

Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java (original)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java Mon Sep 10 23:28:55 2012
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
 /**
  * The service for providing revision management to Hbase tables.
  */
-public class ZKBasedRevisionManager implements RevisionManager{
+public class ZKBasedRevisionManager implements RevisionManager {
 
     private static final Logger LOG = LoggerFactory.getLogger(ZKBasedRevisionManager.class);
     private String zkHostList;
@@ -51,19 +51,19 @@ public class ZKBasedRevisionManager impl
     public void initialize(Configuration conf) {
         conf = new Configuration(conf);
         if (conf.get(RMConstants.ZOOKEEPER_HOSTLIST) == null) {
-           String zkHostList = conf.get(HConstants.ZOOKEEPER_QUORUM);
-           int port = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT,
-                   HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
-           String[] splits = zkHostList.split(",");
-           StringBuffer sb = new StringBuffer();
-           for (String split : splits) {
-               sb.append(split);
-               sb.append(':');
-               sb.append(port);
-               sb.append(',');
-           }
-           sb.deleteCharAt(sb.length() - 1);
-           conf.set(RMConstants.ZOOKEEPER_HOSTLIST, sb.toString());
+            String zkHostList = conf.get(HConstants.ZOOKEEPER_QUORUM);
+            int port = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT,
+                HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+            String[] splits = zkHostList.split(",");
+            StringBuffer sb = new StringBuffer();
+            for (String split : splits) {
+                sb.append(split);
+                sb.append(':');
+                sb.append(port);
+                sb.append(',');
+            }
+            sb.deleteCharAt(sb.length() - 1);
+            conf.set(RMConstants.ZOOKEEPER_HOSTLIST, sb.toString());
         }
         this.zkHostList = conf.get(RMConstants.ZOOKEEPER_HOSTLIST);
         this.baseDir = conf.get(RMConstants.ZOOKEEPER_DATADIR);
@@ -91,11 +91,11 @@ public class ZKBasedRevisionManager impl
     private void checkInputParams(String table, List<String> families) {
         if (table == null) {
             throw new IllegalArgumentException(
-                    "The table name must be specified for reading.");
+                "The table name must be specified for reading.");
         }
         if (families == null || families.isEmpty()) {
             throw new IllegalArgumentException(
-                    "At least one column family should be specified for reading.");
+                "At least one column family should be specified for reading.");
         }
     }
 
@@ -118,14 +118,14 @@ public class ZKBasedRevisionManager impl
      * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#beginWriteTransaction(java.lang.String, java.util.List, long)
      */
     public Transaction beginWriteTransaction(String table,
-            List<String> families, long keepAlive) throws IOException {
+                                             List<String> families, long keepAlive) throws IOException {
 
         checkInputParams(table, families);
         zkUtil.setUpZnodesForTable(table, families);
         long nextId = zkUtil.nextId(table);
         long expireTimestamp = zkUtil.getTimeStamp();
         Transaction transaction = new Transaction(table, families, nextId,
-                expireTimestamp);
+            expireTimestamp);
         if (keepAlive != -1) {
             transaction.setKeepAlive(keepAlive);
         } else {
@@ -135,32 +135,31 @@ public class ZKBasedRevisionManager impl
         refreshTransactionList(transaction.getTableName());
         String lockPath = prepareLockNode(table);
         WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
-                Ids.OPEN_ACL_UNSAFE);
+            Ids.OPEN_ACL_UNSAFE);
         RMLockListener myLockListener = new RMLockListener();
         wLock.setLockListener(myLockListener);
         try {
             boolean lockGrabbed = wLock.lock();
             if (lockGrabbed == false) {
-              //TO DO : Let this request queue up and try obtaining lock.
+                //TO DO : Let this request queue up and try obtaining lock.
                 throw new IOException(
-                        "Unable to obtain lock while beginning transaction. "
-                                + transaction.toString());
+                    "Unable to obtain lock while beginning transaction. "
+                        + transaction.toString());
             } else {
                 List<String> colFamilies = transaction.getColumnFamilies();
                 FamilyRevision revisionData = transaction.getFamilyRevisionInfo();
                 for (String cfamily : colFamilies) {
                     String path = PathUtil.getRunningTxnInfoPath(
-                            baseDir, table, cfamily);
+                        baseDir, table, cfamily);
                     zkUtil.updateData(path, revisionData,
-                            ZKUtil.UpdateMode.APPEND);
+                        ZKUtil.UpdateMode.APPEND);
                 }
             }
         } catch (KeeperException e) {
             throw new IOException("Exception while obtaining lock.", e);
         } catch (InterruptedException e) {
             throw new IOException("Exception while obtaining lock.", e);
-        }
-        finally {
+        } finally {
             wLock.unlock();
         }
 
@@ -174,7 +173,7 @@ public class ZKBasedRevisionManager impl
      * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#beginWriteTransaction(java.lang.String, java.util.List)
      */
     public Transaction beginWriteTransaction(String table, List<String> families)
-            throws IOException {
+        throws IOException {
         return beginWriteTransaction(table, families, -1);
     }
 
@@ -188,25 +187,25 @@ public class ZKBasedRevisionManager impl
 
         String lockPath = prepareLockNode(transaction.getTableName());
         WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
-                Ids.OPEN_ACL_UNSAFE);
+            Ids.OPEN_ACL_UNSAFE);
         RMLockListener myLockListener = new RMLockListener();
         wLock.setLockListener(myLockListener);
         try {
             boolean lockGrabbed = wLock.lock();
             if (lockGrabbed == false) {
-              //TO DO : Let this request queue up and try obtaining lock.
+                //TO DO : Let this request queue up and try obtaining lock.
                 throw new IOException(
-                        "Unable to obtain lock while commiting transaction. "
-                                + transaction.toString());
+                    "Unable to obtain lock while commiting transaction. "
+                        + transaction.toString());
             } else {
                 String tableName = transaction.getTableName();
                 List<String> colFamilies = transaction.getColumnFamilies();
                 FamilyRevision revisionData = transaction.getFamilyRevisionInfo();
                 for (String cfamily : colFamilies) {
                     String path = PathUtil.getRunningTxnInfoPath(
-                            baseDir, tableName, cfamily);
+                        baseDir, tableName, cfamily);
                     zkUtil.updateData(path, revisionData,
-                            ZKUtil.UpdateMode.REMOVE);
+                        ZKUtil.UpdateMode.REMOVE);
                 }
 
             }
@@ -214,8 +213,7 @@ public class ZKBasedRevisionManager impl
             throw new IOException("Exception while obtaining lock.", e);
         } catch (InterruptedException e) {
             throw new IOException("Exception while obtaining lock.", e);
-        }
-        finally {
+        } finally {
             wLock.unlock();
         }
         LOG.info("Write Transaction committed: " + transaction.toString());
@@ -231,30 +229,30 @@ public class ZKBasedRevisionManager impl
         refreshTransactionList(transaction.getTableName());
         String lockPath = prepareLockNode(transaction.getTableName());
         WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
-                Ids.OPEN_ACL_UNSAFE);
+            Ids.OPEN_ACL_UNSAFE);
         RMLockListener myLockListener = new RMLockListener();
         wLock.setLockListener(myLockListener);
         try {
             boolean lockGrabbed = wLock.lock();
             if (lockGrabbed == false) {
-              //TO DO : Let this request queue up and try obtaining lock.
+                //TO DO : Let this request queue up and try obtaining lock.
                 throw new IOException(
-                        "Unable to obtain lock while aborting transaction. "
-                                + transaction.toString());
+                    "Unable to obtain lock while aborting transaction. "
+                        + transaction.toString());
             } else {
                 String tableName = transaction.getTableName();
                 List<String> colFamilies = transaction.getColumnFamilies();
                 FamilyRevision revisionData = transaction
-                        .getFamilyRevisionInfo();
+                    .getFamilyRevisionInfo();
                 for (String cfamily : colFamilies) {
                     String path = PathUtil.getRunningTxnInfoPath(
-                            baseDir, tableName, cfamily);
+                        baseDir, tableName, cfamily);
                     zkUtil.updateData(path, revisionData,
-                            ZKUtil.UpdateMode.REMOVE);
+                        ZKUtil.UpdateMode.REMOVE);
                     path = PathUtil.getAbortInformationPath(baseDir,
-                            tableName, cfamily);
+                        tableName, cfamily);
                     zkUtil.updateData(path, revisionData,
-                            ZKUtil.UpdateMode.APPEND);
+                        ZKUtil.UpdateMode.APPEND);
                 }
 
             }
@@ -262,54 +260,53 @@ public class ZKBasedRevisionManager impl
             throw new IOException("Exception while obtaining lock.", e);
         } catch (InterruptedException e) {
             throw new IOException("Exception while obtaining lock.", e);
-        }
-        finally {
+        } finally {
             wLock.unlock();
         }
         LOG.info("Write Transaction aborted: " + transaction.toString());
     }
 
 
-     /* @param transaction
-     /* @throws IOException
-      * @see org.apache.hcatalog.hbase.snapshot.RevsionManager#keepAlive(org.apache.hcatalog.hbase.snapshot.Transaction)
-      */
-     public void keepAlive(Transaction transaction)
-            throws IOException {
-
-         refreshTransactionList(transaction.getTableName());
-         transaction.keepAliveTransaction();
-         String lockPath = prepareLockNode(transaction.getTableName());
-         WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
-                 Ids.OPEN_ACL_UNSAFE);
-         RMLockListener myLockListener = new RMLockListener();
-         wLock.setLockListener(myLockListener);
-         try {
-             boolean lockGrabbed = wLock.lock();
-             if (lockGrabbed == false) {
-               //TO DO : Let this request queue up and try obtaining lock.
-                 throw new IOException(
-                         "Unable to obtain lock for keep alive of transaction. "
-                                 + transaction.toString());
-             }else {
-                 String tableName = transaction.getTableName();
-                 List<String> colFamilies = transaction.getColumnFamilies();
-                 FamilyRevision revisionData = transaction.getFamilyRevisionInfo();
-                 for (String cfamily : colFamilies) {
-                     String path = PathUtil.getRunningTxnInfoPath(
-                             baseDir, tableName, cfamily);
-                     zkUtil.updateData(path, revisionData,
-                             ZKUtil.UpdateMode.KEEP_ALIVE);
-                 }
-
-             }
-         } catch (KeeperException e) {
-             throw new IOException("Exception while obtaining lock.", e);
-         } catch (InterruptedException e) {
-             throw new IOException("Exception while obtaining lock.", e);
-         }finally {
-             wLock.unlock();
-         }
+    /* @param transaction
+     * @throws IOException
+     * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#keepAlive(org.apache.hcatalog.hbase.snapshot.Transaction)
+     */
+    public void keepAlive(Transaction transaction)
+        throws IOException {
+
+        refreshTransactionList(transaction.getTableName());
+        transaction.keepAliveTransaction();
+        String lockPath = prepareLockNode(transaction.getTableName());
+        WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
+            Ids.OPEN_ACL_UNSAFE);
+        RMLockListener myLockListener = new RMLockListener();
+        wLock.setLockListener(myLockListener);
+        try {
+            boolean lockGrabbed = wLock.lock();
+            if (lockGrabbed == false) {
+                //TO DO : Let this request queue up and try obtaining lock.
+                throw new IOException(
+                    "Unable to obtain lock for keep alive of transaction. "
+                        + transaction.toString());
+            } else {
+                String tableName = transaction.getTableName();
+                List<String> colFamilies = transaction.getColumnFamilies();
+                FamilyRevision revisionData = transaction.getFamilyRevisionInfo();
+                for (String cfamily : colFamilies) {
+                    String path = PathUtil.getRunningTxnInfoPath(
+                        baseDir, tableName, cfamily);
+                    zkUtil.updateData(path, revisionData,
+                        ZKUtil.UpdateMode.KEEP_ALIVE);
+                }
+
+            }
+        } catch (KeeperException e) {
+            throw new IOException("Exception while obtaining lock.", e);
+        } catch (InterruptedException e) {
+            throw new IOException("Exception while obtaining lock.", e);
+        } finally {
+            wLock.unlock();
+        }
 
     }
 
@@ -320,13 +317,13 @@ public class ZKBasedRevisionManager impl
     /* @throws IOException
      * @see org.apache.hcatalog.hbase.snapshot.RevsionManager#createSnapshot(java.lang.String)
      */
-    public TableSnapshot createSnapshot(String tableName) throws IOException{
+    public TableSnapshot createSnapshot(String tableName) throws IOException {
         refreshTransactionList(tableName);
         long latestID = zkUtil.currentID(tableName);
         HashMap<String, Long> cfMap = new HashMap<String, Long>();
         List<String> columnFamilyNames = zkUtil.getColumnFamiliesOfTable(tableName);
 
-        for(String cfName: columnFamilyNames){
+        for (String cfName : columnFamilyNames) {
             String cfPath = PathUtil.getRunningTxnInfoPath(baseDir, tableName, cfName);
             List<FamilyRevision> tranxList = zkUtil.getTransactionList(cfPath);
             long version;
@@ -334,15 +331,15 @@ public class ZKBasedRevisionManager impl
                 Collections.sort(tranxList);
                 // get the smallest running Transaction ID
                 long runningVersion = tranxList.get(0).getRevision();
-                version = runningVersion -1;
+                version = runningVersion - 1;
             } else {
                 version = latestID;
             }
             cfMap.put(cfName, version);
         }
 
-        TableSnapshot snapshot = new TableSnapshot(tableName, cfMap,latestID);
-        LOG.debug("Created snapshot For table: "+tableName+" snapshot: "+snapshot);
+        TableSnapshot snapshot = new TableSnapshot(tableName, cfMap, latestID);
+        LOG.debug("Created snapshot For table: " + tableName + " snapshot: " + snapshot);
         return snapshot;
     }
 
@@ -354,18 +351,18 @@ public class ZKBasedRevisionManager impl
     /* @throws IOException
      * @see org.apache.hcatalog.hbase.snapshot.RevisionManager#createSnapshot(java.lang.String, long)
      */
-    public TableSnapshot createSnapshot(String tableName, long revision) throws IOException{
+    public TableSnapshot createSnapshot(String tableName, long revision) throws IOException {
 
         long currentID = zkUtil.currentID(tableName);
         if (revision > currentID) {
             throw new IOException(
-                    "The revision specified in the snapshot is higher than the current revision of the table.");
+                "The revision specified in the snapshot is higher than the current revision of the table.");
         }
         refreshTransactionList(tableName);
         HashMap<String, Long> cfMap = new HashMap<String, Long>();
         List<String> columnFamilies = zkUtil.getColumnFamiliesOfTable(tableName);
 
-        for(String cf: columnFamilies){
+        for (String cf : columnFamilies) {
             cfMap.put(cf, revision);
         }
 
@@ -380,40 +377,40 @@ public class ZKBasedRevisionManager impl
      * @throws java.io.IOException
      */
     List<FamilyRevision> getRunningTransactions(String table,
-            String columnFamily) throws IOException {
+                                                String columnFamily) throws IOException {
         String path = PathUtil.getRunningTxnInfoPath(baseDir, table,
-                columnFamily);
+            columnFamily);
         return zkUtil.getTransactionList(path);
     }
 
     @Override
-     public List<FamilyRevision> getAbortedWriteTransactions(String table,
-            String columnFamily) throws IOException {
-         String path = PathUtil.getAbortInformationPath(baseDir, table, columnFamily);
-         return zkUtil.getTransactionList(path);
+    public List<FamilyRevision> getAbortedWriteTransactions(String table,
+                                                            String columnFamily) throws IOException {
+        String path = PathUtil.getAbortInformationPath(baseDir, table, columnFamily);
+        return zkUtil.getTransactionList(path);
     }
 
-     private void refreshTransactionList(String tableName) throws IOException{
+    private void refreshTransactionList(String tableName) throws IOException {
         String lockPath = prepareLockNode(tableName);
         WriteLock wLock = new WriteLock(zkUtil.getSession(), lockPath,
-                Ids.OPEN_ACL_UNSAFE);
+            Ids.OPEN_ACL_UNSAFE);
         RMLockListener myLockListener = new RMLockListener();
         wLock.setLockListener(myLockListener);
         try {
             boolean lockGrabbed = wLock.lock();
             if (lockGrabbed == false) {
-              //TO DO : Let this request queue up and try obtaining lock.
+                //TO DO : Let this request queue up and try obtaining lock.
                 throw new IOException(
-                        "Unable to obtain lock while refreshing transactions of table "
-                                + tableName + ".");
-            }else {
+                    "Unable to obtain lock while refreshing transactions of table "
+                        + tableName + ".");
+            } else {
                 List<String> cfPaths = zkUtil
-                        .getColumnFamiliesOfTable(tableName);
+                    .getColumnFamiliesOfTable(tableName);
                 for (String cf : cfPaths) {
                     String runningDataPath = PathUtil.getRunningTxnInfoPath(
-                            baseDir, tableName, cf);
+                        baseDir, tableName, cf);
                     zkUtil.refreshTransactions(runningDataPath);
-        }
+                }
 
             }
         } catch (KeeperException e) {
@@ -424,22 +421,22 @@ public class ZKBasedRevisionManager impl
             wLock.unlock();
         }
 
-     }
+    }
 
-     private String prepareLockNode(String tableName) throws IOException{
-         String txnDataPath = PathUtil.getTxnDataPath(this.baseDir, tableName);
-         String lockPath = PathUtil.getLockManagementNode(txnDataPath);
-         zkUtil.ensurePathExists(lockPath, null, Ids.OPEN_ACL_UNSAFE,
-                 CreateMode.PERSISTENT);
-         return lockPath;
-     }
+    private String prepareLockNode(String tableName) throws IOException {
+        String txnDataPath = PathUtil.getTxnDataPath(this.baseDir, tableName);
+        String lockPath = PathUtil.getLockManagementNode(txnDataPath);
+        zkUtil.ensurePathExists(lockPath, null, Ids.OPEN_ACL_UNSAFE,
+            CreateMode.PERSISTENT);
+        return lockPath;
+    }
 
     /*
      * This class is a listener class for the locks used in revision management.
      * TBD: Use the following class to signal that the lock has actually
      * been granted.
      */
-     class RMLockListener implements LockListener {
+    class RMLockListener implements LockListener {
 
         /*
          * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockAcquired()
@@ -457,7 +454,7 @@ public class ZKBasedRevisionManager impl
 
         }
 
-     }
+    }
 
 
 }

Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java (original)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKUtil.java Mon Sep 10 23:28:55 2012
@@ -43,15 +43,17 @@ import org.slf4j.LoggerFactory;
 
 class ZKUtil {
 
-    private int              DEFAULT_SESSION_TIMEOUT = 1000000;
-    private ZooKeeper        zkSession;
-    private String           baseDir;
-    private String           connectString;
+    private int DEFAULT_SESSION_TIMEOUT = 1000000;
+    private ZooKeeper zkSession;
+    private String baseDir;
+    private String connectString;
     private static final Logger LOG = LoggerFactory.getLogger(ZKUtil.class);
 
     static enum UpdateMode {
         APPEND, REMOVE, KEEP_ALIVE
-    };
+    }
+
+    ;
 
     ZKUtil(String connection, String baseDir) {
         this.connectString = connection;
@@ -66,20 +68,20 @@ class ZKUtil {
      * @throws IOException
      */
     void setUpZnodesForTable(String table, List<String> families)
-            throws IOException {
+        throws IOException {
 
         String transactionDataTablePath = PathUtil.getTxnDataPath(baseDir, table);
         ensurePathExists(transactionDataTablePath, null, Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT);
+            CreateMode.PERSISTENT);
         for (String cf : families) {
             String runningDataPath = PathUtil.getRunningTxnInfoPath(
-                    this.baseDir, table, cf);
+                this.baseDir, table, cf);
             ensurePathExists(runningDataPath, null, Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.PERSISTENT);
+                CreateMode.PERSISTENT);
             String abortDataPath = PathUtil.getAbortInformationPath(
-                    this.baseDir, table, cf);
+                this.baseDir, table, cf);
             ensurePathExists(abortDataPath, null, Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.PERSISTENT);
+                CreateMode.PERSISTENT);
         }
 
     }
@@ -95,7 +97,7 @@ class ZKUtil {
      * @throws IOException
      */
     void ensurePathExists(String path, byte[] data, List<ACL> acl,
-            CreateMode flags) throws IOException {
+                          CreateMode flags) throws IOException {
         String[] dirs = path.split("/");
         String parentPath = "";
         for (String subDir : dirs) {
@@ -108,7 +110,7 @@ class ZKUtil {
                     }
                 } catch (Exception e) {
                     throw new IOException("Exception while creating path "
-                            + parentPath, e);
+                        + parentPath, e);
                 }
             }
         }
@@ -131,15 +133,15 @@ class ZKUtil {
             children = getSession().getChildren(path, false);
         } catch (KeeperException e) {
             LOG.warn("Caught: ", e);
-            throw new IOException("Exception while obtaining columns of table.",e);
+            throw new IOException("Exception while obtaining columns of table.", e);
         } catch (InterruptedException e) {
             LOG.warn("Caught: ", e);
-            throw new IOException("Exception while obtaining columns of table.",e);
+            throw new IOException("Exception while obtaining columns of table.", e);
         }
 
         for (String child : children) {
             if ((child.contains("idgen") == false)
-                    && (child.contains("_locknode_") == false)) {
+                && (child.contains("_locknode_") == false)) {
                 columnFamlies.add(child);
             }
         }
@@ -157,7 +159,7 @@ class ZKUtil {
         Stat stat;
         String clockPath = PathUtil.getClockPath(this.baseDir);
         ensurePathExists(clockPath, null, Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT);
+            CreateMode.PERSISTENT);
         try {
             getSession().exists(clockPath, false);
             stat = getSession().setData(clockPath, null, -1);
@@ -184,10 +186,10 @@ class ZKUtil {
     long nextId(String tableName) throws IOException {
         String idNode = PathUtil.getRevisionIDNode(this.baseDir, tableName);
         ensurePathExists(idNode, Bytes.toBytes("0"), Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT);
+            CreateMode.PERSISTENT);
         String lockNode = PathUtil.getLockManagementNode(idNode);
         ensurePathExists(lockNode, null, Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT);
+            CreateMode.PERSISTENT);
         IDGenerator idf = new IDGenerator(getSession(), tableName, idNode);
         long id = idf.obtainID();
         return id;
@@ -200,13 +202,13 @@ class ZKUtil {
      * @return the long The revision number to use by any transaction.
      * @throws IOException Signals that an I/O exception has occurred.
      */
-    long currentID(String tableName) throws IOException{
+    long currentID(String tableName) throws IOException {
         String idNode = PathUtil.getRevisionIDNode(this.baseDir, tableName);
         ensurePathExists(idNode, Bytes.toBytes("0"), Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT);
+            CreateMode.PERSISTENT);
         String lockNode = PathUtil.getLockManagementNode(idNode);
         ensurePathExists(lockNode, null, Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT);
+            CreateMode.PERSISTENT);
         IDGenerator idf = new IDGenerator(getSession(), tableName, idNode);
         long id = idf.readID();
         return id;
@@ -221,7 +223,7 @@ class ZKUtil {
      * @throws IOException
      */
     List<FamilyRevision> getTransactionList(String path)
-            throws IOException {
+        throws IOException {
 
         byte[] data = getRawData(path, new Stat());
         ArrayList<FamilyRevision> wtxnList = new ArrayList<FamilyRevision>();
@@ -235,7 +237,7 @@ class ZKUtil {
         while (itr.hasNext()) {
             StoreFamilyRevision wtxn = itr.next();
             wtxnList.add(new FamilyRevision(wtxn.getRevision(), wtxn
-                    .getTimestamp()));
+                .getTimestamp()));
         }
 
         return wtxnList;
@@ -255,8 +257,8 @@ class ZKUtil {
             data = getSession().getData(path, false, stat);
         } catch (Exception e) {
             throw new IOException(
-                    "Exception while obtaining raw data from zookeeper path "
-                            + path, e);
+                "Exception while obtaining raw data from zookeeper path "
+                    + path, e);
         }
         return data;
     }
@@ -271,9 +273,9 @@ class ZKUtil {
         String txnBaseNode = PathUtil.getTransactionBasePath(this.baseDir);
         String clockNode = PathUtil.getClockPath(this.baseDir);
         ensurePathExists(txnBaseNode, null, Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT);
+            CreateMode.PERSISTENT);
         ensurePathExists(clockNode, null, Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT);
+            CreateMode.PERSISTENT);
     }
 
     /**
@@ -298,12 +300,12 @@ class ZKUtil {
      * @return ZooKeeper An instance of zookeeper client.
      * @throws IOException
      */
-     ZooKeeper getSession() throws IOException {
+    ZooKeeper getSession() throws IOException {
         if (zkSession == null || zkSession.getState() == States.CLOSED) {
             synchronized (this) {
                 if (zkSession == null || zkSession.getState() == States.CLOSED) {
                     zkSession = new ZooKeeper(this.connectString,
-                            this.DEFAULT_SESSION_TIMEOUT, new ZKWatcher());
+                        this.DEFAULT_SESSION_TIMEOUT, new ZKWatcher());
                 }
             }
         }
@@ -319,11 +321,11 @@ class ZKUtil {
      * @throws IOException
      */
     void updateData(String path, FamilyRevision updateTx, UpdateMode mode)
-            throws IOException {
+        throws IOException {
 
         if (updateTx == null) {
             throw new IOException(
-                    "The transaction to be updated found to be null.");
+                "The transaction to be updated found to be null.");
         }
         List<FamilyRevision> currentData = getTransactionList(path);
         List<FamilyRevision> newData = new ArrayList<FamilyRevision>();
@@ -337,36 +339,36 @@ class ZKUtil {
             }
         }
         switch (mode) {
-            case REMOVE:
-                if (dataFound == false) {
-                    throw new IOException(
-                            "The transaction to be removed not found in the data.");
-                }
-                LOG.info("Removed trasaction : " + updateTx.toString());
-                break;
-            case KEEP_ALIVE:
-                if (dataFound == false) {
-                    throw new IOException(
-                            "The transaction to be kept alove not found in the data. It might have been expired.");
-                }
-                newData.add(updateTx);
-                LOG.info("keep alive of transaction : " + updateTx.toString());
-                break;
-            case APPEND:
-                if (dataFound == true) {
-                    throw new IOException(
-                            "The data to be appended already exists.");
-                }
-                newData.add(updateTx);
-                LOG.info("Added transaction : " + updateTx.toString());
-                break;
+        case REMOVE:
+            if (dataFound == false) {
+                throw new IOException(
+                    "The transaction to be removed not found in the data.");
+            }
+            LOG.info("Removed trasaction : " + updateTx.toString());
+            break;
+        case KEEP_ALIVE:
+            if (dataFound == false) {
+                throw new IOException(
+                    "The transaction to be kept alove not found in the data. It might have been expired.");
+            }
+            newData.add(updateTx);
+            LOG.info("keep alive of transaction : " + updateTx.toString());
+            break;
+        case APPEND:
+            if (dataFound == true) {
+                throw new IOException(
+                    "The data to be appended already exists.");
+            }
+            newData.add(updateTx);
+            LOG.info("Added transaction : " + updateTx.toString());
+            break;
         }
 
         // For serialization purposes.
         List<StoreFamilyRevision> newTxnList = new ArrayList<StoreFamilyRevision>();
         for (FamilyRevision wtxn : newData) {
             StoreFamilyRevision newTxn = new StoreFamilyRevision(wtxn.getRevision(),
-                    wtxn.getExpireTimestamp());
+                wtxn.getExpireTimestamp());
             newTxnList.add(newTxn);
         }
         StoreFamilyRevisionList wtxnList = new StoreFamilyRevisionList(newTxnList);
@@ -377,10 +379,10 @@ class ZKUtil {
             stat = zkSession.setData(path, newByteData, -1);
         } catch (KeeperException e) {
             throw new IOException(
-                    "Exception while updating trasactional data. ", e);
+                "Exception while updating trasactional data. ", e);
         } catch (InterruptedException e) {
             throw new IOException(
-                    "Exception while updating trasactional data. ", e);
+                "Exception while updating trasactional data. ", e);
         }
 
         if (stat != null) {
@@ -395,7 +397,7 @@ class ZKUtil {
      * @param path The path to the transaction data.
      * @throws IOException Signals that an I/O exception has occurred.
      */
-    void refreshTransactions(String path) throws IOException{
+    void refreshTransactions(String path) throws IOException {
         List<FamilyRevision> currentData = getTransactionList(path);
         List<FamilyRevision> newData = new ArrayList<FamilyRevision>();
 
@@ -405,11 +407,11 @@ class ZKUtil {
             }
         }
 
-        if(newData.equals(currentData) == false){
+        if (newData.equals(currentData) == false) {
             List<StoreFamilyRevision> newTxnList = new ArrayList<StoreFamilyRevision>();
             for (FamilyRevision wtxn : newData) {
                 StoreFamilyRevision newTxn = new StoreFamilyRevision(wtxn.getRevision(),
-                        wtxn.getExpireTimestamp());
+                    wtxn.getExpireTimestamp());
                 newTxnList.add(newTxn);
             }
             StoreFamilyRevisionList wtxnList = new StoreFamilyRevisionList(newTxnList);
@@ -419,10 +421,10 @@ class ZKUtil {
                 zkSession.setData(path, newByteData, -1);
             } catch (KeeperException e) {
                 throw new IOException(
-                        "Exception while updating trasactional data. ", e);
+                    "Exception while updating trasactional data. ", e);
             } catch (InterruptedException e) {
                 throw new IOException(
-                        "Exception while updating trasactional data. ", e);
+                    "Exception while updating trasactional data. ", e);
             }
 
         }
@@ -437,7 +439,7 @@ class ZKUtil {
      */
     void deleteZNodes(String tableName) throws IOException {
         String transactionDataTablePath = PathUtil.getTxnDataPath(baseDir,
-                tableName);
+            tableName);
         deleteRecursively(transactionDataTablePath);
     }
 
@@ -452,10 +454,10 @@ class ZKUtil {
             getSession().delete(path, -1);
         } catch (KeeperException e) {
             throw new IOException(
-                    "Exception while deleting path " + path + ".", e);
+                "Exception while deleting path " + path + ".", e);
         } catch (InterruptedException e) {
             throw new IOException(
-                    "Exception while deleting path " + path + ".", e);
+                "Exception while deleting path " + path + ".", e);
         }
     }
 
@@ -471,7 +473,7 @@ class ZKUtil {
             return new byte[0];
         try {
             TSerializer serializer = new TSerializer(
-                    new TBinaryProtocol.Factory());
+                new TBinaryProtocol.Factory());
             byte[] bytes = serializer.serialize(obj);
             return bytes;
         } catch (Exception e) {
@@ -492,7 +494,7 @@ class ZKUtil {
             return;
         try {
             TDeserializer deserializer = new TDeserializer(
-                    new TBinaryProtocol.Factory());
+                new TBinaryProtocol.Factory());
             deserializer.deserialize(obj, data);
         } catch (Exception e) {
             throw new IOException("Deserialization error: " + e.getMessage(), e);
@@ -502,12 +504,12 @@ class ZKUtil {
     private class ZKWatcher implements Watcher {
         public void process(WatchedEvent event) {
             switch (event.getState()) {
-                case Expired:
-                    LOG.info("The client session has expired. Try opening a new "
-                            + "session and connecting again.");
-                    zkSession = null;
-                    break;
-                default:
+            case Expired:
+                LOG.info("The client session has expired. Try opening a new "
+                    + "session and connecting again.");
+                zkSession = null;
+                break;
+            default:
 
             }
         }



Mime
View raw message