phoenix-commits mailing list archives

From maryann...@apache.org
Subject [05/12] phoenix git commit: PHOENIX-2154 Failure of one mapper should not affect other mappers in MR index build (Ravi Kishore Valeti)
Date Thu, 03 Sep 2015 21:41:52 GMT
PHOENIX-2154 Failure of one mapper should not affect other mappers in MR index build (Ravi Kishore Valeti)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/16fcdf9e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/16fcdf9e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/16fcdf9e

Branch: refs/heads/calcite
Commit: 16fcdf9e1c116758027b79a24f9ec701cb63496f
Parents: 2f128ee
Author: Thomas D'Silva <tdsilva@salesforce.com>
Authored: Fri Aug 28 17:48:43 2015 -0700
Committer: Thomas D'Silva <tdsilva@salesforce.com>
Committed: Fri Aug 28 17:48:43 2015 -0700

----------------------------------------------------------------------
 .../apache/phoenix/mapreduce/IndexToolIT.java   |  41 ++-
 .../mapreduce/index/DirectHTableWriter.java     | 115 ++++++++
 .../phoenix/mapreduce/index/IndexTool.java      | 272 ++++++++++++-------
 .../phoenix/mapreduce/index/IndexToolUtil.java  |  76 ++++++
 .../index/PhoenixIndexImportDirectMapper.java   | 136 ++++++++++
 .../index/PhoenixIndexToolReducer.java          |  60 ++++
 6 files changed, 597 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
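
For orientation, here is a minimal sketch (not part of this commit) of driving the new options programmatically; the schema, table names, and output path are placeholders. IndexTool#main in the diff below invokes the tool the same way via ToolRunner:

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.phoenix.mapreduce.index.IndexTool;

    public class IndexToolDirectExample {
        public static void main(String[] args) throws Exception {
            // Placeholder names/paths; -direct writes to the index table directly
            // (skipping the HFile bulk load) and -runfg waits for job completion.
            String[] toolArgs = { "-s", "SCHEMA", "-dt", "DATA_TABLE", "-it", "DATA_TABLE_INDX",
                    "-direct", "-runfg", "-op", "/tmp/index-build" };
            System.exit(ToolRunner.run(new IndexTool(), toolArgs));
        }
    }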


http://git-wip-us.apache.org/repos/asf/phoenix/blob/16fcdf9e/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
index 90411df..bc85c6a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.mapreduce;
 import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.sql.Connection;
@@ -91,7 +90,31 @@ public class IndexToolIT {
         testSecondaryIndex("SCHEMA", "DATA_TABLE4", false, true);
     }
     
+    @Test
+    public void testImmutableGlobalIndexDirectApi() throws Exception {
+    	testSecondaryIndex("SCHEMA", "DATA_TABLE5", true, false, true);
+    }
+    
+    @Test
+    public void testImmutableLocalIndexDirectApi() throws Exception {
+    	testSecondaryIndex("SCHEMA", "DATA_TABLE6", true, true, true);
+    }
+    
+    @Test
+    public void testMutableGlobalIndexDirectApi() throws Exception {
+    	testSecondaryIndex("SCHEMA", "DATA_TABLE7", false, false, true);
+    }
+    
+    @Test
+    public void testMutableLocalIndexDirectApi() throws Exception {
+    	testSecondaryIndex("SCHEMA", "DATA_TABLE8", false, true, true);
+    }
+    
     public void testSecondaryIndex(final String schemaName, final String dataTable, final boolean isImmutable , final boolean isLocal) throws Exception {
+    	testSecondaryIndex(schemaName, dataTable, isImmutable, isLocal, false);
+    }
+    
+    public void testSecondaryIndex(final String schemaName, final String dataTable, final boolean isImmutable , final boolean isLocal, final boolean directApi) throws Exception {
         
     	final String fullTableName = SchemaUtil.getTableName(schemaName, dataTable);
         final String indxTable = String.format("%s_%s",dataTable,"INDX");
@@ -130,7 +153,7 @@ public class IndexToolIT {
             final IndexTool indexingTool = new IndexTool();
             indexingTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
             
-            final String[] cmdArgs = getArgValues(schemaName, dataTable, indxTable);
+            final String[] cmdArgs = getArgValues(schemaName, dataTable, indxTable, directApi);
             int status = indexingTool.run(cmdArgs);
             assertEquals(0, status);
             
@@ -263,15 +286,25 @@ public class IndexToolIT {
     }
 
     private String[] getArgValues(String schemaName, String dataTable, String indxTable) {
+        return getArgValues(schemaName, dataTable, indxTable, false);
+    }
+    
+    private String[] getArgValues(String schemaName, String dataTable, String indxTable, boolean directApi) {
         final List<String> args = Lists.newArrayList();
         if (schemaName!=null) {
-        	args.add("-s");
-        	args.add(schemaName);
+            args.add("-s");
+            args.add(schemaName);
         }
         args.add("-dt");
         args.add(dataTable);
         args.add("-it");
         args.add(indxTable);
+        if(directApi) {
+            args.add("-direct");
+            // Need to run this job in foreground for the test to be deterministic
+            args.add("-runfg");
+        }
+
         args.add("-op");
         args.add("/tmp/"+UUID.randomUUID().toString());
         return args.toArray(new String[0]);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/16fcdf9e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
new file mode 100644
index 0000000..c9512c2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Writes mutations directly to HBase using HBase front-door APIs.
+ */
+public class DirectHTableWriter {
+    private static final Logger LOG = LoggerFactory.getLogger(DirectHTableWriter.class);
+
+    private Configuration conf = null;
+
+    private HTable table;
+
+    /** Job parameter that specifies the output table. */
+    public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
+
+    /**
+     * Optional job parameter to specify a peer cluster. Used specifying remote cluster when copying
+     * between hbase clusters (the source is picked up from <code>hbase-site.xml</code>).
+     * @see TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job,
+     *      Class, String, String, String)
+     */
+    public static final String QUORUM_ADDRESS = "hbase.mapred.output.quorum";
+
+    /** Optional job parameter to specify peer cluster's ZK client port */
+    public static final String QUORUM_PORT = "hbase.mapred.output.quorum.port";
+
+    /** Optional specification of the rs class name of the peer cluster */
+    public static final String REGION_SERVER_CLASS = "hbase.mapred.output.rs.class";
+    /** Optional specification of the rs impl name of the peer cluster */
+    public static final String REGION_SERVER_IMPL = "hbase.mapred.output.rs.impl";
+
+    public DirectHTableWriter(Configuration otherConf) {
+        setConf(otherConf);
+    }
+
+    protected void setConf(Configuration otherConf) {
+        this.conf = HBaseConfiguration.create(otherConf);
+
+        String tableName = this.conf.get(OUTPUT_TABLE);
+        if (tableName == null || tableName.length() <= 0) {
+            throw new IllegalArgumentException("Must specify table name");
+        }
+
+        String address = this.conf.get(QUORUM_ADDRESS);
+        int zkClientPort = this.conf.getInt(QUORUM_PORT, 0);
+        String serverClass = this.conf.get(REGION_SERVER_CLASS);
+        String serverImpl = this.conf.get(REGION_SERVER_IMPL);
+
+        try {
+            if (address != null) {
+                ZKUtil.applyClusterKeyToConf(this.conf, address);
+            }
+            if (serverClass != null) {
+                this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
+            }
+            if (zkClientPort != 0) {
+                this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
+            }
+            this.table = new HTable(this.conf, tableName);
+            this.table.setAutoFlush(false, true);
+            LOG.info("Created table instance for " + tableName);
+        } catch (IOException e) {
+            LOG.error("IOException : ", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void write(Mutation mutation) throws IOException {
+        if (mutation instanceof Put) this.table.put(new Put((Put) mutation));
+        else if (mutation instanceof Delete) this.table.delete(new Delete((Delete) mutation));
+        else throw new IOException("Pass a Delete or a Put");
+    }
+
+    protected Configuration getConf() {
+        return conf;
+    }
+
+    protected HTable getTable() {
+        return table;
+    }
+
+    public void close() throws IOException {
+        table.close();
+    }
+}
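
A minimal usage sketch of the new writer (not part of this commit; assumes a reachable HBase cluster, and the table name is a placeholder). The constructor requires OUTPUT_TABLE to be set in the Configuration, otherwise setConf throws IllegalArgumentException:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.mapreduce.index.DirectHTableWriter;

    public class DirectHTableWriterExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            conf.set(DirectHTableWriter.OUTPUT_TABLE, "MY_INDEX_TABLE"); // placeholder table
            DirectHTableWriter writer = new DirectHTableWriter(conf);
            try {
                Put put = new Put(Bytes.toBytes("row1"));
                put.add(Bytes.toBytes("0"), Bytes.toBytes("q"), Bytes.toBytes("v"));
                writer.write(put); // buffered since autoFlush is off; flushed on close()
            } finally {
                writer.close();
            }
        }
    }

Because the table is opened with setAutoFlush(false, true), writes are batched client-side and pushed out when the write buffer fills or the table is closed.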

http://git-wip-us.apache.org/repos/asf/phoenix/blob/16fcdf9e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 8378469..8a4f963 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -42,6 +42,9 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
@@ -65,8 +68,6 @@ import org.apache.phoenix.util.SchemaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
 /**
  * An MR job to populate the index table from the data table.
  *
@@ -75,30 +76,41 @@ public class IndexTool extends Configured implements Tool {
 
     private static final Logger LOG = LoggerFactory.getLogger(IndexTool.class);
 
-    private static final Option SCHEMA_NAME_OPTION = new Option("s", "schema", true, "Phoenix schema name (optional)");
-    private static final Option DATA_TABLE_OPTION = new Option("dt", "data-table", true, "Data table name (mandatory)");
-    private static final Option INDEX_TABLE_OPTION = new Option("it", "index-table", true, "Index table name(mandatory)");
-    private static final Option OUTPUT_PATH_OPTION = new Option("op", "output-path", true, "Output path where the files are written(mandatory)");
+    private static final Option SCHEMA_NAME_OPTION = new Option("s", "schema", true,
+            "Phoenix schema name (optional)");
+    private static final Option DATA_TABLE_OPTION = new Option("dt", "data-table", true,
+            "Data table name (mandatory)");
+    private static final Option INDEX_TABLE_OPTION = new Option("it", "index-table", true,
+            "Index table name(mandatory)");
+    private static final Option DIRECT_API_OPTION = new Option("direct", "direct", false,
+            "If specified, we avoid the bulk load (optional)");
+    private static final Option RUN_FOREGROUND_OPTION =
+            new Option(
+                    "runfg",
+                    "run-foreground",
+                    false,
+                    "Applicable on top of -direct option."
+                            + "If specified, runs index build in Foreground. Default - Runs
the build in background.");
+    private static final Option OUTPUT_PATH_OPTION = new Option("op", "output-path", true,
+            "Output path where the files are written");
     private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
-    
-    private static final String ALTER_INDEX_QUERY_TEMPLATE = "ALTER INDEX IF EXISTS %s ON %s %s";
     private static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s_INDX_%s";
-    
-    
+
     private Options getOptions() {
         final Options options = new Options();
         options.addOption(SCHEMA_NAME_OPTION);
         options.addOption(DATA_TABLE_OPTION);
         options.addOption(INDEX_TABLE_OPTION);
+        options.addOption(DIRECT_API_OPTION);
+        options.addOption(RUN_FOREGROUND_OPTION);
         options.addOption(OUTPUT_PATH_OPTION);
         options.addOption(HELP_OPTION);
         return options;
     }
-    
+
     /**
      * Parses the commandline arguments, throws IllegalStateException if mandatory arguments are
      * missing.
-     *
      * @param args supplied command line arguments
      * @return the parsed command line
      */
@@ -111,7 +123,7 @@ public class IndexTool extends Configured implements Tool {
         try {
             cmdLine = parser.parse(options, args);
         } catch (ParseException e) {
-            printHelpAndExit("Error parsing command line options: "+ e.getMessage(), options);
+            printHelpAndExit("Error parsing command line options: " + e.getMessage(), options);
         }
 
         if (cmdLine.hasOption(HELP_OPTION.getOpt())) {
@@ -119,23 +131,28 @@ public class IndexTool extends Configured implements Tool {
         }
 
         if (!cmdLine.hasOption(DATA_TABLE_OPTION.getOpt())) {
-            throw new IllegalStateException(DATA_TABLE_OPTION.getLongOpt() + " is a mandatory "
-                   + "parameter");
+            throw new IllegalStateException(DATA_TABLE_OPTION.getLongOpt() + " is a mandatory "
+                    + "parameter");
         }
-        
+
         if (!cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) {
-            throw new IllegalStateException(INDEX_TABLE_OPTION.getLongOpt() + " is a mandatory "
-                   + "parameter");
+            throw new IllegalStateException(INDEX_TABLE_OPTION.getLongOpt() + " is a mandatory "
+                    + "parameter");
         }
 
         if (!cmdLine.hasOption(OUTPUT_PATH_OPTION.getOpt())) {
-            throw new IllegalStateException(OUTPUT_PATH_OPTION.getLongOpt() + " is a mandatory "
-                   + "parameter");
+            throw new IllegalStateException(OUTPUT_PATH_OPTION.getLongOpt() + " is a mandatory "
+                    + "parameter");
+        }
+
+        if (!cmdLine.hasOption(DIRECT_API_OPTION.getOpt())
+                && cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt())) {
+            throw new IllegalStateException(RUN_FOREGROUND_OPTION.getLongOpt()
+                    + " is applicable only for " + DIRECT_API_OPTION.getLongOpt());
         }
         return cmdLine;
     }
 
-   
     private void printHelpAndExit(String errorMessage, Options options) {
         System.err.println(errorMessage);
         printHelpAndExit(options, 1);
@@ -146,7 +163,7 @@ public class IndexTool extends Configured implements Tool {
         formatter.printHelp("help", options);
         System.exit(exitCode);
     }
-    
+
     @Override
     public int run(String[] args) throws Exception {
         Connection connection = null;
@@ -163,94 +180,165 @@ public class IndexTool extends Configured implements Tool {
             final String indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
             final String qDataTable = SchemaUtil.getTableName(schemaName, dataTable);
             final String qIndexTable = SchemaUtil.getTableName(schemaName, indexTable);
-         
+
             connection = ConnectionUtil.getInputConnection(configuration);
-            if(!isValidIndexTable(connection, qDataTable, indexTable)) {
-                throw new IllegalArgumentException(String.format(" %s is not an index table for %s ",qIndexTable,qDataTable));
+            if (!isValidIndexTable(connection, qDataTable, indexTable)) {
+                throw new IllegalArgumentException(String.format(
+                    " %s is not an index table for %s ", qIndexTable, qDataTable));
             }
-            
+
             final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable);
             final PTable pindexTable = PhoenixRuntime.getTable(connection, qIndexTable);
-            
+
             // this is set to ensure index tables remains consistent post population.
             long indxTimestamp = pindexTable.getTimeStamp();
-            configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,Long.toString(indxTimestamp + 1));
-            
-            // check if the index type is LOCAL, if so, set the logicalIndexName that is computed from the dataTable name.
-            String logicalIndexTable = qIndexTable;
-            if(IndexType.LOCAL.equals(pindexTable.getIndexType())) {
-                logicalIndexTable  = MetaDataUtil.getLocalIndexTableName(qDataTable);
+            configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,
+                Long.toString(indxTimestamp + 1));
+
+            // check if the index type is LOCAL, if so, derive and set the physicalIndexName that is
+            // computed from the qDataTable name.
+            String physicalIndexTable = qIndexTable;
+            if (IndexType.LOCAL.equals(pindexTable.getIndexType())) {
+                physicalIndexTable = MetaDataUtil.getLocalIndexTableName(qDataTable);
             }
-            
+
             final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class);
-            final PostIndexDDLCompiler ddlCompiler = new PostIndexDDLCompiler(pConnection,new TableRef(pdataTable));
+            final PostIndexDDLCompiler ddlCompiler =
+                    new PostIndexDDLCompiler(pConnection, new TableRef(pdataTable));
             ddlCompiler.compile(pindexTable);
-            
+
             final List<String> indexColumns = ddlCompiler.getIndexColumnNames();
             final String selectQuery = ddlCompiler.getSelectQuery();
-            final String upsertQuery = QueryUtil.constructUpsertStatement(qIndexTable, indexColumns, Hint.NO_INDEX);
-       
+            final String upsertQuery =
+                    QueryUtil.constructUpsertStatement(qIndexTable, indexColumns, Hint.NO_INDEX);
+
             configuration.set(PhoenixConfigurationUtil.UPSERT_STATEMENT, upsertQuery);
-            PhoenixConfigurationUtil.setPhysicalTableName(configuration, logicalIndexTable);
-            PhoenixConfigurationUtil.setOutputTableName(configuration, qIndexTable);
-            PhoenixConfigurationUtil.setUpsertColumnNames(configuration,indexColumns.toArray(new String[indexColumns.size()]));
-            final List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, qIndexTable, indexColumns);
+            PhoenixConfigurationUtil.setPhysicalTableName(configuration, physicalIndexTable);
+            PhoenixConfigurationUtil.setOutputTableName(configuration, indexTable);
+            PhoenixConfigurationUtil.setUpsertColumnNames(configuration,
+                indexColumns.toArray(new String[indexColumns.size()]));
+            final List<ColumnInfo> columnMetadataList =
+                    PhoenixRuntime.generateColumnInfo(connection, qIndexTable, indexColumns);
             ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
+
+            final Path outputPath =
+                    new Path(cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt()), physicalIndexTable);
+            FileSystem.get(configuration).delete(outputPath, true);
             
-            final Path outputPath =  new Path(cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt()),logicalIndexTable);
-            
-            final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE,dataTable,indexTable);
+            final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, dataTable, indexTable);
             final Job job = Job.getInstance(configuration, jobName);
             job.setJarByClass(IndexTool.class);
-           
-            job.setMapperClass(PhoenixIndexImportMapper.class);
-            job.setMapOutputKeyClass(ImmutableBytesWritable.class); 
-            job.setMapOutputValueClass(KeyValue.class);
-            PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,dataTable,selectQuery);
-     
-            TableMapReduceUtil.initCredentials(job);
+            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
             FileOutputFormat.setOutputPath(job, outputPath);
             
-            final HTable htable = new HTable(configuration, logicalIndexTable);
-            HFileOutputFormat.configureIncrementalLoad(job, htable);
-
-            boolean status = job.waitForCompletion(true);
-            if (!status) {
-                LOG.error("Failed to run the IndexTool job. ");
-                htable.close();
-                return -1;
-            }
-            
-            LOG.info("Loading HFiles from {}", outputPath);
-            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(configuration);
-            loader.doBulkLoad(outputPath, htable);
-            htable.close();
+            PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, qDataTable,
+                selectQuery);
+            TableMapReduceUtil.initCredentials(job);
             
-            LOG.info("Removing output directory {}", outputPath);
-            if (!FileSystem.get(configuration).delete(outputPath, true)) {
-                LOG.error("Removing output directory {} failed", outputPath);
+            boolean useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt());
+            if (useDirectApi) {
+                job.setMapperClass(PhoenixIndexImportDirectMapper.class);
+                configureSubmittableJobUsingDirectApi(job, outputPath,
+                    cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt()));
+            } else {
+                job.setMapperClass(PhoenixIndexImportMapper.class);
+                configureRunnableJobUsingBulkLoad(job, outputPath);
+                // finally update the index state to ACTIVE.
+                IndexToolUtil.updateIndexState(connection, qDataTable, indexTable,
+                    PIndexState.ACTIVE);
             }
-            
-            // finally update the index state to ACTIVE.
-            updateIndexState(connection,qDataTable,indexTable,PIndexState.ACTIVE);
             return 0;
-            
         } catch (Exception ex) {
-           LOG.error(" An exception occured while performing the indexing job : "+ ExceptionUtils.getStackTrace(ex));
-           return -1;
+            LOG.error(" An exception occured while performing the indexing job : "
+                    + ExceptionUtils.getStackTrace(ex));
+            return -1;
         } finally {
             try {
-                if(connection != null) {
+                if (connection != null) {
                     connection.close();
                 }
-            } catch(SQLException sqle) {
-                LOG.error(" Failed to close connection ",sqle.getMessage());
+            } catch (SQLException sqle) {
+                LOG.error(" Failed to close connection ", sqle.getMessage());
                 throw new RuntimeException("Failed to close connection");
             }
         }
     }
 
     /**
+     * Submits the job and waits for completion.
+     * @param job
+     * @param outputPath
+     * @return
+     * @throws Exception
+     */
+    private int configureRunnableJobUsingBulkLoad(Job job, Path outputPath) throws Exception {
+        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+        job.setMapOutputValueClass(KeyValue.class);
+        final Configuration configuration = job.getConfiguration();
+        final String logicalIndexTable =
+                PhoenixConfigurationUtil.getPhysicalTableName(configuration);
+        final HTable htable = new HTable(configuration, logicalIndexTable);
+        HFileOutputFormat.configureIncrementalLoad(job, htable);
+        boolean status = job.waitForCompletion(true);
+        if (!status) {
+            LOG.error("Failed to run the IndexTool job. ");
+            htable.close();
+            return -1;
+        }
+
+        LOG.info("Loading HFiles from {}", outputPath);
+        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(configuration);
+        loader.doBulkLoad(outputPath, htable);
+        htable.close();
+        
+        FileSystem.get(configuration).delete(outputPath, true);
+        
+        return 0;
+    }
+    
+    /**
+     * Uses the HBase Front Door Api to write to index table. Submits the job and either returns or
+     * waits for the job completion based on runForeground parameter.
+     * 
+     * @param job
+     * @param outputPath
+     * @param runForeground - if true, waits for job completion, else submits and returns
+     *            immediately.
+     * @return
+     * @throws Exception
+     */
+    private int configureSubmittableJobUsingDirectApi(Job job, Path outputPath, boolean runForeground)
+            throws Exception {
+        Configuration conf = job.getConfiguration();
+        HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+        // Set the Physical Table name for use in DirectHTableWriter#write(Mutation)
+        conf.set(TableOutputFormat.OUTPUT_TABLE,
+            PhoenixConfigurationUtil.getPhysicalTableName(job.getConfiguration()));
+        
+        //Set the Output classes
+        job.setMapOutputValueClass(IntWritable.class);
+        job.setReducerClass(PhoenixIndexToolReducer.class);
+        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputValueClass(NullWritable.class);
+        TableMapReduceUtil.addDependencyJars(job);
+        job.setNumReduceTasks(1);
+
+        if (!runForeground) {
+            LOG.info("Running Index Build in Background - Submit async and exit");
+            job.submit();
+            return 0;
+        }
+        LOG.info("Running Index Build in Foreground. Waits for the build to complete. This
may take a long time!.");
+        boolean result = job.waitForCompletion(true);
+        if (!result) {
+            LOG.error("Job execution failed!");
+            return -1;
+        }
+        FileSystem.get(conf).delete(outputPath, true);
+        return 0;
+    }
+
+    /**
      * Checks for the validity of the index table passed to the job.
      * @param connection
      * @param masterTable
@@ -258,43 +346,29 @@ public class IndexTool extends Configured implements Tool {
      * @return
      * @throws SQLException
      */
-    private boolean isValidIndexTable(final Connection connection, final String masterTable, final String indexTable) throws SQLException {
+    private boolean isValidIndexTable(final Connection connection, final String masterTable,
+            final String indexTable) throws SQLException {
         final DatabaseMetaData dbMetaData = connection.getMetaData();
         final String schemaName = SchemaUtil.getSchemaNameFromFullName(masterTable);
         final String tableName = SchemaUtil.getTableNameFromFullName(masterTable);
-        
+
         ResultSet rs = null;
         try {
             rs = dbMetaData.getIndexInfo(null, schemaName, tableName, false, false);
-            while(rs.next()) {
+            while (rs.next()) {
                 final String indexName = rs.getString(6);
-                if(indexTable.equalsIgnoreCase(indexName)) {
+                if (indexTable.equalsIgnoreCase(indexName)) {
                     return true;
                 }
             }
         } finally {
-            if(rs != null) {
+            if (rs != null) {
                 rs.close();
             }
         }
         return false;
     }
-    
-    /**
-     * Updates the index state.
-     * @param connection
-     * @param masterTable
-     * @param indexTable
-     * @param state
-     * @throws SQLException
-     */
-    private void updateIndexState(Connection connection, final String masterTable , final String indexTable, PIndexState state) throws SQLException {
-        Preconditions.checkNotNull(connection);
-        final String alterQuery = String.format(ALTER_INDEX_QUERY_TEMPLATE,indexTable,masterTable,state.name());
-        connection.createStatement().execute(alterQuery);
-        LOG.info(" Updated the status of the index {} to {} " , indexTable , state.name());
-    }
-    
+
     public static void main(final String[] args) throws Exception {
         int result = ToolRunner.run(new IndexTool(), args);
         System.exit(result);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/16fcdf9e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexToolUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexToolUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexToolUtil.java
new file mode 100644
index 0000000..96e711c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexToolUtil.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.schema.PIndexState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Utility class for {@linkplain IndexTool}
+ *
+ */
+public class IndexToolUtil {
+
+	private static final String ALTER_INDEX_QUERY_TEMPLATE = "ALTER INDEX IF EXISTS %s ON %s %s";
+    
+	private static final Logger LOG = LoggerFactory.getLogger(IndexToolUtil.class);
+	
+	/**
+	 * Updates the index state.
+	 * @param configuration
+	 * @param state
+	 * @throws SQLException 
+	 */
+	public static void updateIndexState(Configuration configuration,PIndexState state) throws SQLException {
+		final String masterTable = PhoenixConfigurationUtil.getInputTableName(configuration);
+		final String indexTable = PhoenixConfigurationUtil.getOutputTableName(configuration);
+		final Connection connection = ConnectionUtil.getOutputConnection(configuration);
+		try {
+			updateIndexState(connection, masterTable, indexTable , state);
+		} finally {
+			if(connection != null) {
+				connection.close();
+			}
+		}
+	}
+	
+	/**
+     * Updates the index state.
+     * @param connection
+     * @param masterTable
+     * @param indexTable
+     * @param state
+     * @throws SQLException
+     */
+    public static void updateIndexState(Connection connection, final String masterTable , final String indexTable, PIndexState state) throws SQLException {
+        Preconditions.checkNotNull(connection);
+        final String alterQuery = String.format(ALTER_INDEX_QUERY_TEMPLATE,indexTable,masterTable,state.name());
+        connection.createStatement().execute(alterQuery);
+        LOG.info(" Updated the status of the index {} to {} " , indexTable , state.name());
+    }
+	
+}
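
A sketch of calling the connection-based overload directly (not part of this commit; the JDBC URL and table names are placeholders). Per the template above, it issues ALTER INDEX IF EXISTS <index> ON <table> <state>:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import org.apache.phoenix.mapreduce.index.IndexToolUtil;
    import org.apache.phoenix.schema.PIndexState;

    public class UpdateIndexStateExample {
        public static void main(String[] args) throws Exception {
            // Placeholder URL; this executes:
            // ALTER INDEX IF EXISTS DATA_TABLE_INDX ON SCHEMA.DATA_TABLE ACTIVE
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                IndexToolUtil.updateIndexState(conn, "SCHEMA.DATA_TABLE", "DATA_TABLE_INDX",
                    PIndexState.ACTIVE);
            }
        }
    }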

http://git-wip-us.apache.org/repos/asf/phoenix/blob/16fcdf9e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
new file mode 100644
index 0000000..addbcae
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.PhoenixJobCounters;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mapper that hands over rows from data table to the index table.
+ */
+public class PhoenixIndexImportDirectMapper extends
+        Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, IntWritable> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PhoenixIndexImportDirectMapper.class);
+
+    private final PhoenixIndexDBWritable indxWritable = new PhoenixIndexDBWritable();
+
+    private List<ColumnInfo> indxTblColumnMetadata;
+
+    private Connection connection;
+
+    private PreparedStatement pStatement;
+
+    private DirectHTableWriter writer;
+
+    @Override
+    protected void setup(final Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        final Configuration configuration = context.getConfiguration();
+        writer = new DirectHTableWriter(configuration);
+
+        try {
+            indxTblColumnMetadata =
+                    PhoenixConfigurationUtil
+                            .getUpsertColumnMetadataList(configuration);
+            indxWritable.setColumnMetadata(indxTblColumnMetadata);
+
+            final Properties overrideProps = new Properties();
+            overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+                configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE));
+            connection = ConnectionUtil.getOutputConnection(configuration, overrideProps);
+            connection.setAutoCommit(false);
+            final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
+            this.pStatement = connection.prepareStatement(upsertQuery);
+
+        } catch (SQLException e) {
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+
+    @Override
+    protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
+            throws IOException, InterruptedException {
+
+        context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
+
+        try {
+            final List<Object> values = record.getValues();
+            indxWritable.setValues(values);
+            indxWritable.write(this.pStatement);
+            this.pStatement.execute();
+
+            final PhoenixConnection pconn = connection.unwrap(PhoenixConnection.class);
+            final Iterator<Pair<byte[], List<Mutation>>> iterator =
+                    pconn.getMutationState().toMutations(true);
+
+            while (iterator.hasNext()) {
+                Pair<byte[], List<Mutation>> mutationPair = iterator.next();
+                for (Mutation mutation : mutationPair.getSecond()) {
+                    writer.write(mutation);
+                }
+                context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment(1);
+            }
+            connection.rollback();
+        } catch (SQLException e) {
+            LOG.error(" Error {}  while read/write of a record ", e.getMessage());
+            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        // We are writing some dummy key-value as map output here so that we commit only one
+        // output to reducer.
+        context.write(new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()),
+            new IntWritable(0));
+        super.cleanup(context);
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (SQLException e) {
+                LOG.error("Error {} while closing connection in the PhoenixIndexMapper class
",
+                    e.getMessage());
+            }
+        }
+        writer.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/16fcdf9e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolReducer.java
new file mode 100644
index 0000000..6df0ee1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolReducer.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.phoenix.schema.PIndexState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reducer class that does only one task and that is to update the index state of the table.
+ */
+public class PhoenixIndexToolReducer extends
+        Reducer<ImmutableBytesWritable, IntWritable, NullWritable, NullWritable> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PhoenixIndexToolReducer.class);
+    private Configuration configuration;
+
+    /**
+     * Called once at the start of the task.
+     */
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        configuration = context.getConfiguration();
+    }
+
+    @Override
+    protected void reduce(ImmutableBytesWritable arg0, Iterable<IntWritable> arg1,
+            Reducer<ImmutableBytesWritable, IntWritable, NullWritable, NullWritable>.Context arg2)
+            throws IOException, InterruptedException {
+        try {
+            IndexToolUtil.updateIndexState(configuration, PIndexState.ACTIVE);
+        } catch (SQLException e) {
+            LOG.error(" Failed to update the status to Active");
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+}

