hive-commits mailing list archives

From: hashut...@apache.org
Subject: svn commit: r1522098 [6/30] - in /hive/branches/vectorization: ./ beeline/src/test/org/apache/hive/beeline/src/test/ bin/ bin/ext/ common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/conf/ conf/ contrib/src/java/org/ap...
Date: Thu, 12 Sep 2013 01:21:29 GMT
Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java Thu Sep 12 01:21:10 2013
@@ -38,11 +38,11 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.hbase.snapshot.FamilyRevision;
 import org.apache.hcatalog.hbase.snapshot.RevisionManager;
 import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
-import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hive.hcatalog.mapreduce.InputJobInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,204 +52,204 @@ import org.slf4j.LoggerFactory;
  */
 class HbaseSnapshotRecordReader implements RecordReader<ImmutableBytesWritable, Result> {
 
-    static final Logger LOG = LoggerFactory.getLogger(HbaseSnapshotRecordReader.class);
-    private final InputJobInfo inpJobInfo;
-    private final Configuration conf;
-    private final int maxRevisions = 1;
-    private ResultScanner scanner;
-    private Scan scan;
-    private HTable htable;
-    private TableSnapshot snapshot;
-    private Iterator<Result> resultItr;
-    private Set<Long> allAbortedTransactions;
-    private DataOutputBuffer valueOut = new DataOutputBuffer();
-    private DataInputBuffer valueIn = new DataInputBuffer();
-
-    HbaseSnapshotRecordReader(InputJobInfo inputJobInfo, Configuration conf) throws IOException {
-        this.inpJobInfo = inputJobInfo;
-        this.conf = conf;
-        String snapshotString = conf.get(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
-        HCatTableSnapshot hcatSnapshot = (HCatTableSnapshot) HCatUtil
-            .deserialize(snapshotString);
-        this.snapshot = HBaseRevisionManagerUtil.convertSnapshot(hcatSnapshot,
-            inpJobInfo.getTableInfo());
-    }
-
-    public void init() throws IOException {
-        restart(scan.getStartRow());
-    }
-
-    public void restart(byte[] firstRow) throws IOException {
-        allAbortedTransactions = getAbortedTransactions(Bytes.toString(htable.getTableName()), scan);
-        long maxValidRevision = getMaximumRevision(scan, snapshot);
-        while (allAbortedTransactions.contains(maxValidRevision)) {
-            maxValidRevision--;
-        }
-        Scan newScan = new Scan(scan);
-        newScan.setStartRow(firstRow);
-        //TODO: See if filters in 0.92 can be used to optimize the scan
-        //TODO: Consider create a custom snapshot filter
-        //TODO: Make min revision a constant in RM
-        newScan.setTimeRange(0, maxValidRevision + 1);
-        newScan.setMaxVersions();
-        this.scanner = this.htable.getScanner(newScan);
-        resultItr = this.scanner.iterator();
-    }
-
-    private Set<Long> getAbortedTransactions(String tableName, Scan scan) throws IOException {
-        Set<Long> abortedTransactions = new HashSet<Long>();
-        RevisionManager rm = null;
-        try {
-            rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
-            byte[][] families = scan.getFamilies();
-            for (byte[] familyKey : families) {
-                String family = Bytes.toString(familyKey);
-                List<FamilyRevision> abortedWriteTransactions = rm.getAbortedWriteTransactions(
-                    tableName, family);
-                if (abortedWriteTransactions != null) {
-                    for (FamilyRevision revision : abortedWriteTransactions) {
-                        abortedTransactions.add(revision.getRevision());
-                    }
-                }
-            }
-            return abortedTransactions;
-        } finally {
-            HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm);
-        }
-    }
-
-    private long getMaximumRevision(Scan scan, TableSnapshot snapshot) {
-        long maxRevision = 0;
-        byte[][] families = scan.getFamilies();
-        for (byte[] familyKey : families) {
-            String family = Bytes.toString(familyKey);
-            long revision = snapshot.getRevision(family);
-            if (revision > maxRevision)
-                maxRevision = revision;
-        }
-        return maxRevision;
-    }
-
-    /*
-     * @param htable The HTable ( of HBase) to use for the record reader.
-     *
-     */
-    public void setHTable(HTable htable) {
-        this.htable = htable;
-    }
-
-    /*
-     * @param scan The scan to be used for reading records.
-     *
-     */
-    public void setScan(Scan scan) {
-        this.scan = scan;
-    }
-
-    @Override
-    public ImmutableBytesWritable createKey() {
-        return new ImmutableBytesWritable();
-    }
-
-    @Override
-    public Result createValue() {
-        return new Result();
-    }
-
-    @Override
-    public long getPos() {
-        // This should be the ordinal tuple in the range;
-        // not clear how to calculate...
-        return 0;
-    }
-
-    @Override
-    public float getProgress() throws IOException {
-        // Depends on the total number of tuples
-        return 0;
-    }
-
-    @Override
-    public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
-        if (this.resultItr == null) {
-            LOG.warn("The HBase result iterator is found null. It is possible"
-                + " that the record reader has already been closed.");
-        } else {
-            while (resultItr.hasNext()) {
-                Result temp = resultItr.next();
-                Result hbaseRow = prepareResult(temp.list());
-                if (hbaseRow != null) {
-                    // Update key and value. Currently no way to avoid serialization/de-serialization
-                    // as no setters are available.
-                    key.set(hbaseRow.getRow());
-                    valueOut.reset();
-                    hbaseRow.write(valueOut);
-                    valueIn.reset(valueOut.getData(), valueOut.getLength());
-                    value.readFields(valueIn);
-                    return true;
-                }
-
-            }
-        }
-        return false;
-    }
-
-    private Result prepareResult(List<KeyValue> keyvalues) {
-
-        List<KeyValue> finalKeyVals = new ArrayList<KeyValue>();
-        Map<String, List<KeyValue>> qualValMap = new HashMap<String, List<KeyValue>>();
-        for (KeyValue kv : keyvalues) {
-            byte[] cf = kv.getFamily();
-            byte[] qualifier = kv.getQualifier();
-            String key = Bytes.toString(cf) + ":" + Bytes.toString(qualifier);
-            List<KeyValue> kvs;
-            if (qualValMap.containsKey(key)) {
-                kvs = qualValMap.get(key);
-            } else {
-                kvs = new ArrayList<KeyValue>();
-            }
-
-            String family = Bytes.toString(kv.getFamily());
-            //Ignore aborted transactions
-            if (allAbortedTransactions.contains(kv.getTimestamp())) {
-                continue;
-            }
-
-            long desiredTS = snapshot.getRevision(family);
-            if (kv.getTimestamp() <= desiredTS) {
-                kvs.add(kv);
-            }
-            qualValMap.put(key, kvs);
-        }
-
-        Set<String> keys = qualValMap.keySet();
-        for (String cf : keys) {
-            List<KeyValue> kvs = qualValMap.get(cf);
-            if (maxRevisions <= kvs.size()) {
-                for (int i = 0; i < maxRevisions; i++) {
-                    finalKeyVals.add(kvs.get(i));
-                }
-            } else {
-                finalKeyVals.addAll(kvs);
-            }
-        }
-
-        if (finalKeyVals.size() == 0) {
-            return null;
-        } else {
-            KeyValue[] kvArray = new KeyValue[finalKeyVals.size()];
-            finalKeyVals.toArray(kvArray);
-            return new Result(kvArray);
-        }
-    }
-
-    /*
-     * @see org.apache.hadoop.hbase.mapred.TableRecordReader#close()
-     */
-    @Override
-    public void close() {
-        this.resultItr = null;
-        this.scanner.close();
-    }
+  static final Logger LOG = LoggerFactory.getLogger(HbaseSnapshotRecordReader.class);
+  private final InputJobInfo inpJobInfo;
+  private final Configuration conf;
+  private final int maxRevisions = 1;
+  private ResultScanner scanner;
+  private Scan scan;
+  private HTable htable;
+  private TableSnapshot snapshot;
+  private Iterator<Result> resultItr;
+  private Set<Long> allAbortedTransactions;
+  private DataOutputBuffer valueOut = new DataOutputBuffer();
+  private DataInputBuffer valueIn = new DataInputBuffer();
+
+  HbaseSnapshotRecordReader(InputJobInfo inputJobInfo, Configuration conf) throws IOException {
+    this.inpJobInfo = inputJobInfo;
+    this.conf = conf;
+    String snapshotString = conf.get(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
+    HCatTableSnapshot hcatSnapshot = (HCatTableSnapshot) HCatUtil
+      .deserialize(snapshotString);
+    this.snapshot = HBaseRevisionManagerUtil.convertSnapshot(hcatSnapshot,
+      inpJobInfo.getTableInfo());
+  }
+
+  public void init() throws IOException {
+    restart(scan.getStartRow());
+  }
+
+  public void restart(byte[] firstRow) throws IOException {
+    allAbortedTransactions = getAbortedTransactions(Bytes.toString(htable.getTableName()), scan);
+    long maxValidRevision = getMaximumRevision(scan, snapshot);
+    while (allAbortedTransactions.contains(maxValidRevision)) {
+      maxValidRevision--;
+    }
+    Scan newScan = new Scan(scan);
+    newScan.setStartRow(firstRow);
+    //TODO: See if filters in 0.92 can be used to optimize the scan
+    //TODO: Consider create a custom snapshot filter
+    //TODO: Make min revision a constant in RM
+    newScan.setTimeRange(0, maxValidRevision + 1);
+    newScan.setMaxVersions();
+    this.scanner = this.htable.getScanner(newScan);
+    resultItr = this.scanner.iterator();
+  }
+
+  private Set<Long> getAbortedTransactions(String tableName, Scan scan) throws IOException {
+    Set<Long> abortedTransactions = new HashSet<Long>();
+    RevisionManager rm = null;
+    try {
+      rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+      byte[][] families = scan.getFamilies();
+      for (byte[] familyKey : families) {
+        String family = Bytes.toString(familyKey);
+        List<FamilyRevision> abortedWriteTransactions = rm.getAbortedWriteTransactions(
+          tableName, family);
+        if (abortedWriteTransactions != null) {
+          for (FamilyRevision revision : abortedWriteTransactions) {
+            abortedTransactions.add(revision.getRevision());
+          }
+        }
+      }
+      return abortedTransactions;
+    } finally {
+      HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm);
+    }
+  }
+
+  private long getMaximumRevision(Scan scan, TableSnapshot snapshot) {
+    long maxRevision = 0;
+    byte[][] families = scan.getFamilies();
+    for (byte[] familyKey : families) {
+      String family = Bytes.toString(familyKey);
+      long revision = snapshot.getRevision(family);
+      if (revision > maxRevision)
+        maxRevision = revision;
+    }
+    return maxRevision;
+  }
+
+  /*
+   * @param htable The HTable ( of HBase) to use for the record reader.
+   *
+   */
+  public void setHTable(HTable htable) {
+    this.htable = htable;
+  }
+
+  /*
+   * @param scan The scan to be used for reading records.
+   *
+   */
+  public void setScan(Scan scan) {
+    this.scan = scan;
+  }
+
+  @Override
+  public ImmutableBytesWritable createKey() {
+    return new ImmutableBytesWritable();
+  }
+
+  @Override
+  public Result createValue() {
+    return new Result();
+  }
+
+  @Override
+  public long getPos() {
+    // This should be the ordinal tuple in the range;
+    // not clear how to calculate...
+    return 0;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    // Depends on the total number of tuples
+    return 0;
+  }
+
+  @Override
+  public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
+    if (this.resultItr == null) {
+      LOG.warn("The HBase result iterator is found null. It is possible"
+        + " that the record reader has already been closed.");
+    } else {
+      while (resultItr.hasNext()) {
+        Result temp = resultItr.next();
+        Result hbaseRow = prepareResult(temp.list());
+        if (hbaseRow != null) {
+          // Update key and value. Currently no way to avoid serialization/de-serialization
+          // as no setters are available.
+          key.set(hbaseRow.getRow());
+          valueOut.reset();
+          hbaseRow.write(valueOut);
+          valueIn.reset(valueOut.getData(), valueOut.getLength());
+          value.readFields(valueIn);
+          return true;
+        }
+
+      }
+    }
+    return false;
+  }
+
+  private Result prepareResult(List<KeyValue> keyvalues) {
+
+    List<KeyValue> finalKeyVals = new ArrayList<KeyValue>();
+    Map<String, List<KeyValue>> qualValMap = new HashMap<String, List<KeyValue>>();
+    for (KeyValue kv : keyvalues) {
+      byte[] cf = kv.getFamily();
+      byte[] qualifier = kv.getQualifier();
+      String key = Bytes.toString(cf) + ":" + Bytes.toString(qualifier);
+      List<KeyValue> kvs;
+      if (qualValMap.containsKey(key)) {
+        kvs = qualValMap.get(key);
+      } else {
+        kvs = new ArrayList<KeyValue>();
+      }
+
+      String family = Bytes.toString(kv.getFamily());
+      //Ignore aborted transactions
+      if (allAbortedTransactions.contains(kv.getTimestamp())) {
+        continue;
+      }
+
+      long desiredTS = snapshot.getRevision(family);
+      if (kv.getTimestamp() <= desiredTS) {
+        kvs.add(kv);
+      }
+      qualValMap.put(key, kvs);
+    }
+
+    Set<String> keys = qualValMap.keySet();
+    for (String cf : keys) {
+      List<KeyValue> kvs = qualValMap.get(cf);
+      if (maxRevisions <= kvs.size()) {
+        for (int i = 0; i < maxRevisions; i++) {
+          finalKeyVals.add(kvs.get(i));
+        }
+      } else {
+        finalKeyVals.addAll(kvs);
+      }
+    }
+
+    if (finalKeyVals.size() == 0) {
+      return null;
+    } else {
+      KeyValue[] kvArray = new KeyValue[finalKeyVals.size()];
+      finalKeyVals.toArray(kvArray);
+      return new Result(kvArray);
+    }
+  }
+
+  /*
+   * @see org.apache.hadoop.hbase.mapred.TableRecordReader#close()
+   */
+  @Override
+  public void close() {
+    this.resultItr = null;
+    this.scanner.close();
+  }
 
 }

Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java Thu Sep 12 01:21:10 2013
@@ -46,6 +46,7 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hive.hcatalog.mapreduce.HCatMapRedUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,192 +61,192 @@ import static org.apache.hadoop.hbase.ma
  * and data needs to be bulk loaded onto HBase.
  */
 class ImportSequenceFile {
-    private final static Logger LOG = LoggerFactory.getLogger(ImportSequenceFile.class);
-    private final static String NAME = "HCatImportSequenceFile";
-    private final static String IMPORTER_WORK_DIR = "_IMPORTER_MR_WORK_DIR";
+  private final static Logger LOG = LoggerFactory.getLogger(ImportSequenceFile.class);
+  private final static String NAME = "HCatImportSequenceFile";
+  private final static String IMPORTER_WORK_DIR = "_IMPORTER_MR_WORK_DIR";
+
+
+  private static class SequenceFileImporter extends Mapper<ImmutableBytesWritable, Put, ImmutableBytesWritable, Put> {
+
+    @Override
+    public void map(ImmutableBytesWritable rowKey, Put value,
+            Context context)
+      throws IOException {
+      try {
+        context.write(new ImmutableBytesWritable(value.getRow()), value);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  private static class ImporterOutputFormat extends HFileOutputFormat {
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
+      final OutputCommitter baseOutputCommitter = super.getOutputCommitter(context);
+
+      return new OutputCommitter() {
+        @Override
+        public void setupJob(JobContext jobContext) throws IOException {
+          baseOutputCommitter.setupJob(jobContext);
+        }
+
+        @Override
+        public void setupTask(TaskAttemptContext taskContext) throws IOException {
+          baseOutputCommitter.setupTask(taskContext);
+        }
 
+        @Override
+        public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
+          return baseOutputCommitter.needsTaskCommit(taskContext);
+        }
+
+        @Override
+        public void commitTask(TaskAttemptContext taskContext) throws IOException {
+          baseOutputCommitter.commitTask(taskContext);
+        }
 
-    private static class SequenceFileImporter extends Mapper<ImmutableBytesWritable, Put, ImmutableBytesWritable, Put> {
+        @Override
+        public void abortTask(TaskAttemptContext taskContext) throws IOException {
+          baseOutputCommitter.abortTask(taskContext);
+        }
 
         @Override
-        public void map(ImmutableBytesWritable rowKey, Put value,
-                        Context context)
-            throws IOException {
+        public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+          try {
+            baseOutputCommitter.abortJob(jobContext, state);
+          } finally {
+            cleanupScratch(jobContext);
+          }
+        }
+
+        @Override
+        public void commitJob(JobContext jobContext) throws IOException {
+          try {
+            baseOutputCommitter.commitJob(jobContext);
+            Configuration conf = jobContext.getConfiguration();
             try {
-                context.write(new ImmutableBytesWritable(value.getRow()), value);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
+              //import hfiles
+              new LoadIncrementalHFiles(conf)
+                .doBulkLoad(HFileOutputFormat.getOutputPath(jobContext),
+                  new HTable(conf,
+                    conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY)));
+            } catch (Exception e) {
+              throw new IOException("BulkLoad failed.", e);
             }
+          } finally {
+            cleanupScratch(jobContext);
+          }
         }
-    }
 
-    private static class ImporterOutputFormat extends HFileOutputFormat {
         @Override
-        public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
-            final OutputCommitter baseOutputCommitter = super.getOutputCommitter(context);
+        public void cleanupJob(JobContext context) throws IOException {
+          try {
+            baseOutputCommitter.cleanupJob(context);
+          } finally {
+            cleanupScratch(context);
+          }
+        }
 
-            return new OutputCommitter() {
-                @Override
-                public void setupJob(JobContext jobContext) throws IOException {
-                    baseOutputCommitter.setupJob(jobContext);
-                }
-
-                @Override
-                public void setupTask(TaskAttemptContext taskContext) throws IOException {
-                    baseOutputCommitter.setupTask(taskContext);
-                }
-
-                @Override
-                public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
-                    return baseOutputCommitter.needsTaskCommit(taskContext);
-                }
-
-                @Override
-                public void commitTask(TaskAttemptContext taskContext) throws IOException {
-                    baseOutputCommitter.commitTask(taskContext);
-                }
-
-                @Override
-                public void abortTask(TaskAttemptContext taskContext) throws IOException {
-                    baseOutputCommitter.abortTask(taskContext);
-                }
-
-                @Override
-                public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
-                    try {
-                        baseOutputCommitter.abortJob(jobContext, state);
-                    } finally {
-                        cleanupScratch(jobContext);
-                    }
-                }
-
-                @Override
-                public void commitJob(JobContext jobContext) throws IOException {
-                    try {
-                        baseOutputCommitter.commitJob(jobContext);
-                        Configuration conf = jobContext.getConfiguration();
-                        try {
-                            //import hfiles
-                            new LoadIncrementalHFiles(conf)
-                                .doBulkLoad(HFileOutputFormat.getOutputPath(jobContext),
-                                    new HTable(conf,
-                                        conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY)));
-                        } catch (Exception e) {
-                            throw new IOException("BulkLoad failed.", e);
-                        }
-                    } finally {
-                        cleanupScratch(jobContext);
-                    }
-                }
-
-                @Override
-                public void cleanupJob(JobContext context) throws IOException {
-                    try {
-                        baseOutputCommitter.cleanupJob(context);
-                    } finally {
-                        cleanupScratch(context);
-                    }
-                }
-
-                private void cleanupScratch(JobContext context) throws IOException {
-                    FileSystem fs = FileSystem.get(context.getConfiguration());
-                    fs.delete(HFileOutputFormat.getOutputPath(context), true);
-                }
-            };
-        }
-    }
-
-    private static Job createSubmittableJob(Configuration conf, String tableName, Path inputDir, Path scratchDir, boolean localMode)
-        throws IOException {
-        Job job = new Job(conf, NAME + "_" + tableName);
-        job.setJarByClass(SequenceFileImporter.class);
-        FileInputFormat.setInputPaths(job, inputDir);
-        job.setInputFormatClass(SequenceFileInputFormat.class);
-        job.setMapperClass(SequenceFileImporter.class);
-
-        HTable table = new HTable(conf, tableName);
-        job.setReducerClass(PutSortReducer.class);
-        FileOutputFormat.setOutputPath(job, scratchDir);
-        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-        job.setMapOutputValueClass(Put.class);
-        HFileOutputFormat.configureIncrementalLoad(job, table);
-        //override OutputFormatClass with our own so we can include cleanup in the committer
-        job.setOutputFormatClass(ImporterOutputFormat.class);
-
-        //local mode doesn't support symbolic links so we have to manually set the actual path
-        if (localMode) {
-            String partitionFile = null;
-            for (URI uri : DistributedCache.getCacheFiles(job.getConfiguration())) {
-                if (DEFAULT_PATH.equals(uri.getFragment())) {
-                    partitionFile = uri.toString();
-                    break;
-                }
-            }
-            partitionFile = partitionFile.substring(0, partitionFile.lastIndexOf("#"));
-            job.getConfiguration().set(TotalOrderPartitioner.PARTITIONER_PATH, partitionFile.toString());
+        private void cleanupScratch(JobContext context) throws IOException {
+          FileSystem fs = FileSystem.get(context.getConfiguration());
+          fs.delete(HFileOutputFormat.getOutputPath(context), true);
         }
+      };
+    }
+  }
 
-        return job;
+  private static Job createSubmittableJob(Configuration conf, String tableName, Path inputDir, Path scratchDir, boolean localMode)
+    throws IOException {
+    Job job = new Job(conf, NAME + "_" + tableName);
+    job.setJarByClass(SequenceFileImporter.class);
+    FileInputFormat.setInputPaths(job, inputDir);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setMapperClass(SequenceFileImporter.class);
+
+    HTable table = new HTable(conf, tableName);
+    job.setReducerClass(PutSortReducer.class);
+    FileOutputFormat.setOutputPath(job, scratchDir);
+    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    job.setMapOutputValueClass(Put.class);
+    HFileOutputFormat.configureIncrementalLoad(job, table);
+    //override OutputFormatClass with our own so we can include cleanup in the committer
+    job.setOutputFormatClass(ImporterOutputFormat.class);
+
+    //local mode doesn't support symbolic links so we have to manually set the actual path
+    if (localMode) {
+      String partitionFile = null;
+      for (URI uri : DistributedCache.getCacheFiles(job.getConfiguration())) {
+        if (DEFAULT_PATH.equals(uri.getFragment())) {
+          partitionFile = uri.toString();
+          break;
+        }
+      }
+      partitionFile = partitionFile.substring(0, partitionFile.lastIndexOf("#"));
+      job.getConfiguration().set(TotalOrderPartitioner.PARTITIONER_PATH, partitionFile.toString());
     }
 
-    /**
-     * Method to run the Importer MapReduce Job. Normally will be called by another MR job
-     * during OutputCommitter.commitJob().
-     * @param parentContext JobContext of the parent job
-     * @param tableName name of table to bulk load data into
-     * @param InputDir path of SequenceFile formatted data to read
-     * @param scratchDir temporary path for the Importer MR job to build the HFiles which will be imported
-     * @return
-     */
-    static boolean runJob(JobContext parentContext, String tableName, Path InputDir, Path scratchDir) {
-        Configuration parentConf = parentContext.getConfiguration();
-        Configuration conf = new Configuration();
-        for (Map.Entry<String, String> el : parentConf) {
-            if (el.getKey().startsWith("hbase."))
-                conf.set(el.getKey(), el.getValue());
-            if (el.getKey().startsWith("mapred.cache.archives"))
-                conf.set(el.getKey(), el.getValue());
-        }
-
-        //Inherit jar dependencies added to distributed cache loaded by parent job
-        conf.set("mapred.job.classpath.archives", parentConf.get("mapred.job.classpath.archives", ""));
-        conf.set("mapreduce.job.cache.archives.visibilities", parentConf.get("mapreduce.job.cache.archives.visibilities", ""));
-
-        //Temporary fix until hbase security is ready
-        //We need the written HFile to be world readable so
-        //hbase regionserver user has the privileges to perform a hdfs move
-        if (parentConf.getBoolean("hadoop.security.authorization", false)) {
-            FsPermission.setUMask(conf, FsPermission.valueOf("----------"));
-        }
-
-        conf.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
-        conf.setBoolean(JobContext.JOB_CANCEL_DELEGATION_TOKEN, false);
-
-        boolean localMode = "local".equals(conf.get("mapred.job.tracker"));
-
-        boolean success = false;
-        try {
-            FileSystem fs = FileSystem.get(parentConf);
-            Path workDir = new Path(new Job(parentConf).getWorkingDirectory(), IMPORTER_WORK_DIR);
-            if (!fs.mkdirs(workDir))
-                throw new IOException("Importer work directory already exists: " + workDir);
-            Job job = createSubmittableJob(conf, tableName, InputDir, scratchDir, localMode);
-            job.setWorkingDirectory(workDir);
-            job.getCredentials().addAll(parentContext.getCredentials());
-            success = job.waitForCompletion(true);
-            fs.delete(workDir, true);
-            //We only cleanup on success because failure might've been caused by existence of target directory
-            if (localMode && success) {
-                new ImporterOutputFormat().getOutputCommitter(org.apache.hadoop.mapred.HCatMapRedUtil.createTaskAttemptContext(conf, new TaskAttemptID())).commitJob(job);
-            }
-        } catch (InterruptedException e) {
-            LOG.error("ImportSequenceFile Failed", e);
-        } catch (ClassNotFoundException e) {
-            LOG.error("ImportSequenceFile Failed", e);
-        } catch (IOException e) {
-            LOG.error("ImportSequenceFile Failed", e);
-        }
-        return success;
+    return job;
+  }
+
+  /**
+   * Method to run the Importer MapReduce Job. Normally will be called by another MR job
+   * during OutputCommitter.commitJob().
+   * @param parentContext JobContext of the parent job
+   * @param tableName name of table to bulk load data into
+   * @param InputDir path of SequenceFile formatted data to read
+   * @param scratchDir temporary path for the Importer MR job to build the HFiles which will be imported
+   * @return
+   */
+  static boolean runJob(JobContext parentContext, String tableName, Path InputDir, Path scratchDir) {
+    Configuration parentConf = parentContext.getConfiguration();
+    Configuration conf = new Configuration();
+    for (Map.Entry<String, String> el : parentConf) {
+      if (el.getKey().startsWith("hbase."))
+        conf.set(el.getKey(), el.getValue());
+      if (el.getKey().startsWith("mapred.cache.archives"))
+        conf.set(el.getKey(), el.getValue());
+    }
+
+    //Inherit jar dependencies added to distributed cache loaded by parent job
+    conf.set("mapred.job.classpath.archives", parentConf.get("mapred.job.classpath.archives", ""));
+    conf.set("mapreduce.job.cache.archives.visibilities", parentConf.get("mapreduce.job.cache.archives.visibilities", ""));
+
+    //Temporary fix until hbase security is ready
+    //We need the written HFile to be world readable so
+    //hbase regionserver user has the privileges to perform a hdfs move
+    if (parentConf.getBoolean("hadoop.security.authorization", false)) {
+      FsPermission.setUMask(conf, FsPermission.valueOf("----------"));
+    }
+
+    conf.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
+    conf.setBoolean(JobContext.JOB_CANCEL_DELEGATION_TOKEN, false);
+
+    boolean localMode = "local".equals(conf.get("mapred.job.tracker"));
+
+    boolean success = false;
+    try {
+      FileSystem fs = FileSystem.get(parentConf);
+      Path workDir = new Path(new Job(parentConf).getWorkingDirectory(), IMPORTER_WORK_DIR);
+      if (!fs.mkdirs(workDir))
+        throw new IOException("Importer work directory already exists: " + workDir);
+      Job job = createSubmittableJob(conf, tableName, InputDir, scratchDir, localMode);
+      job.setWorkingDirectory(workDir);
+      job.getCredentials().addAll(parentContext.getCredentials());
+      success = job.waitForCompletion(true);
+      fs.delete(workDir, true);
+      //We only cleanup on success because failure might've been caused by existence of target directory
+      if (localMode && success) {
+        new ImporterOutputFormat().getOutputCommitter(HCatMapRedUtil.createTaskAttemptContext(conf, new TaskAttemptID())).commitJob(job);
+      }
+    } catch (InterruptedException e) {
+      LOG.error("ImportSequenceFile Failed", e);
+    } catch (ClassNotFoundException e) {
+      LOG.error("ImportSequenceFile Failed", e);
+    } catch (IOException e) {
+      LOG.error("ImportSequenceFile Failed", e);
     }
+    return success;
+  }
 
 }

Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ResultConverter.java Thu Sep 12 01:21:10 2013
@@ -21,7 +21,7 @@ package org.apache.hcatalog.hbase;
 
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.HCatRecord;
 
 import java.io.IOException;
 
@@ -32,27 +32,27 @@ import java.io.IOException;
  */
 interface ResultConverter {
 
-    /**
-     * convert HCatRecord instance to an HBase Put, used when writing out data.
-     * @param record instance to convert
-     * @return converted Put instance
-     * @throws IOException
-     */
-    Put convert(HCatRecord record) throws IOException;
-
-    /**
-     * convert HBase Result to HCatRecord instance, used when reading data.
-     * @param result instance to convert
-     * @return converted Result instance
-     * @throws IOException
-     */
-    HCatRecord convert(Result result) throws IOException;
-
-    /**
-     * Returns the hbase columns that are required for the scan.
-     * @return String containing hbase columns delimited by space.
-     * @throws IOException
-     */
-    String getHBaseScanColumns() throws IOException;
+  /**
+   * convert HCatRecord instance to an HBase Put, used when writing out data.
+   * @param record instance to convert
+   * @return converted Put instance
+   * @throws IOException
+   */
+  Put convert(HCatRecord record) throws IOException;
+
+  /**
+   * convert HBase Result to HCatRecord instance, used when reading data.
+   * @param result instance to convert
+   * @return converted Result instance
+   * @throws IOException
+   */
+  HCatRecord convert(Result result) throws IOException;
+
+  /**
+   * Returns the hbase columns that are required for the scan.
+   * @return String containing hbase columns delimited by space.
+   * @throws IOException
+   */
+  String getHBaseScanColumns() throws IOException;
 
 }

Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java Thu Sep 12 01:21:10 2013
@@ -27,45 +27,45 @@ package org.apache.hcatalog.hbase.snapsh
  * committed, the transaction object is removed from the list.
  */
 public class FamilyRevision implements
-        Comparable<FamilyRevision> {
+    Comparable<FamilyRevision> {
 
-    private long revision;
+  private long revision;
 
-    private long timestamp;
+  private long timestamp;
 
-    /**
-     * Create a FamilyRevision object
-     * @param rev revision number
-     * @param ts expiration timestamp
-     */
-    FamilyRevision(long rev, long ts) {
-        this.revision = rev;
-        this.timestamp = ts;
-    }
-
-    public long getRevision() {
-        return revision;
-    }
-
-    public long getExpireTimestamp() {
-        return timestamp;
-    }
-
-    void setExpireTimestamp(long ts) {
-        timestamp = ts;
-    }
-
-    @Override
-    public String toString() {
-        String description = "revision: " + revision + " ts: " + timestamp;
-        return description;
-    }
-
-    @Override
-    public int compareTo(FamilyRevision o) {
-        long d = revision - o.getRevision();
-        return (d < 0) ? -1 : (d > 0) ? 1 : 0;
-    }
+  /**
+   * Create a FamilyRevision object
+   * @param rev revision number
+   * @param ts expiration timestamp
+   */
+  FamilyRevision(long rev, long ts) {
+    this.revision = rev;
+    this.timestamp = ts;
+  }
+
+  public long getRevision() {
+    return revision;
+  }
+
+  public long getExpireTimestamp() {
+    return timestamp;
+  }
+
+  void setExpireTimestamp(long ts) {
+    timestamp = ts;
+  }
+
+  @Override
+  public String toString() {
+    String description = "revision: " + revision + " ts: " + timestamp;
+    return description;
+  }
+
+  @Override
+  public int compareTo(FamilyRevision o) {
+    long d = revision - o.getRevision();
+    return (d < 0) ? -1 : (d > 0) ? 1 : 0;
+  }
 
 
 }

Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/IDGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/IDGenerator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/IDGenerator.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/IDGenerator.java Thu Sep 12 01:21:10 2013
@@ -36,110 +36,110 @@ import org.slf4j.LoggerFactory;
  */
 class IDGenerator implements LockListener {
 
-    private ZooKeeper zookeeper;
-    private String zNodeDataLoc;
-    private String zNodeLockBasePath;
-    private long id;
-    private static final Logger LOG = LoggerFactory.getLogger(IDGenerator.class);
-
-    IDGenerator(ZooKeeper zookeeper, String tableName, String idGenNode)
-        throws IOException {
-        this.zookeeper = zookeeper;
-        this.zNodeDataLoc = idGenNode;
-        this.zNodeLockBasePath = PathUtil.getLockManagementNode(idGenNode);
-    }
-
-    /**
-     * This method obtains a revision id for a transaction.
-     *
-     * @return revision ID
-     * @throws IOException
-     */
-    public long obtainID() throws IOException {
-        WriteLock wLock = new WriteLock(zookeeper, zNodeLockBasePath, Ids.OPEN_ACL_UNSAFE);
-        wLock.setLockListener(this);
-        try {
-            boolean lockGrabbed = wLock.lock();
-            if (lockGrabbed == false) {
-                //TO DO : Let this request queue up and try obtaining lock.
-                throw new IOException("Unable to obtain lock to obtain id.");
-            } else {
-                id = incrementAndReadCounter();
-            }
-        } catch (KeeperException e) {
-            LOG.warn("Exception while obtaining lock for ID.", e);
-            throw new IOException("Exception while obtaining lock for ID.", e);
-        } catch (InterruptedException e) {
-            LOG.warn("Exception while obtaining lock for ID.", e);
-            throw new IOException("Exception while obtaining lock for ID.", e);
-        } finally {
-            wLock.unlock();
-        }
-        return id;
-    }
-
-    /**
-     * This method reads the latest revision ID that has been used. The ID
-     * returned by this method cannot be used for transaction.
-     * @return revision ID
-     * @throws IOException
-     */
-    public long readID() throws IOException {
-        long curId;
-        try {
-            Stat stat = new Stat();
-            byte[] data = zookeeper.getData(this.zNodeDataLoc, false, stat);
-            curId = Long.parseLong(new String(data, Charset.forName("UTF-8")));
-        } catch (KeeperException e) {
-            LOG.warn("Exception while reading current revision id.", e);
-            throw new IOException("Exception while reading current revision id.", e);
-        } catch (InterruptedException e) {
-            LOG.warn("Exception while reading current revision id.", e);
-            throw new IOException("Exception while reading current revision id.", e);
-        }
-
-        return curId;
-    }
-
-
-    private long incrementAndReadCounter() throws IOException {
-
-        long curId, usedId;
-        try {
-            Stat stat = new Stat();
-            byte[] data = zookeeper.getData(this.zNodeDataLoc, false, stat);
-            usedId = Long.parseLong((new String(data, Charset.forName("UTF-8"))));
-            curId = usedId + 1;
-            String lastUsedID = String.valueOf(curId);
-            zookeeper.setData(this.zNodeDataLoc, lastUsedID.getBytes(Charset.forName("UTF-8")), -1);
-
-        } catch (KeeperException e) {
-            LOG.warn("Exception while incrementing revision id.", e);
-            throw new IOException("Exception while incrementing revision id. ", e);
-        } catch (InterruptedException e) {
-            LOG.warn("Exception while incrementing revision id.", e);
-            throw new IOException("Exception while incrementing revision id. ", e);
-        }
+  private ZooKeeper zookeeper;
+  private String zNodeDataLoc;
+  private String zNodeLockBasePath;
+  private long id;
+  private static final Logger LOG = LoggerFactory.getLogger(IDGenerator.class);
+
+  IDGenerator(ZooKeeper zookeeper, String tableName, String idGenNode)
+    throws IOException {
+    this.zookeeper = zookeeper;
+    this.zNodeDataLoc = idGenNode;
+    this.zNodeLockBasePath = PathUtil.getLockManagementNode(idGenNode);
+  }
+
+  /**
+   * This method obtains a revision id for a transaction.
+   *
+   * @return revision ID
+   * @throws IOException
+   */
+  public long obtainID() throws IOException {
+    WriteLock wLock = new WriteLock(zookeeper, zNodeLockBasePath, Ids.OPEN_ACL_UNSAFE);
+    wLock.setLockListener(this);
+    try {
+      boolean lockGrabbed = wLock.lock();
+      if (lockGrabbed == false) {
+        //TO DO : Let this request queue up and try obtaining lock.
+        throw new IOException("Unable to obtain lock to obtain id.");
+      } else {
+        id = incrementAndReadCounter();
+      }
+    } catch (KeeperException e) {
+      LOG.warn("Exception while obtaining lock for ID.", e);
+      throw new IOException("Exception while obtaining lock for ID.", e);
+    } catch (InterruptedException e) {
+      LOG.warn("Exception while obtaining lock for ID.", e);
+      throw new IOException("Exception while obtaining lock for ID.", e);
+    } finally {
+      wLock.unlock();
+    }
+    return id;
+  }
+
+  /**
+   * This method reads the latest revision ID that has been used. The ID
+   * returned by this method cannot be used for transaction.
+   * @return revision ID
+   * @throws IOException
+   */
+  public long readID() throws IOException {
+    long curId;
+    try {
+      Stat stat = new Stat();
+      byte[] data = zookeeper.getData(this.zNodeDataLoc, false, stat);
+      curId = Long.parseLong(new String(data, Charset.forName("UTF-8")));
+    } catch (KeeperException e) {
+      LOG.warn("Exception while reading current revision id.", e);
+      throw new IOException("Exception while reading current revision id.", e);
+    } catch (InterruptedException e) {
+      LOG.warn("Exception while reading current revision id.", e);
+      throw new IOException("Exception while reading current revision id.", e);
+    }
+
+    return curId;
+  }
+
+
+  private long incrementAndReadCounter() throws IOException {
+
+    long curId, usedId;
+    try {
+      Stat stat = new Stat();
+      byte[] data = zookeeper.getData(this.zNodeDataLoc, false, stat);
+      usedId = Long.parseLong((new String(data, Charset.forName("UTF-8"))));
+      curId = usedId + 1;
+      String lastUsedID = String.valueOf(curId);
+      zookeeper.setData(this.zNodeDataLoc, lastUsedID.getBytes(Charset.forName("UTF-8")), -1);
+
+    } catch (KeeperException e) {
+      LOG.warn("Exception while incrementing revision id.", e);
+      throw new IOException("Exception while incrementing revision id. ", e);
+    } catch (InterruptedException e) {
+      LOG.warn("Exception while incrementing revision id.", e);
+      throw new IOException("Exception while incrementing revision id. ", e);
+    }
+
+    return curId;
+  }
+
+  /*
+   * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockAcquired()
+   */
+  @Override
+  public void lockAcquired() {
+
+
+  }
+
+  /*
+   * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockReleased()
+   */
+  @Override
+  public void lockReleased() {
 
-        return curId;
-    }
-
-    /*
-     * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockAcquired()
-     */
-    @Override
-    public void lockAcquired() {
-
-
-    }
-
-    /*
-     * @see org.apache.hcatalog.hbase.snapshot.lock.LockListener#lockReleased()
-     */
-    @Override
-    public void lockReleased() {
-
-    }
+  }
 
 
 }

Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/PathUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/PathUtil.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/PathUtil.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/PathUtil.java Thu Sep 12 01:21:10 2013
@@ -36,97 +36,97 @@ package org.apache.hcatalog.hbase.snapsh
  */
 public class PathUtil {
 
-    static final String DATA_DIR = "/data";
-    static final String CLOCK_NODE = "/clock";
+  static final String DATA_DIR = "/data";
+  static final String CLOCK_NODE = "/clock";
 
-    /**
-     * This method returns the data path associated with the currently
-     * running transactions of a given table and column/column family.
-     * @param baseDir
-     * @param tableName
-     * @param columnFamily
-     * @return The path of the running transactions data.
-     */
-    static String getRunningTxnInfoPath(String baseDir, String tableName,
-                                        String columnFamily) {
-        String txnBasePath = getTransactionBasePath(baseDir);
-        String path = txnBasePath + "/" + tableName + "/" + columnFamily
-            + "/runningTxns";
-        return path;
-    }
-
-    /**
-     * This method returns the data path associated with the aborted
-     * transactions of a given table and column/column family.
-     * @param baseDir The base directory for revision management.
-     * @param tableName The name of the table.
-     * @param columnFamily
-     * @return The path of the aborted transactions data.
-     */
-    static String getAbortInformationPath(String baseDir, String tableName,
-                                          String columnFamily) {
-        String txnBasePath = getTransactionBasePath(baseDir);
-        String path = txnBasePath + "/" + tableName + "/" + columnFamily
-            + "/abortData";
-        return path;
-    }
-
-    /**
-     * Gets the revision id node for a given table.
-     *
-     * @param baseDir the base dir for revision management.
-     * @param tableName the table name
-     * @return the revision id node path.
-     */
-    static String getRevisionIDNode(String baseDir, String tableName) {
-        String rmBasePath = getTransactionBasePath(baseDir);
-        String revisionIDNode = rmBasePath + "/" + tableName + "/idgen";
-        return revisionIDNode;
-    }
-
-    /**
-     * Gets the lock management node for any znode that needs to be locked.
-     *
-     * @param path the path of the znode.
-     * @return the lock management node path.
-     */
-    static String getLockManagementNode(String path) {
-        String lockNode = path + "_locknode_";
-        return lockNode;
-    }
-
-    /**
-     * This method returns the base path for the transaction data.
-     *
-     * @param baseDir The base dir for revision management.
-     * @return The base path for the transaction data.
-     */
-    static String getTransactionBasePath(String baseDir) {
-        String txnBaseNode = baseDir + DATA_DIR;
-        return txnBaseNode;
-    }
-
-    /**
-     * Gets the txn data path for a given table.
-     *
-     * @param baseDir the base dir for revision management.
-     * @param tableName the table name
-     * @return the txn data path for the table.
-     */
-    static String getTxnDataPath(String baseDir, String tableName) {
-        String txnBasePath = getTransactionBasePath(baseDir);
-        String path = txnBasePath + "/" + tableName;
-        return path;
-    }
-
-    /**
-     * This method returns the data path for clock node.
-     *
-     * @param baseDir
-     * @return The data path for clock.
-     */
-    static String getClockPath(String baseDir) {
-        String clockNode = baseDir + CLOCK_NODE;
-        return clockNode;
-    }
+  /**
+   * This method returns the data path associated with the currently
+   * running transactions of a given table and column/column family.
+   * @param baseDir
+   * @param tableName
+   * @param columnFamily
+   * @return The path of the running transactions data.
+   */
+  static String getRunningTxnInfoPath(String baseDir, String tableName,
+                    String columnFamily) {
+    String txnBasePath = getTransactionBasePath(baseDir);
+    String path = txnBasePath + "/" + tableName + "/" + columnFamily
+      + "/runningTxns";
+    return path;
+  }
+
+  /**
+   * This method returns the data path associated with the aborted
+   * transactions of a given table and column/column family.
+   * @param baseDir The base directory for revision management.
+   * @param tableName The name of the table.
+   * @param columnFamily
+   * @return The path of the aborted transactions data.
+   */
+  static String getAbortInformationPath(String baseDir, String tableName,
+                      String columnFamily) {
+    String txnBasePath = getTransactionBasePath(baseDir);
+    String path = txnBasePath + "/" + tableName + "/" + columnFamily
+      + "/abortData";
+    return path;
+  }
+
+  /**
+   * Gets the revision id node for a given table.
+   *
+   * @param baseDir the base dir for revision management.
+   * @param tableName the table name
+   * @return the revision id node path.
+   */
+  static String getRevisionIDNode(String baseDir, String tableName) {
+    String rmBasePath = getTransactionBasePath(baseDir);
+    String revisionIDNode = rmBasePath + "/" + tableName + "/idgen";
+    return revisionIDNode;
+  }
+
+  /**
+   * Gets the lock management node for any znode that needs to be locked.
+   *
+   * @param path the path of the znode.
+   * @return the lock management node path.
+   */
+  static String getLockManagementNode(String path) {
+    String lockNode = path + "_locknode_";
+    return lockNode;
+  }
+
+  /**
+   * This method returns the base path for the transaction data.
+   *
+   * @param baseDir The base dir for revision management.
+   * @return The base path for the transaction data.
+   */
+  static String getTransactionBasePath(String baseDir) {
+    String txnBaseNode = baseDir + DATA_DIR;
+    return txnBaseNode;
+  }
+
+  /**
+   * Gets the txn data path for a given table.
+   *
+   * @param baseDir the base dir for revision management.
+   * @param tableName the table name
+   * @return the txn data path for the table.
+   */
+  static String getTxnDataPath(String baseDir, String tableName) {
+    String txnBasePath = getTransactionBasePath(baseDir);
+    String path = txnBasePath + "/" + tableName;
+    return path;
+  }
+
+  /**
+   * This method returns the data path for clock node.
+   *
+   * @param baseDir
+   * @return The data path for clock.
+   */
+  static String getClockPath(String baseDir) {
+    String clockNode = baseDir + CLOCK_NODE;
+    return clockNode;
+  }
 }

Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RMConstants.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RMConstants.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RMConstants.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RMConstants.java Thu Sep 12 01:21:10 2013
@@ -20,11 +20,11 @@
 package org.apache.hcatalog.hbase.snapshot;
 
 public class RMConstants {
-    public static final String REVISION_MGR_ENDPOINT_IMPL_CLASS = "revision.manager.endpoint.impl.class";
+  public static final String REVISION_MGR_ENDPOINT_IMPL_CLASS = "revision.manager.endpoint.impl.class";
 
-    public static final String WRITE_TRANSACTION_TIMEOUT = "revision.manager.writeTxn.timeout";
+  public static final String WRITE_TRANSACTION_TIMEOUT = "revision.manager.writeTxn.timeout";
 
-    public static final String ZOOKEEPER_HOSTLIST = "revision.manager.zk.hostList";
+  public static final String ZOOKEEPER_HOSTLIST = "revision.manager.zk.hostList";
 
-    public static final String ZOOKEEPER_DATADIR = "revision.manager.zk.dataDir";
+  public static final String ZOOKEEPER_DATADIR = "revision.manager.zk.dataDir";
 }

Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java Thu Sep 12 01:21:10 2013
@@ -27,122 +27,122 @@ import java.util.List;
  * This interface provides APIs for implementing revision management.
  */
 public interface RevisionManager {
-    /**
-     * Version property required by HBase to use this interface
-     * for CoprocessorProtocol / RPC.
-     */
-    public static final long VERSION = 1L; // do not change
-
-    /**
-     * Initialize the revision manager.
-     */
-    public void initialize(Configuration conf);
-
-    /**
-     * Opens the revision manager.
-     *
-     * @throws IOException
-     */
-    public void open() throws IOException;
-
-    /**
-     * Closes the revision manager.
-     *
-     * @throws IOException
-     */
-    public void close() throws IOException;
-
-    /**
-     * Setup revision management for a newly created hbase table.
-     * @param table the hbase table name
-     * @param columnFamilies the column families in the table
-     */
-    public void createTable(String table, List<String> columnFamilies) throws IOException;
-
-    /**
-     * Remove table data from revision manager for a dropped table.
-     * @param table the hbase table name
-     */
-    public void dropTable(String table) throws IOException;
-
-    /**
-     * Start the write transaction.
-     *
-     * @param table
-     * @param families
-     * @return a new Transaction
-     * @throws IOException
-     */
-    public Transaction beginWriteTransaction(String table, List<String> families)
-        throws IOException;
-
-    /**
-     * Start the write transaction.
-     *
-     * @param table
-     * @param families
-     * @param keepAlive
-     * @return a new Transaction
-     * @throws IOException
-     */
-    public Transaction beginWriteTransaction(String table,
-                                             List<String> families, long keepAlive) throws IOException;
-
-    /**
-     * Commit the write transaction.
-     *
-     * @param transaction
-     * @throws IOException
-     */
-    public void commitWriteTransaction(Transaction transaction)
-        throws IOException;
-
-    /**
-     * Abort the write transaction.
-     *
-     * @param transaction
-     * @throws IOException
-     */
-    public void abortWriteTransaction(Transaction transaction)
-        throws IOException;
-
-    /**
-     * Get the list of aborted Transactions for a column family
-     *
-     * @param table the table name
-     * @param columnFamily the column family name
-     * @return a list of aborted WriteTransactions
-     * @throws java.io.IOException
-     */
-    public List<FamilyRevision> getAbortedWriteTransactions(String table,
-                                                            String columnFamily) throws IOException;
-
-    /**
-     * Create the latest snapshot of the table.
-     *
-     * @param tableName
-     * @return a new snapshot
-     * @throws IOException
-     */
-    public TableSnapshot createSnapshot(String tableName) throws IOException;
-
-    /**
-     * Create the snapshot of the table using the revision number.
-     *
-     * @param tableName
-     * @param revision
-     * @return a new snapshot
-     * @throws IOException
-     */
-    public TableSnapshot createSnapshot(String tableName, long revision)
-        throws IOException;
-
-    /**
-     * Extends the expiration of a transaction by the time indicated by keep alive.
-     *
-     * @param transaction
-     * @throws IOException
-     */
-    public void keepAlive(Transaction transaction) throws IOException;
+  /**
+   * Version property required by HBase to use this interface
+   * for CoprocessorProtocol / RPC.
+   */
+  public static final long VERSION = 1L; // do not change
+
+  /**
+   * Initialize the revision manager.
+   */
+  public void initialize(Configuration conf);
+
+  /**
+   * Opens the revision manager.
+   *
+   * @throws IOException
+   */
+  public void open() throws IOException;
+
+  /**
+   * Closes the revision manager.
+   *
+   * @throws IOException
+   */
+  public void close() throws IOException;
+
+  /**
+   * Set up revision management for a newly created hbase table.
+   * @param table the hbase table name
+   * @param columnFamilies the column families in the table
+   */
+  public void createTable(String table, List<String> columnFamilies) throws IOException;
+
+  /**
+   * Remove table data from revision manager for a dropped table.
+   * @param table the hbase table name
+   */
+  public void dropTable(String table) throws IOException;
+
+  /**
+   * Start the write transaction.
+   *
+   * @param table
+   * @param families
+   * @return a new Transaction
+   * @throws IOException
+   */
+  public Transaction beginWriteTransaction(String table, List<String> families)
+    throws IOException;
+
+  /**
+   * Start the write transaction.
+   *
+   * @param table
+   * @param families
+   * @param keepAlive
+   * @return a new Transaction
+   * @throws IOException
+   */
+  public Transaction beginWriteTransaction(String table,
+                       List<String> families, long keepAlive) throws IOException;
+
+  /**
+   * Commit the write transaction.
+   *
+   * @param transaction
+   * @throws IOException
+   */
+  public void commitWriteTransaction(Transaction transaction)
+    throws IOException;
+
+  /**
+   * Abort the write transaction.
+   *
+   * @param transaction
+   * @throws IOException
+   */
+  public void abortWriteTransaction(Transaction transaction)
+    throws IOException;
+
+  /**
+   * Get the list of aborted Transactions for a column family
+   *
+   * @param table the table name
+   * @param columnFamily the column family name
+   * @return a list of aborted WriteTransactions
+   * @throws java.io.IOException
+   */
+  public List<FamilyRevision> getAbortedWriteTransactions(String table,
+                              String columnFamily) throws IOException;
+
+  /**
+   * Create the latest snapshot of the table.
+   *
+   * @param tableName
+   * @return a new snapshot
+   * @throws IOException
+   */
+  public TableSnapshot createSnapshot(String tableName) throws IOException;
+
+  /**
+   * Create the snapshot of the table using the revision number.
+   *
+   * @param tableName
+   * @param revision
+   * @return a new snapshot
+   * @throws IOException
+   */
+  public TableSnapshot createSnapshot(String tableName, long revision)
+    throws IOException;
+
+  /**
+   * Extends the expiration of a transaction by the time indicated by keep alive.
+   *
+   * @param transaction
+   * @throws IOException
+   */
+  public void keepAlive(Transaction transaction) throws IOException;
 
 }
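
A minimal usage sketch for the RevisionManager interface above, assuming a hypothetical table "mytable" and column family "cf"; only methods declared in the interface (beginWriteTransaction, commitWriteTransaction, abortWriteTransaction, close) plus the factory and configuration helpers shown later in this diff are used, and the actual HBase writes are elided.

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hcatalog.hbase.snapshot.RevisionManager;
import org.apache.hcatalog.hbase.snapshot.RevisionManagerConfiguration;
import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
import org.apache.hcatalog.hbase.snapshot.Transaction;

public class WriteTransactionSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = RevisionManagerConfiguration.create();
    // The factory returns an implementation that has already been initialized and opened.
    RevisionManager rm = RevisionManagerFactory.getOpenedRevisionManager(conf);
    try {
      List<String> families = Arrays.asList("cf");
      Transaction txn = rm.beginWriteTransaction("mytable", families);
      try {
        // ... perform the HBase writes associated with this revision ...
        rm.commitWriteTransaction(txn);
      } catch (IOException e) {
        rm.abortWriteTransaction(txn);
        throw e;
      }
    } finally {
      rm.close();
    }
  }
}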

Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerConfiguration.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerConfiguration.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerConfiguration.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerConfiguration.java Thu Sep 12 01:21:10 2013
@@ -25,35 +25,35 @@ import org.apache.hadoop.hbase.HBaseConf
 public class RevisionManagerConfiguration {
 
 
-    public static Configuration addResources(Configuration conf) {
-        conf.addDefaultResource("revision-manager-default.xml");
-        conf.addResource("revision-manager-site.xml");
-        return conf;
-    }
+  public static Configuration addResources(Configuration conf) {
+    conf.addDefaultResource("revision-manager-default.xml");
+    conf.addResource("revision-manager-site.xml");
+    return conf;
+  }
 
-    /**
-     * Creates a Configuration with Revision Manager resources
-     * @return a Configuration with Revision Manager resources
-     */
-    public static Configuration create() {
-        Configuration conf = new Configuration();
-        return addResources(conf);
-    }
+  /**
+   * Creates a Configuration with Revision Manager resources
+   * @return a Configuration with Revision Manager resources
+   */
+  public static Configuration create() {
+    Configuration conf = new Configuration();
+    return addResources(conf);
+  }
 
-    /**
-     * Creates a clone of passed configuration.
-     * @param that Configuration to clone.
-     * @return a Configuration created with the revision-manager-*.xml files plus
-     * the given configuration.
-     */
-    public static Configuration create(final Configuration that) {
-        Configuration conf = create();
-        //we need to merge things instead of doing new Configuration(that)
-        //because of a bug in Configuration wherein the config
-        //set on the MR fronted will get loaded on the backend as resouce called job.xml
-        //hence adding resources on the backed could potentially overwrite properties
-        //set on the frontend which we shouldn't be doing here
-        HBaseConfiguration.merge(conf, that);
-        return conf;
-    }
+  /**
+   * Creates a clone of the passed configuration.
+   * @param that Configuration to clone.
+   * @return a Configuration created with the revision-manager-*.xml files plus
+   * the given configuration.
+   */
+  public static Configuration create(final Configuration that) {
+    Configuration conf = create();
+    //we need to merge things instead of doing new Configuration(that)
+    //because of a bug in Configuration wherein the config
+    //set on the MR frontend will get loaded on the backend as a resource called job.xml,
+    //hence adding resources on the backend could potentially overwrite properties
+    //set on the frontend which we shouldn't be doing here
+    HBaseConfiguration.merge(conf, that);
+    return conf;
+  }
 }
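
A small sketch of the two factory methods above; jobConf stands in for any pre-existing Hadoop Configuration and is an assumption of this example.

import org.apache.hadoop.conf.Configuration;
import org.apache.hcatalog.hbase.snapshot.RevisionManagerConfiguration;

public class RevisionManagerConfigurationSketch {
  public static void main(String[] args) {
    // Layer revision-manager-default.xml and revision-manager-site.xml onto a fresh Configuration.
    Configuration fresh = RevisionManagerConfiguration.create();

    // Merge the revision manager resources into an existing configuration;
    // per the comment above, merging avoids clobbering properties set on the frontend.
    Configuration jobConf = new Configuration();
    Configuration merged = RevisionManagerConfiguration.create(jobConf);
  }
}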

Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java Thu Sep 12 01:21:10 2013
@@ -36,106 +36,106 @@ import org.slf4j.LoggerFactory;
  */
 public class RevisionManagerEndpoint extends BaseEndpointCoprocessor implements RevisionManagerProtocol {
 
-    private static final Logger LOGGER =
-        LoggerFactory.getLogger(RevisionManagerEndpoint.class.getName());
+  private static final Logger LOGGER =
+    LoggerFactory.getLogger(RevisionManagerEndpoint.class.getName());
 
-    private RevisionManager rmImpl = null;
+  private RevisionManager rmImpl = null;
 
-    @Override
-    public void start(CoprocessorEnvironment env) {
-        super.start(env);
-        try {
-            Configuration conf = RevisionManagerConfiguration.create(env.getConfiguration());
-            String className = conf.get(RMConstants.REVISION_MGR_ENDPOINT_IMPL_CLASS,
-                ZKBasedRevisionManager.class.getName());
-            LOGGER.debug("Using Revision Manager implementation: {}", className);
-            rmImpl = RevisionManagerFactory.getOpenedRevisionManager(className, conf);
-        } catch (IOException e) {
-            LOGGER.error("Failed to initialize revision manager", e);
-        }
-    }
-
-    @Override
-    public void stop(CoprocessorEnvironment env) {
-        if (rmImpl != null) {
-            try {
-                rmImpl.close();
-            } catch (IOException e) {
-                LOGGER.warn("Error closing revision manager.", e);
-            }
-        }
-        super.stop(env);
-    }
-
-    @Override
-    public void initialize(Configuration conf) {
-        // do nothing, HBase controls life cycle
-    }
-
-    @Override
-    public void open() throws IOException {
-        // do nothing, HBase controls life cycle
-    }
-
-    @Override
-    public void close() throws IOException {
-        // do nothing, HBase controls life cycle
-    }
-
-    @Override
-    public void createTable(String table, List<String> columnFamilies) throws IOException {
-        rmImpl.createTable(table, columnFamilies);
-    }
-
-    @Override
-    public void dropTable(String table) throws IOException {
-        rmImpl.dropTable(table);
-    }
-
-    @Override
-    public Transaction beginWriteTransaction(String table, List<String> families)
-        throws IOException {
-        return rmImpl.beginWriteTransaction(table, families);
-    }
-
-    @Override
-    public Transaction beginWriteTransaction(String table,
-                                             List<String> families, long keepAlive) throws IOException {
-        return rmImpl.beginWriteTransaction(table, families, keepAlive);
-    }
-
-    @Override
-    public void commitWriteTransaction(Transaction transaction)
-        throws IOException {
-        rmImpl.commitWriteTransaction(transaction);
-    }
-
-    @Override
-    public void abortWriteTransaction(Transaction transaction)
-        throws IOException {
-        rmImpl.abortWriteTransaction(transaction);
-    }
-
-    @Override
-    public TableSnapshot createSnapshot(String tableName) throws IOException {
-        return rmImpl.createSnapshot(tableName);
-    }
-
-    @Override
-    public TableSnapshot createSnapshot(String tableName, long revision)
-        throws IOException {
-        return rmImpl.createSnapshot(tableName, revision);
-    }
-
-    @Override
-    public void keepAlive(Transaction transaction) throws IOException {
-        rmImpl.keepAlive(transaction);
-    }
-
-    @Override
-    public List<FamilyRevision> getAbortedWriteTransactions(String table,
-                                                            String columnFamily) throws IOException {
-        return rmImpl.getAbortedWriteTransactions(table, columnFamily);
-    }
+  @Override
+  public void start(CoprocessorEnvironment env) {
+    super.start(env);
+    try {
+      Configuration conf = RevisionManagerConfiguration.create(env.getConfiguration());
+      String className = conf.get(RMConstants.REVISION_MGR_ENDPOINT_IMPL_CLASS,
+        ZKBasedRevisionManager.class.getName());
+      LOGGER.debug("Using Revision Manager implementation: {}", className);
+      rmImpl = RevisionManagerFactory.getOpenedRevisionManager(className, conf);
+    } catch (IOException e) {
+      LOGGER.error("Failed to initialize revision manager", e);
+    }
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment env) {
+    if (rmImpl != null) {
+      try {
+        rmImpl.close();
+      } catch (IOException e) {
+        LOGGER.warn("Error closing revision manager.", e);
+      }
+    }
+    super.stop(env);
+  }
+
+  @Override
+  public void initialize(Configuration conf) {
+    // do nothing, HBase controls life cycle
+  }
+
+  @Override
+  public void open() throws IOException {
+    // do nothing, HBase controls life cycle
+  }
+
+  @Override
+  public void close() throws IOException {
+    // do nothing, HBase controls life cycle
+  }
+
+  @Override
+  public void createTable(String table, List<String> columnFamilies) throws IOException {
+    rmImpl.createTable(table, columnFamilies);
+  }
+
+  @Override
+  public void dropTable(String table) throws IOException {
+    rmImpl.dropTable(table);
+  }
+
+  @Override
+  public Transaction beginWriteTransaction(String table, List<String> families)
+    throws IOException {
+    return rmImpl.beginWriteTransaction(table, families);
+  }
+
+  @Override
+  public Transaction beginWriteTransaction(String table,
+                       List<String> families, long keepAlive) throws IOException {
+    return rmImpl.beginWriteTransaction(table, families, keepAlive);
+  }
+
+  @Override
+  public void commitWriteTransaction(Transaction transaction)
+    throws IOException {
+    rmImpl.commitWriteTransaction(transaction);
+  }
+
+  @Override
+  public void abortWriteTransaction(Transaction transaction)
+    throws IOException {
+    rmImpl.abortWriteTransaction(transaction);
+  }
+
+  @Override
+  public TableSnapshot createSnapshot(String tableName) throws IOException {
+    return rmImpl.createSnapshot(tableName);
+  }
+
+  @Override
+  public TableSnapshot createSnapshot(String tableName, long revision)
+    throws IOException {
+    return rmImpl.createSnapshot(tableName, revision);
+  }
+
+  @Override
+  public void keepAlive(Transaction transaction) throws IOException {
+    rmImpl.keepAlive(transaction);
+  }
+
+  @Override
+  public List<FamilyRevision> getAbortedWriteTransactions(String table,
+                              String columnFamily) throws IOException {
+    return rmImpl.getAbortedWriteTransactions(table, columnFamily);
+  }
 
 }

Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java Thu Sep 12 01:21:10 2013
@@ -34,92 +34,92 @@ import org.apache.hadoop.hbase.util.Byte
  */
 public class RevisionManagerEndpointClient implements RevisionManager, Configurable {
 
-    private Configuration conf = null;
-    private RevisionManager rmProxy;
+  private Configuration conf = null;
+  private RevisionManager rmProxy;
 
-    @Override
-    public Configuration getConf() {
-        return this.conf;
-    }
-
-    @Override
-    public void setConf(Configuration arg0) {
-        this.conf = arg0;
-    }
-
-    @Override
-    public void initialize(Configuration conf) {
-        // do nothing
-    }
-
-    @Override
-    public void open() throws IOException {
-        // clone to adjust RPC settings unique to proxy
-        Configuration clonedConf = new Configuration(conf);
-        // conf.set("hbase.ipc.client.connect.max.retries", "0");
-        // conf.setInt(HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS, 1);
-        clonedConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // do not retry RPC
-        HTable table = new HTable(clonedConf, HConstants.ROOT_TABLE_NAME);
-        rmProxy = table.coprocessorProxy(RevisionManagerProtocol.class,
-            Bytes.toBytes("anyRow"));
-        rmProxy.open();
-    }
-
-    @Override
-    public void close() throws IOException {
-        rmProxy.close();
-    }
-
-    @Override
-    public void createTable(String table, List<String> columnFamilies) throws IOException {
-        rmProxy.createTable(table, columnFamilies);
-    }
-
-    @Override
-    public void dropTable(String table) throws IOException {
-        rmProxy.dropTable(table);
-    }
-
-    @Override
-    public Transaction beginWriteTransaction(String table, List<String> families) throws IOException {
-        return rmProxy.beginWriteTransaction(table, families);
-    }
-
-    @Override
-    public Transaction beginWriteTransaction(String table, List<String> families, long keepAlive)
-        throws IOException {
-        return rmProxy.beginWriteTransaction(table, families, keepAlive);
-    }
-
-    @Override
-    public void commitWriteTransaction(Transaction transaction) throws IOException {
-        rmProxy.commitWriteTransaction(transaction);
-    }
-
-    @Override
-    public void abortWriteTransaction(Transaction transaction) throws IOException {
-        rmProxy.abortWriteTransaction(transaction);
-    }
-
-    @Override
-    public List<FamilyRevision> getAbortedWriteTransactions(String table, String columnFamily)
-        throws IOException {
-        return rmProxy.getAbortedWriteTransactions(table, columnFamily);
-    }
-
-    @Override
-    public TableSnapshot createSnapshot(String tableName) throws IOException {
-        return rmProxy.createSnapshot(tableName);
-    }
-
-    @Override
-    public TableSnapshot createSnapshot(String tableName, long revision) throws IOException {
-        return rmProxy.createSnapshot(tableName, revision);
-    }
-
-    @Override
-    public void keepAlive(Transaction transaction) throws IOException {
-        rmProxy.keepAlive(transaction);
-    }
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  @Override
+  public void setConf(Configuration arg0) {
+    this.conf = arg0;
+  }
+
+  @Override
+  public void initialize(Configuration conf) {
+    // do nothing
+  }
+
+  @Override
+  public void open() throws IOException {
+    // clone to adjust RPC settings unique to proxy
+    Configuration clonedConf = new Configuration(conf);
+    // conf.set("hbase.ipc.client.connect.max.retries", "0");
+    // conf.setInt(HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS, 1);
+    clonedConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // do not retry RPC
+    HTable table = new HTable(clonedConf, HConstants.ROOT_TABLE_NAME);
+    rmProxy = table.coprocessorProxy(RevisionManagerProtocol.class,
+      Bytes.toBytes("anyRow"));
+    rmProxy.open();
+  }
+
+  @Override
+  public void close() throws IOException {
+    rmProxy.close();
+  }
+
+  @Override
+  public void createTable(String table, List<String> columnFamilies) throws IOException {
+    rmProxy.createTable(table, columnFamilies);
+  }
+
+  @Override
+  public void dropTable(String table) throws IOException {
+    rmProxy.dropTable(table);
+  }
+
+  @Override
+  public Transaction beginWriteTransaction(String table, List<String> families) throws IOException {
+    return rmProxy.beginWriteTransaction(table, families);
+  }
+
+  @Override
+  public Transaction beginWriteTransaction(String table, List<String> families, long keepAlive)
+    throws IOException {
+    return rmProxy.beginWriteTransaction(table, families, keepAlive);
+  }
+
+  @Override
+  public void commitWriteTransaction(Transaction transaction) throws IOException {
+    rmProxy.commitWriteTransaction(transaction);
+  }
+
+  @Override
+  public void abortWriteTransaction(Transaction transaction) throws IOException {
+    rmProxy.abortWriteTransaction(transaction);
+  }
+
+  @Override
+  public List<FamilyRevision> getAbortedWriteTransactions(String table, String columnFamily)
+    throws IOException {
+    return rmProxy.getAbortedWriteTransactions(table, columnFamily);
+  }
+
+  @Override
+  public TableSnapshot createSnapshot(String tableName) throws IOException {
+    return rmProxy.createSnapshot(tableName);
+  }
+
+  @Override
+  public TableSnapshot createSnapshot(String tableName, long revision) throws IOException {
+    return rmProxy.createSnapshot(tableName, revision);
+  }
+
+  @Override
+  public void keepAlive(Transaction transaction) throws IOException {
+    rmProxy.keepAlive(transaction);
+  }
 
 }
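
One plausible way to select the endpoint client shown above as the client-side implementation, letting the factory (next hunk) set its Configuration and open it; treat this wiring as an assumption rather than a documented recipe from this commit.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hcatalog.hbase.snapshot.RevisionManager;
import org.apache.hcatalog.hbase.snapshot.RevisionManagerConfiguration;
import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointClient;
import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;

public class EndpointClientSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = RevisionManagerConfiguration.create();
    // Ask the factory for the RPC proxy client instead of the default ZKBasedRevisionManager.
    conf.set(RevisionManagerFactory.REVISION_MGR_IMPL_CLASS,
        RevisionManagerEndpointClient.class.getName());
    RevisionManager rm = RevisionManagerFactory.getOpenedRevisionManager(conf);
    try {
      // rm now delegates each call to the RevisionManagerEndpoint coprocessor over HBase RPC.
    } finally {
      rm.close();
    }
  }
}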

Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java Thu Sep 12 01:21:10 2013
@@ -29,77 +29,77 @@ import org.apache.hadoop.conf.Configurat
  */
 public class RevisionManagerFactory {
 
-    public static final String REVISION_MGR_IMPL_CLASS = "revision.manager.impl.class";
+  public static final String REVISION_MGR_IMPL_CLASS = "revision.manager.impl.class";
 
-    /**
-     * Gets an instance of revision manager.
-     *
-     * @param conf The configuration required to created the revision manager.
-     * @return the revision manager An instance of revision manager.
-     * @throws IOException Signals that an I/O exception has occurred.
-     */
-    private static RevisionManager getRevisionManager(String className, Configuration conf) throws IOException {
-
-        RevisionManager revisionMgr;
-        ClassLoader classLoader = Thread.currentThread()
-            .getContextClassLoader();
-        if (classLoader == null) {
-            classLoader = RevisionManagerFactory.class.getClassLoader();
-        }
-        try {
-            Class<? extends RevisionManager> revisionMgrClass = Class
-                .forName(className, true, classLoader).asSubclass(RevisionManager.class);
-            revisionMgr = (RevisionManager) revisionMgrClass.newInstance();
-            revisionMgr.initialize(conf);
-        } catch (ClassNotFoundException e) {
-            throw new IOException(
-                "The implementation class of revision manager not found.",
-                e);
-        } catch (InstantiationException e) {
-            throw new IOException(
-                "Exception encountered during instantiating revision manager implementation.",
-                e);
-        } catch (IllegalAccessException e) {
-            throw new IOException(
-                "IllegalAccessException encountered during instantiating revision manager implementation.",
-                e);
-        } catch (IllegalArgumentException e) {
-            throw new IOException(
-                "IllegalArgumentException encountered during instantiating revision manager implementation.",
-                e);
-        }
-        return revisionMgr;
+  /**
+   * Gets an instance of revision manager.
+   *
+   * @param conf The configuration required to create the revision manager.
+   * @return An instance of the revision manager.
+   * @throws IOException Signals that an I/O exception has occurred.
+   */
+  private static RevisionManager getRevisionManager(String className, Configuration conf) throws IOException {
+
+    RevisionManager revisionMgr;
+    ClassLoader classLoader = Thread.currentThread()
+      .getContextClassLoader();
+    if (classLoader == null) {
+      classLoader = RevisionManagerFactory.class.getClassLoader();
     }
-
-    /**
-     * Internally used by endpoint implementation to instantiate from different configuration setting.
-     * @param className
-     * @param conf
-     * @return the opened revision manager
-     * @throws IOException
-     */
-    static RevisionManager getOpenedRevisionManager(String className, Configuration conf) throws IOException {
-
-        RevisionManager revisionMgr = RevisionManagerFactory.getRevisionManager(className, conf);
-        if (revisionMgr instanceof Configurable) {
-            ((Configurable) revisionMgr).setConf(conf);
-        }
-        revisionMgr.open();
-        return revisionMgr;
+    try {
+      Class<? extends RevisionManager> revisionMgrClass = Class
+        .forName(className, true, classLoader).asSubclass(RevisionManager.class);
+      revisionMgr = (RevisionManager) revisionMgrClass.newInstance();
+      revisionMgr.initialize(conf);
+    } catch (ClassNotFoundException e) {
+      throw new IOException(
+        "The implementation class of revision manager not found.",
+        e);
+    } catch (InstantiationException e) {
+      throw new IOException(
+        "Exception encountered during instantiating revision manager implementation.",
+        e);
+    } catch (IllegalAccessException e) {
+      throw new IOException(
+        "IllegalAccessException encountered during instantiating revision manager implementation.",
+        e);
+    } catch (IllegalArgumentException e) {
+      throw new IOException(
+        "IllegalArgumentException encountered during instantiating revision manager implementation.",
+        e);
     }
+    return revisionMgr;
+  }
 
-    /**
-     * Gets an instance of revision manager which is opened.
-     * The revision manager implementation can be specified as {@link #REVISION_MGR_IMPL_CLASS},
-     * default is {@link ZKBasedRevisionManager}.
-     * @param conf revision manager configuration
-     * @return RevisionManager An instance of revision manager.
-     * @throws IOException
-     */
-    public static RevisionManager getOpenedRevisionManager(Configuration conf) throws IOException {
-        String className = conf.get(RevisionManagerFactory.REVISION_MGR_IMPL_CLASS,
-            ZKBasedRevisionManager.class.getName());
-        return getOpenedRevisionManager(className, conf);
+  /**
+   * Internally used by the endpoint implementation to instantiate from a different configuration setting.
+   * @param className
+   * @param conf
+   * @return the opened revision manager
+   * @throws IOException
+   */
+  static RevisionManager getOpenedRevisionManager(String className, Configuration conf) throws IOException {
+
+    RevisionManager revisionMgr = RevisionManagerFactory.getRevisionManager(className, conf);
+    if (revisionMgr instanceof Configurable) {
+      ((Configurable) revisionMgr).setConf(conf);
     }
+    revisionMgr.open();
+    return revisionMgr;
+  }
+
+  /**
+   * Gets an instance of revision manager which is opened.
+   * The revision manager implementation can be specified as {@link #REVISION_MGR_IMPL_CLASS},
+   * default is {@link ZKBasedRevisionManager}.
+   * @param conf revision manager configuration
+   * @return RevisionManager An instance of revision manager.
+   * @throws IOException
+   */
+  public static RevisionManager getOpenedRevisionManager(Configuration conf) throws IOException {
+    String className = conf.get(RevisionManagerFactory.REVISION_MGR_IMPL_CLASS,
+      ZKBasedRevisionManager.class.getName());
+    return getOpenedRevisionManager(className, conf);
+  }
 
 }
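
A short sketch of the public factory entry point above; the table name is a placeholder, and the explicit property set is optional since ZKBasedRevisionManager is already the factory's default.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hcatalog.hbase.snapshot.RevisionManager;
import org.apache.hcatalog.hbase.snapshot.RevisionManagerConfiguration;
import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
import org.apache.hcatalog.hbase.snapshot.TableSnapshot;

public class RevisionManagerFactorySketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = RevisionManagerConfiguration.create();
    conf.set(RevisionManagerFactory.REVISION_MGR_IMPL_CLASS,
        "org.apache.hcatalog.hbase.snapshot.ZKBasedRevisionManager");
    RevisionManager rm = RevisionManagerFactory.getOpenedRevisionManager(conf);
    try {
      TableSnapshot snapshot = rm.createSnapshot("mytable"); // placeholder table name
      System.out.println("Created snapshot: " + snapshot);
    } finally {
      rm.close();
    }
  }
}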

Modified: hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerProtocol.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerProtocol.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerProtocol.java (original)
+++ hive/branches/vectorization/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerProtocol.java Thu Sep 12 01:21:10 2013
@@ -25,6 +25,6 @@ import org.apache.hadoop.hbase.ipc.Copro
  * (needs to extend CoprocessorProtocol)
  */
 public interface RevisionManagerProtocol extends RevisionManager,
-    CoprocessorProtocol {
+  CoprocessorProtocol {
 
 }


