phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject phoenix git commit: PHOENIX-3812 Use HBase snapshots in async index building M/R job (Akshita Malhotra) [Forced Update!]
Date Sat, 10 Jun 2017 00:01:37 GMT
Repository: phoenix
Updated Branches:
  refs/heads/master 892be1398 -> 59f1a2839 (forced update)


PHOENIX-3812 Use HBase snapshots in async index building M/R job (Akshita Malhotra)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/59f1a283
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/59f1a283
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/59f1a283

Branch: refs/heads/master
Commit: 59f1a2839480cdc77ffb0c6742986ffff3b91bba
Parents: cd49ba7
Author: James Taylor <jamestaylor@apache.org>
Authored: Fri Jun 9 16:53:57 2017 -0700
Committer: James Taylor <jamestaylor@apache.org>
Committed: Fri Jun 9 17:01:30 2017 -0700

----------------------------------------------------------------------
 .../apache/phoenix/end2end/IndexExtendedIT.java | 23 +++++++---
 .../phoenix/mapreduce/PhoenixInputFormat.java   |  7 +++
 .../phoenix/mapreduce/index/IndexTool.java      | 45 ++++++++++++++++----
 3 files changed, 60 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/59f1a283/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexExtendedIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexExtendedIT.java
index b79e557..53bf625 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexExtendedIT.java
@@ -74,17 +74,19 @@ public class IndexExtendedIT extends BaseTest {
     private final boolean directApi;
     private final String tableDDLOptions;
     private final boolean mutable;
+    private final boolean useSnapshot;
     
     @AfterClass
     public static void doTeardown() throws Exception {
         tearDownMiniCluster();
     }
 
-    public IndexExtendedIT(boolean transactional, boolean mutable, boolean localIndex, boolean
directApi) {
+    public IndexExtendedIT(boolean transactional, boolean mutable, boolean localIndex, boolean
directApi, boolean useSnapshot) {
         this.localIndex = localIndex;
         this.transactional = transactional;
         this.directApi = directApi;
         this.mutable = mutable;
+        this.useSnapshot = useSnapshot;
         StringBuilder optionBuilder = new StringBuilder();
         if (!mutable) {
             optionBuilder.append(" IMMUTABLE_ROWS=true ");
@@ -110,13 +112,16 @@ public class IndexExtendedIT extends BaseTest {
                 .iterator()));
     }
     
-    @Parameters(name="transactional = {0} , mutable = {1} , localIndex = {2}, directApi =
{3}")
+    @Parameters(name="transactional = {0} , mutable = {1} , localIndex = {2}, directApi =
{3}, useSnapshot = {4}")
     public static Collection<Boolean[]> data() {
-        return Arrays.asList(new Boolean[][] {     
-                 { false, false, false, false }, { false, false, false, true }, { false,
false, true, false }, { false, false, true, true }, 
-                 { false, true, false, false }, { false, true, false, true }, { false, true,
true, false }, { false, true, true, true }, 
-                 { true, false, false, false }, { true, false, false, true }, { true, false,
true, false }, { true, false, true, true }, 
-                 { true, true, false, false }, { true, true, false, true }, { true, true,
true, false }, { true, true, true, true } 
+        return Arrays.asList(new Boolean[][] {
+                 { false, false, false, false, false }, { false, false, false, true, false
}, { false, false, true, false, false }, { false, false, true, true, false },
+                 { false, true, false, false, false }, { false, true, false, true, false
}, { false, true, true, false, false }, { false, true, true, true, false },
+                 { true, false, false, false, false }, { true, false, false, true, false
}, { true, false, true, false, false }, { true, false, true, true, false },
+                 { true, true, false, false, false }, { true, true, false, true, false },
{ true, true, true, false, false }, { true, true, true, true, false },
+                 { false, true, false, false, true }, { false, true, false, true, true },
{ false, true, true, false, true }, { false, true, true, true, true },
+                 { true, false, false, false, true }, { true, false, false, true, true },
{ true, false, true, false, true }, { true, false, true, true, true },
+                 { true, true, false, false, true }, { true, true, false, true, true }, {
true, true, true, false, true }, { true, true, true, true, true }
            });
     }
     
@@ -307,6 +312,10 @@ public class IndexExtendedIT extends BaseTest {
             args.add("-runfg");
         }
 
+        if(useSnapshot) {
+            args.add("-snap");
+        }
+
         args.add("-op");
         args.add("/tmp/"+UUID.randomUUID().toString());
         return args.toArray(new String[0]);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/59f1a283/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index 14f7b94..25729d6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -184,6 +184,13 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
             if (txnScnValue!=null) {
                 scan.setAttribute(BaseScannerRegionObserver.TX_SCN, Bytes.toBytes(Long.valueOf(txnScnValue)));
             }
+
+            // setting the snapshot configuration
+            String snapshotName = configuration.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
+            if (snapshotName != null)
+                PhoenixConfigurationUtil.setSnapshotNameKey(queryPlan.getContext().getConnection().
+                    getQueryServices().getConfiguration(), snapshotName);
+
             // Initialize the query plan so it sets up the parallel scans
             queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
             return queryPlan;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/59f1a283/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index da216ed..671e4cf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -55,6 +56,7 @@ import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
@@ -118,6 +120,8 @@ public class IndexTool extends Configured implements Tool {
                             + "If specified, runs index build in Foreground. Default - Runs
the build in background.");
     private static final Option OUTPUT_PATH_OPTION = new Option("op", "output-path", true,
             "Output path where the files are written");
+    private static final Option SNAPSHOT_OPTION = new Option("snap", "snapshot", false,
+        "If specified, uses Snapshots for async index building (optional)");
     private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
     public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s_INDX_%s";
 
@@ -130,6 +134,7 @@ public class IndexTool extends Configured implements Tool {
         options.addOption(DIRECT_API_OPTION);
         options.addOption(RUN_FOREGROUND_OPTION);
         options.addOption(OUTPUT_PATH_OPTION);
+        options.addOption(SNAPSHOT_OPTION);
         options.addOption(HELP_OPTION);
         return options;
     }
@@ -203,11 +208,12 @@ public class IndexTool extends Configured implements Tool {
 
         }
 
-        public Job getJob(String schemaName, String indexTable, String dataTable, boolean
useDirectApi, boolean isPartialBuild) throws Exception {
+        public Job getJob(String schemaName, String indexTable, String dataTable, boolean
useDirectApi, boolean isPartialBuild,
+            boolean useSnapshot) throws Exception {
             if (isPartialBuild) {
                 return configureJobForPartialBuild(schemaName, dataTable);
             } else {
-                return configureJobForAysncIndex(schemaName, indexTable, dataTable, useDirectApi);
+                return configureJobForAysncIndex(schemaName, indexTable, dataTable, useDirectApi,
useSnapshot);
             }
         }
         
@@ -320,7 +326,7 @@ public class IndexTool extends Configured implements Tool {
             
         }
 
-        private Job configureJobForAysncIndex(String schemaName, String indexTable, String
dataTable, boolean useDirectApi)
+        private Job configureJobForAysncIndex(String schemaName, String indexTable, String
dataTable, boolean useDirectApi, boolean useSnapshot)
                 throws Exception {
             final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
             final String qIndexTable;
@@ -374,12 +380,34 @@ public class IndexTool extends Configured implements Tool {
             job.setJarByClass(IndexTool.class);
             job.setMapOutputKeyClass(ImmutableBytesWritable.class);
             FileOutputFormat.setOutputPath(job, outputPath);
-            
-            PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, qDataTable,
-                selectQuery);
+
+            if (!useSnapshot) {
+                PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, qDataTable,
+                    selectQuery);
+            } else {
+                HBaseAdmin admin = null;
+                String snapshotName;
+                try {
+                    admin = pConnection.getQueryServices().getAdmin();
+                    String pdataTableName = pdataTable.getName().getString();
+                    snapshotName = new StringBuilder(pdataTableName).append("-Snapshot").toString();
+                    admin.snapshot(snapshotName, TableName.valueOf(pdataTableName));
+                } finally {
+                    if (admin != null) {
+                        admin.close();
+                    }
+                }
+                // root dir not a subdirectory of hbase dir
+                Path rootDir = new Path("hdfs:///index-snapshot-dir");
+                FSUtils.setRootDir(configuration, rootDir);
+                Path restoreDir = new Path(FSUtils.getRootDir(configuration), "restore-dir");
+
+                // set input for map reduce job using hbase snapshots
+                PhoenixMapReduceUtil
+                    .setInput(job, PhoenixIndexDBWritable.class, snapshotName, qDataTable,
restoreDir, selectQuery);
+            }
             TableMapReduceUtil.initCredentials(job);
             
-            
             if (useDirectApi) {
                 return configureSubmittableJobUsingDirectApi(job, false);
             } else {
@@ -464,6 +492,7 @@ public class IndexTool extends Configured implements Tool {
             boolean useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt());
             String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
             boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+            boolean useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt());
             connection = ConnectionUtil.getInputConnection(configuration);
             byte[][] splitKeysBeforeJob = null;
             boolean isLocalIndexBuild = false;
@@ -494,7 +523,7 @@ public class IndexTool extends Configured implements Tool {
 			}
             
             Job job = new JobFactory(connection, configuration, outputPath).getJob(schemaName,
indexTable, dataTable,
-                    useDirectApi, isPartialBuild);
+                    useDirectApi, isPartialBuild, useSnapshot);
             if (!isForeground && useDirectApi) {
                 LOG.info("Running Index Build in Background - Submit async and exit");
                 job.submit();


Mime
View raw message