phoenix-commits mailing list archives

From sama...@apache.org
Subject phoenix git commit: Changes to make builds stable again
Date Fri, 08 Jan 2016 06:29:23 GMT
Repository: phoenix
Updated Branches:
  refs/heads/master 2b5863a51 -> cc6cd9f56


Changes to make builds stable again


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cc6cd9f5
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cc6cd9f5
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cc6cd9f5

Branch: refs/heads/master
Commit: cc6cd9f561398c74b697322e140825973a45662a
Parents: 2b5863a
Author: Samarth <samarth.jain@salesforce.com>
Authored: Thu Jan 7 22:28:23 2016 -0800
Committer: Samarth <samarth.jain@salesforce.com>
Committed: Thu Jan 7 22:28:23 2016 -0800

----------------------------------------------------------------------
 .../end2end/mapreduce/CsvBulkLoadToolIT.java    | 372 +++++++++++++++++++
 .../phoenix/end2end/mapreduce/IndexToolIT.java  | 339 +++++++++++++++++
 .../phoenix/mapreduce/CsvBulkLoadToolIT.java    | 371 ------------------
 .../apache/phoenix/mapreduce/IndexToolIT.java   | 339 -----------------
 pom.xml                                         |  11 +-
 5 files changed, 718 insertions(+), 714 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc6cd9f5/phoenix-core/src/it/java/org/apache/phoenix/end2end/mapreduce/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/mapreduce/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/mapreduce/CsvBulkLoadToolIT.java
new file mode 100644
index 0000000..1bc36d0
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/mapreduce/CsvBulkLoadToolIT.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.mapreduce;
+
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.mapreduce.CsvBulkLoadTool;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.DateUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class CsvBulkLoadToolIT {
+
+    // We use HBaseTestUtil because we need to start up a MapReduce cluster as well
+    private static HBaseTestingUtility hbaseTestUtil;
+    private static String zkQuorum;
+    private static Connection conn;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        hbaseTestUtil = new HBaseTestingUtility();
+        Configuration conf = hbaseTestUtil.getConfiguration();
+        setUpConfigForMiniCluster(conf);
+        // Since we're using the real PhoenixDriver in this test, remove the
+        // extra JDBC argument that causes the test driver to be used.
+        conf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+        hbaseTestUtil.startMiniCluster();
+        hbaseTestUtil.startMiniMapReduceCluster();
+
+        Class.forName(PhoenixDriver.class.getName());
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+        zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
+        conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL
+                + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum);
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
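+        // Release resources in reverse order of acquisition; the nested try/finally blocks
+        // ensure each shutdown step still runs even if an earlier one throws.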
+        try {
+            if (conn != null) conn.close();
+        } finally {
+            try {
+                DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
+            } finally {
+                try {
+                    hbaseTestUtil.shutdownMiniMapReduceCluster();
+                } finally {
+                    hbaseTestUtil.shutdownMiniCluster();
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testBasicImport() throws Exception {
+
+        Statement stmt = conn.createStatement();
+        stmt.execute("CREATE TABLE TABLE1 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, T DATE) SPLIT ON (1,2)");
+
+        FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input1.csv"));
+        PrintWriter printWriter = new PrintWriter(outputStream);
+        printWriter.println("1,Name 1,1970/01/01");
+        printWriter.println("2,Name 2,1970/01/02");
+        printWriter.close();
+
+        CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+        csvBulkLoadTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
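+        // The CSV input above uses yyyy/MM/dd dates, so override Phoenix's default date format for parsing.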
+        csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd");
+        int exitCode = csvBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input1.csv",
+                "--table", "table1",
+                "--zookeeper", zkQuorum});
+        assertEquals(0, exitCode);
+
+        ResultSet rs = stmt.executeQuery("SELECT id, name, t FROM table1 ORDER BY id");
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        assertEquals("Name 1", rs.getString(2));
+        assertEquals(DateUtil.parseDate("1970-01-01"), rs.getDate(3));
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(1));
+        assertEquals("Name 2", rs.getString(2));
+        assertEquals(DateUtil.parseDate("1970-01-02"), rs.getDate(3));
+        assertFalse(rs.next());
+
+        rs.close();
+        stmt.close();
+    }
+
+    @Test
+    public void testFullOptionImport() throws Exception {
+
+        Statement stmt = conn.createStatement();
+        stmt.execute("CREATE TABLE TABLE2 (ID INTEGER NOT NULL PRIMARY KEY, " +
+                "NAME VARCHAR, NAMES VARCHAR ARRAY)");
+
+        FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input2.csv"));
+        PrintWriter printWriter = new PrintWriter(outputStream);
+        printWriter.println("1|Name 1a;Name 1b");
+        printWriter.println("2|Name 2a;Name 2b");
+        printWriter.close();
+
+        CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+        csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+        int exitCode = csvBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input2.csv",
+                "--table", "table2",
+                "--zookeeper", zkQuorum,
+                "--delimiter", "|",
+                "--array-delimiter", ";",
+                "--import-columns", "ID,NAMES"});
+        assertEquals(0, exitCode);
+
+        ResultSet rs = stmt.executeQuery("SELECT id, names FROM table2 ORDER BY id");
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        assertArrayEquals(new Object[] { "Name 1a", "Name 1b" }, (Object[]) rs.getArray(2).getArray());
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(1));
+        assertArrayEquals(new Object[] { "Name 2a", "Name 2b" }, (Object[]) rs.getArray(2).getArray());
+        assertFalse(rs.next());
+
+        rs.close();
+        stmt.close();
+    }
+
+    @Test
+    public void testMultipleInputFiles() throws Exception {
+
+        Statement stmt = conn.createStatement();
+        stmt.execute("CREATE TABLE TABLE7 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, T DATE) SPLIT ON (1,2)");
+
+        FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input1.csv"));
+        PrintWriter printWriter = new PrintWriter(outputStream);
+        printWriter.println("1,Name 1,1970/01/01");
+        printWriter.close();
+        outputStream = fs.create(new Path("/tmp/input2.csv"));
+        printWriter = new PrintWriter(outputStream);
+        printWriter.println("2,Name 2,1970/01/02");
+        printWriter.close();
+
+        CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+        csvBulkLoadTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
+        csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd");
+        int exitCode = csvBulkLoadTool.run(new String[] {
+            "--input", "/tmp/input1.csv,/tmp/input2.csv",
+            "--table", "table7",
+            "--zookeeper", zkQuorum});
+        assertEquals(0, exitCode);
+
+        ResultSet rs = stmt.executeQuery("SELECT id, name, t FROM table7 ORDER BY id");
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        assertEquals("Name 1", rs.getString(2));
+        assertEquals(DateUtil.parseDate("1970-01-01"), rs.getDate(3));
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(1));
+        assertEquals("Name 2", rs.getString(2));
+        assertEquals(DateUtil.parseDate("1970-01-02"), rs.getDate(3));
+        assertFalse(rs.next());
+
+        rs.close();
+        stmt.close();
+    }
+
+    @Test
+    public void testImportWithIndex() throws Exception {
+
+        Statement stmt = conn.createStatement();
+        stmt.execute("CREATE TABLE TABLE3 (ID INTEGER NOT NULL PRIMARY KEY, " +
+            "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
+        String ddl = "CREATE INDEX TABLE3_IDX ON TABLE3 "
+                + " (FIRST_NAME ASC)"
+                + " INCLUDE (LAST_NAME)";
+        stmt.execute(ddl);
+        
+        FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input3.csv"));
+        PrintWriter printWriter = new PrintWriter(outputStream);
+        printWriter.println("1,FirstName 1,LastName 1");
+        printWriter.println("2,FirstName 2,LastName 2");
+        printWriter.close();
+
+        CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+        csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+        int exitCode = csvBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input3.csv",
+                "--table", "table3",
+                "--zookeeper", zkQuorum});
+        assertEquals(0, exitCode);
+
+        ResultSet rs = stmt.executeQuery("SELECT id, FIRST_NAME FROM TABLE3 where first_name='FirstName 2'");
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(1));
+        assertEquals("FirstName 2", rs.getString(2));
+
+        rs.close();
+        stmt.close();
+    }
+
+    @Test
+    public void testImportWithLocalIndex() throws Exception {
+
+        Statement stmt = conn.createStatement();
+        stmt.execute("CREATE TABLE TABLE6 (ID INTEGER NOT NULL PRIMARY KEY, " +
+                "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
+        String ddl = "CREATE LOCAL INDEX TABLE6_IDX ON TABLE6 "
+                + " (FIRST_NAME ASC)";
+        stmt.execute(ddl);
+        ddl = "CREATE LOCAL INDEX TABLE6_IDX2 ON TABLE6 " + " (LAST_NAME ASC)";
+        stmt.execute(ddl);
+
+        FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input3.csv"));
+        PrintWriter printWriter = new PrintWriter(outputStream);
+        printWriter.println("1,FirstName 1,LastName 1");
+        printWriter.println("2,FirstName 2,LastName 2");
+        printWriter.close();
+
+        CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+        csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+        try {
+            csvBulkLoadTool.run(new String[] {
+                    "--input", "/tmp/input3.csv",
+                    "--table", "table6",
+                    "--zookeeper", zkQuorum});
+            fail("Csv bulk load currently has issues with local indexes.");
+        } catch( UnsupportedOperationException ise) {
+            assertEquals("Local indexes not supported by Bulk Loader",ise.getMessage());
+        }
+        
+    }
+
+    @Test
+    public void testImportOneIndexTable() throws Exception {
+        testImportOneIndexTable("TABLE4", false);
+    }
+
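+    // Disabled, presumably because the bulk loader does not support local indexes
+    // (see testImportWithLocalIndex above).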
+    //@Test
+    public void testImportOneLocalIndexTable() throws Exception {
+        testImportOneIndexTable("TABLE5", true);
+    }
+
+    public void testImportOneIndexTable(String tableName, boolean localIndex) throws Exception {
+
+        String indexTableName = String.format("%s_IDX", tableName);
+        Statement stmt = conn.createStatement();
+        stmt.execute("CREATE TABLE " + tableName + "(ID INTEGER NOT NULL PRIMARY KEY, "
+                + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
+        String ddl =
+                "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexTableName + " ON "
+                        + tableName + "(FIRST_NAME ASC)";
+        stmt.execute(ddl);
+
+        FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input4.csv"));
+        PrintWriter printWriter = new PrintWriter(outputStream);
+        printWriter.println("1,FirstName 1,LastName 1");
+        printWriter.println("2,FirstName 2,LastName 2");
+        printWriter.close();
+
+        CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+        csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+        int exitCode = csvBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input4.csv",
+                "--table", tableName,
+                "--index-table", indexTableName,
+                "--zookeeper", zkQuorum });
+        assertEquals(0, exitCode);
+
+        ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
+        assertFalse(rs.next());
+        rs = stmt.executeQuery("SELECT FIRST_NAME FROM " + tableName + " where FIRST_NAME='FirstName 1'");
+        assertTrue(rs.next());
+        assertEquals("FirstName 1", rs.getString(1));
+
+        rs.close();
+        stmt.close();
+    }
+    
+    @Test
+    public void testInvalidArguments() {
+        String tableName = "TABLE8";
+        CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+        csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+        try {
+            csvBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input4.csv",
+                "--table", tableName,
+                "--zookeeper", zkQuorum });
+            fail(String.format("Table %s not created, hence should fail",tableName));
+        } catch (Exception ex) {
+            assertTrue(ex instanceof IllegalArgumentException); 
+            assertTrue(ex.getMessage().contains(String.format("Table %s not found", tableName)));
+        }
+    }
+    
+    @Test
+    public void testAlreadyExistsOutputPath() {
+        String tableName = "TABLE9";
+        String outputPath = "/tmp/output/tabl9";
+        try {
+            Statement stmt = conn.createStatement();
+            stmt.execute("CREATE TABLE " + tableName + "(ID INTEGER NOT NULL PRIMARY KEY, "
+                    + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
+            
+            FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
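+            // Pre-create the output path so it already exists when the bulk load job tries to use it.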
+            fs.create(new Path(outputPath));
+            FSDataOutputStream outputStream = fs.create(new Path("/tmp/input9.csv"));
+            PrintWriter printWriter = new PrintWriter(outputStream);
+            printWriter.println("1,FirstName 1,LastName 1");
+            printWriter.println("2,FirstName 2,LastName 2");
+            printWriter.close();
+            
+            CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+            csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+            csvBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input9.csv",
+                "--output", outputPath,
+                "--table", tableName,
+                "--zookeeper", zkQuorum });
+            
+            fail(String.format("Output path %s already exists. hence, should fail",outputPath));
+        } catch (Exception ex) {
+            assertTrue(ex instanceof FileAlreadyExistsException); 
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc6cd9f5/phoenix-core/src/it/java/org/apache/phoenix/end2end/mapreduce/IndexToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/mapreduce/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/mapreduce/IndexToolIT.java
new file mode 100644
index 0000000..e5696f0
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/mapreduce/IndexToolIT.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.mapreduce;
+
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests for the {@link IndexTool}
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class IndexToolIT {
+    
+    private static HBaseTestingUtility hbaseTestUtil;
+    private static String zkQuorum;
+  
+    @BeforeClass
+    public static void setUp() throws Exception {
+        hbaseTestUtil = new HBaseTestingUtility();
+        Configuration conf = hbaseTestUtil.getConfiguration();
+        conf.setBoolean("hbase.defaults.for.version.skip", true);
+        // Since we're using the real PhoenixDriver in this test, remove the
+        // extra JDBC argument that causes the test driver to be used.
+        conf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+        setUpConfigForMiniCluster(conf);
+        hbaseTestUtil.startMiniCluster();
+        hbaseTestUtil.startMiniMapReduceCluster();
+        Class.forName(PhoenixDriver.class.getName());
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+        zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
+    }
+    
+    @Test
+    public void testImmutableGlobalIndex() throws Exception {
+        testSecondaryIndex("SCHEMA", "DATA_TABLE1", true, false);
+    }
+    
+    @Test
+    public void testImmutableLocalIndex() throws Exception {
+        testSecondaryIndex("SCHEMA", "DATA_TABLE2", true, true);
+    }
+    
+    @Test
+    public void testMutableGlobalIndex() throws Exception {
+        testSecondaryIndex("SCHEMA", "DATA_TABLE3", false, false);
+    }
+    
+    @Test
+    public void testMutableLocalIndex() throws Exception {
+        testSecondaryIndex("SCHEMA", "DATA_TABLE4", false, true);
+    }
+    
+    @Test
+    public void testImmutableGlobalIndexDirectApi() throws Exception {
+    	testSecondaryIndex("SCHEMA", "DATA_TABLE5", true, false, true);
+    }
+    
+    @Test
+    public void testImmutableLocalIndexDirectApi() throws Exception {
+    	testSecondaryIndex("SCHEMA", "DATA_TABLE6", true, true, true);
+    }
+    
+    @Test
+    public void testMutableGlobalIndexDirectApi() throws Exception {
+    	testSecondaryIndex("SCHEMA", "DATA_TABLE7", false, false, true);
+    }
+    
+    @Test
+    public void testMutableLocalIndexDirectApi() throws Exception {
+    	testSecondaryIndex("SCHEMA", "DATA_TABLE8", false, true, true);
+    }
+    
+    public void testSecondaryIndex(final String schemaName, final String dataTable, final boolean isImmutable , final boolean isLocal) throws Exception {
+    	testSecondaryIndex(schemaName, dataTable, isImmutable, isLocal, false);
+    }
+    
+    public void testSecondaryIndex(final String schemaName, final String dataTable, final boolean isImmutable , final boolean isLocal, final boolean directApi) throws Exception {
+        
+    	final String fullTableName = SchemaUtil.getTableName(schemaName, dataTable);
+        final String indxTable = String.format("%s_%s",dataTable,"INDX");
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum,props);
+        Statement stmt = conn.createStatement();
+        try {
+        
+            stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) %s", fullTableName, (isImmutable ? "IMMUTABLE_ROWS=true" :"")));
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName);
+            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+            
+            int id = 1;
+            // insert two rows
+            upsertRow(stmt1, id++);
+            upsertRow(stmt1, id++);
+            conn.commit();
+            
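+            // Create the index in ASYNC mode: it is not populated until the IndexTool MR job runs below.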
+            stmt.execute(String.format("CREATE %s INDEX %s ON %s  (LPAD(UPPER(NAME),8,'x')||'_xyz') ASYNC ", (isLocal ? "LOCAL" : ""), indxTable, fullTableName));
+   
+            //verify rows are fetched from data table.
+            String selectSql = String.format("SELECT LPAD(UPPER(NAME),8,'x')||'_xyz',ID FROM %s", fullTableName);
+            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+            String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            
+            //assert we are pulling from data table.
+            assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s", fullTableName), actualExplainPlan);
+            
+            rs = stmt1.executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("xxUNAME1_xyz", rs.getString(1));    
+            assertTrue(rs.next());
+            assertEquals("xxUNAME2_xyz", rs.getString(1));
+           
+            //run the index MR job.
+            final IndexTool indexingTool = new IndexTool();
+            indexingTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
+            
+            final String[] cmdArgs = getArgValues(schemaName, dataTable, indxTable, directApi);
+            int status = indexingTool.run(cmdArgs);
+            assertEquals(0, status);
+            
+            // insert two more rows
+            upsertRow(stmt1, 3);
+            upsertRow(stmt1, 4);
+            conn.commit();
+            
+            rs = stmt1.executeQuery("SELECT * FROM "+SchemaUtil.getTableName(schemaName, indxTable));
+
+            //assert we are pulling from index table.
+            rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+            actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            assertExplainPlan(actualExplainPlan,schemaName,dataTable,indxTable,isLocal);
+            
+            rs = stmt.executeQuery(selectSql);
+//            assertTrue(rs.next());
+//            assertEquals("xxUNAME1_xyz", rs.getString(1));
+//            assertEquals(1, rs.getInt(2));
+//            
+//            assertTrue(rs.next());
+//            assertEquals("xxUNAME2_xyz", rs.getString(1));
+//            assertEquals(2, rs.getInt(2));
+//
+//            assertTrue(rs.next());
+//            assertEquals("xxUNAME3_xyz", rs.getString(1));
+//            assertEquals(3, rs.getInt(2));
+//            
+//            assertTrue(rs.next());
+//            assertEquals("xxUNAME4_xyz", rs.getString(1));
+//            assertEquals(4, rs.getInt(2));
+//      
+//            assertFalse(rs.next());
+            
+            conn.createStatement().execute(String.format("DROP INDEX  %s ON %s",indxTable , fullTableName));
+        } finally {
+            conn.close();
+        }
+    }
+    
+    
+    /**
+     * Asserts that updates made to rows of a mutable table after an index is created in ASYNC mode,
+     * but before the MR job runs, still show up in the index table.
+     * @throws Exception
+     */
+    @Test
+    public void testMutableIndexWithUpdates() throws Exception {
+        
+        final String dataTable = "DATA_TABLE5";
+        final String indxTable = String.format("%s_%s",dataTable,"INDX");
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum,props);
+        Statement stmt = conn.createStatement();
+        try {
+        
+            stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER)",dataTable));
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)",dataTable);
+            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+            
+            int id = 1;
+            // insert two rows
+            upsertRow(stmt1, id++);
+            upsertRow(stmt1, id++);
+            conn.commit();
+            
+            stmt.execute(String.format("CREATE INDEX %s ON %s (UPPER(NAME)) ASYNC ", indxTable,dataTable));
+            
+            //update a row 
+            stmt1.setInt(1, 1);
+            stmt1.setString(2, "uname" + String.valueOf(10));
+            stmt1.setInt(3, 95050 + 1);
+            stmt1.executeUpdate();
+            conn.commit();  
+            
+            //verify rows are fetched from data table.
+            String selectSql = String.format("SELECT UPPER(NAME),ID FROM %s",dataTable);
+            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+            String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            
+            //assert we are pulling from data table.
+            assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s",dataTable),actualExplainPlan);
+            
+            rs = stmt1.executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("UNAME10", rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals("UNAME2", rs.getString(1));
+           
+            //run the index MR job.
+            final IndexTool indexingTool = new IndexTool();
+            indexingTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
+            
+            final String[] cmdArgs = getArgValues(null, dataTable,indxTable);
+            int status = indexingTool.run(cmdArgs);
+            assertEquals(0, status);
+            
+            //assert we are pulling from index table.
+            rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+            actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            assertExplainPlan(actualExplainPlan,null,dataTable,indxTable,false);
+            
+            rs = stmt.executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("UNAME10", rs.getString(1));
+            assertEquals(1, rs.getInt(2));
+            
+            assertTrue(rs.next());
+            assertEquals("UNAME2", rs.getString(1));
+            assertEquals(2, rs.getInt(2));
+            conn.createStatement().execute(String.format("DROP INDEX  %s ON %s",indxTable , dataTable));
+        } finally {
+            conn.close();
+        }
+    }
+    
+    private void assertExplainPlan(final String actualExplainPlan, String schemaName, String dataTable,
+            String indxTable, boolean isLocal) {
+        
+        String expectedExplainPlan = "";
+        if(isLocal) {
+            final String localIndexName = MetaDataUtil.getLocalIndexTableName(SchemaUtil.getTableName(schemaName, dataTable));
+            expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY RANGE SCAN OVER %s [-32768]"
+                + "\n    SERVER FILTER BY FIRST KEY ONLY", localIndexName);
+        } else {
+            expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s"
+                    + "\n    SERVER FILTER BY FIRST KEY ONLY",SchemaUtil.getTableName(schemaName, indxTable));
+        }
+        assertEquals(expectedExplainPlan,actualExplainPlan);
+    }
+
+    private String[] getArgValues(String schemaName, String dataTable, String indxTable) {
+        return getArgValues(schemaName, dataTable, indxTable, false);
+    }
+    
+    private String[] getArgValues(String schemaName, String dataTable, String indxTable, boolean directApi) {
+        final List<String> args = Lists.newArrayList();
+        if (schemaName!=null) {
+            args.add("-s");
+            args.add(schemaName);
+        }
+        args.add("-dt");
+        args.add(dataTable);
+        args.add("-it");
+        args.add(indxTable);
+        if(directApi) {
+            args.add("-direct");
+            // Need to run this job in foreground for the test to be deterministic
+            args.add("-runfg");
+        }
+
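+        // Use a fresh, randomized path for the tool's -op argument so repeated runs do not collide.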
+        args.add("-op");
+        args.add("/tmp/"+UUID.randomUUID().toString());
+        return args.toArray(new String[0]);
+    }
+
+    private void upsertRow(PreparedStatement stmt, int i) throws SQLException {
+        // insert row
+        stmt.setInt(1, i);
+        stmt.setString(2, "uname" + String.valueOf(i));
+        stmt.setInt(3, 95050 + i);
+        stmt.executeUpdate();
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        try {
+            DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
+        } finally {
+            try {
+                hbaseTestUtil.shutdownMiniMapReduceCluster();
+            } finally {
+                hbaseTestUtil.shutdownMiniCluster();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc6cd9f5/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
deleted file mode 100644
index 2970d56..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
+++ /dev/null
@@ -1,371 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.mapreduce;
-
-import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
-import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.PrintWriter;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.jdbc.PhoenixDriver;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.util.DateUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category(NeedsOwnMiniClusterTest.class)
-public class CsvBulkLoadToolIT {
-
-    // We use HBaseTestUtil because we need to start up a MapReduce cluster as well
-    private static HBaseTestingUtility hbaseTestUtil;
-    private static String zkQuorum;
-    private static Connection conn;
-
-    @BeforeClass
-    public static void setUp() throws Exception {
-        hbaseTestUtil = new HBaseTestingUtility();
-        Configuration conf = hbaseTestUtil.getConfiguration();
-        setUpConfigForMiniCluster(conf);
-        // Since we're using the real PhoenixDriver in this test, remove the
-        // extra JDBC argument that causes the test driver to be used.
-        conf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
-        hbaseTestUtil.startMiniCluster();
-        hbaseTestUtil.startMiniMapReduceCluster();
-
-        Class.forName(PhoenixDriver.class.getName());
-        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
-        zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
-        conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL
-                + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum);
-    }
-
-    @AfterClass
-    public static void tearDownAfterClass() throws Exception {
-        try {
-            if (conn != null) conn.close();
-        } finally {
-            try {
-                DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
-            } finally {
-                try {
-                    hbaseTestUtil.shutdownMiniMapReduceCluster();
-                } finally {
-                    hbaseTestUtil.shutdownMiniCluster();
-                }
-            }
-        }
-    }
-
-    @Test
-    public void testBasicImport() throws Exception {
-
-        Statement stmt = conn.createStatement();
-        stmt.execute("CREATE TABLE TABLE1 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, T DATE) SPLIT ON (1,2)");
-
-        FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
-        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input1.csv"));
-        PrintWriter printWriter = new PrintWriter(outputStream);
-        printWriter.println("1,Name 1,1970/01/01");
-        printWriter.println("2,Name 2,1970/01/02");
-        printWriter.close();
-
-        CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
-        csvBulkLoadTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
-        csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd");
-        int exitCode = csvBulkLoadTool.run(new String[] {
-                "--input", "/tmp/input1.csv",
-                "--table", "table1",
-                "--zookeeper", zkQuorum});
-        assertEquals(0, exitCode);
-
-        ResultSet rs = stmt.executeQuery("SELECT id, name, t FROM table1 ORDER BY id");
-        assertTrue(rs.next());
-        assertEquals(1, rs.getInt(1));
-        assertEquals("Name 1", rs.getString(2));
-        assertEquals(DateUtil.parseDate("1970-01-01"), rs.getDate(3));
-        assertTrue(rs.next());
-        assertEquals(2, rs.getInt(1));
-        assertEquals("Name 2", rs.getString(2));
-        assertEquals(DateUtil.parseDate("1970-01-02"), rs.getDate(3));
-        assertFalse(rs.next());
-
-        rs.close();
-        stmt.close();
-    }
-
-    @Test
-    public void testFullOptionImport() throws Exception {
-
-        Statement stmt = conn.createStatement();
-        stmt.execute("CREATE TABLE TABLE2 (ID INTEGER NOT NULL PRIMARY KEY, " +
-                "NAME VARCHAR, NAMES VARCHAR ARRAY)");
-
-        FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
-        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input2.csv"));
-        PrintWriter printWriter = new PrintWriter(outputStream);
-        printWriter.println("1|Name 1a;Name 1b");
-        printWriter.println("2|Name 2a;Name 2b");
-        printWriter.close();
-
-        CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
-        csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
-        int exitCode = csvBulkLoadTool.run(new String[] {
-                "--input", "/tmp/input2.csv",
-                "--table", "table2",
-                "--zookeeper", zkQuorum,
-                "--delimiter", "|",
-                "--array-delimiter", ";",
-                "--import-columns", "ID,NAMES"});
-        assertEquals(0, exitCode);
-
-        ResultSet rs = stmt.executeQuery("SELECT id, names FROM table2 ORDER BY id");
-        assertTrue(rs.next());
-        assertEquals(1, rs.getInt(1));
-        assertArrayEquals(new Object[] { "Name 1a", "Name 1b" }, (Object[]) rs.getArray(2).getArray());
-        assertTrue(rs.next());
-        assertEquals(2, rs.getInt(1));
-        assertArrayEquals(new Object[] { "Name 2a", "Name 2b" }, (Object[]) rs.getArray(2).getArray());
-        assertFalse(rs.next());
-
-        rs.close();
-        stmt.close();
-    }
-
-    @Test
-    public void testMultipleInputFiles() throws Exception {
-
-        Statement stmt = conn.createStatement();
-        stmt.execute("CREATE TABLE TABLE7 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, T DATE) SPLIT ON (1,2)");
-
-        FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
-        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input1.csv"));
-        PrintWriter printWriter = new PrintWriter(outputStream);
-        printWriter.println("1,Name 1,1970/01/01");
-        printWriter.close();
-        outputStream = fs.create(new Path("/tmp/input2.csv"));
-        printWriter = new PrintWriter(outputStream);
-        printWriter.println("2,Name 2,1970/01/02");
-        printWriter.close();
-
-        CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
-        csvBulkLoadTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
-        csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd");
-        int exitCode = csvBulkLoadTool.run(new String[] {
-            "--input", "/tmp/input1.csv,/tmp/input2.csv",
-            "--table", "table7",
-            "--zookeeper", zkQuorum});
-        assertEquals(0, exitCode);
-
-        ResultSet rs = stmt.executeQuery("SELECT id, name, t FROM table7 ORDER BY id");
-        assertTrue(rs.next());
-        assertEquals(1, rs.getInt(1));
-        assertEquals("Name 1", rs.getString(2));
-        assertEquals(DateUtil.parseDate("1970-01-01"), rs.getDate(3));
-        assertTrue(rs.next());
-        assertEquals(2, rs.getInt(1));
-        assertEquals("Name 2", rs.getString(2));
-        assertEquals(DateUtil.parseDate("1970-01-02"), rs.getDate(3));
-        assertFalse(rs.next());
-
-        rs.close();
-        stmt.close();
-    }
-
-    @Test
-    public void testImportWithIndex() throws Exception {
-
-        Statement stmt = conn.createStatement();
-        stmt.execute("CREATE TABLE TABLE3 (ID INTEGER NOT NULL PRIMARY KEY, " +
-            "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
-        String ddl = "CREATE INDEX TABLE3_IDX ON TABLE3 "
-                + " (FIRST_NAME ASC)"
-                + " INCLUDE (LAST_NAME)";
-        stmt.execute(ddl);
-        
-        FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
-        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input3.csv"));
-        PrintWriter printWriter = new PrintWriter(outputStream);
-        printWriter.println("1,FirstName 1,LastName 1");
-        printWriter.println("2,FirstName 2,LastName 2");
-        printWriter.close();
-
-        CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
-        csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
-        int exitCode = csvBulkLoadTool.run(new String[] {
-                "--input", "/tmp/input3.csv",
-                "--table", "table3",
-                "--zookeeper", zkQuorum});
-        assertEquals(0, exitCode);
-
-        ResultSet rs = stmt.executeQuery("SELECT id, FIRST_NAME FROM TABLE3 where first_name='FirstName 2'");
-        assertTrue(rs.next());
-        assertEquals(2, rs.getInt(1));
-        assertEquals("FirstName 2", rs.getString(2));
-
-        rs.close();
-        stmt.close();
-    }
-
-    @Test
-    public void testImportWithLocalIndex() throws Exception {
-
-        Statement stmt = conn.createStatement();
-        stmt.execute("CREATE TABLE TABLE6 (ID INTEGER NOT NULL PRIMARY KEY, " +
-                "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
-        String ddl = "CREATE LOCAL INDEX TABLE6_IDX ON TABLE6 "
-                + " (FIRST_NAME ASC)";
-        stmt.execute(ddl);
-        ddl = "CREATE LOCAL INDEX TABLE6_IDX2 ON TABLE6 " + " (LAST_NAME ASC)";
-        stmt.execute(ddl);
-
-        FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
-        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input3.csv"));
-        PrintWriter printWriter = new PrintWriter(outputStream);
-        printWriter.println("1,FirstName 1,LastName 1");
-        printWriter.println("2,FirstName 2,LastName 2");
-        printWriter.close();
-
-        CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
-        csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
-        try {
-            csvBulkLoadTool.run(new String[] {
-                    "--input", "/tmp/input3.csv",
-                    "--table", "table6",
-                    "--zookeeper", zkQuorum});
-            fail("Csv bulk load currently has issues with local indexes.");
-        } catch( UnsupportedOperationException ise) {
-            assertEquals("Local indexes not supported by Bulk Loader",ise.getMessage());
-        }
-        
-    }
-
-    @Test
-    public void testImportOneIndexTable() throws Exception {
-        testImportOneIndexTable("TABLE4", false);
-    }
-
-    //@Test
-    public void testImportOneLocalIndexTable() throws Exception {
-        testImportOneIndexTable("TABLE5", true);
-    }
-
-    public void testImportOneIndexTable(String tableName, boolean localIndex) throws Exception {
-
-        String indexTableName = String.format("%s_IDX", tableName);
-        Statement stmt = conn.createStatement();
-        stmt.execute("CREATE TABLE " + tableName + "(ID INTEGER NOT NULL PRIMARY KEY, "
-                + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
-        String ddl =
-                "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexTableName + " ON "
-                        + tableName + "(FIRST_NAME ASC)";
-        stmt.execute(ddl);
-
-        FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
-        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input4.csv"));
-        PrintWriter printWriter = new PrintWriter(outputStream);
-        printWriter.println("1,FirstName 1,LastName 1");
-        printWriter.println("2,FirstName 2,LastName 2");
-        printWriter.close();
-
-        CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
-        csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
-        int exitCode = csvBulkLoadTool.run(new String[] {
-                "--input", "/tmp/input4.csv",
-                "--table", tableName,
-                "--index-table", indexTableName,
-                "--zookeeper", zkQuorum });
-        assertEquals(0, exitCode);
-
-        ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
-        assertFalse(rs.next());
-        rs = stmt.executeQuery("SELECT FIRST_NAME FROM " + tableName + " where FIRST_NAME='FirstName 1'");
-        assertTrue(rs.next());
-        assertEquals("FirstName 1", rs.getString(1));
-
-        rs.close();
-        stmt.close();
-    }
-    
-    @Test
-    public void testInvalidArguments() {
-        String tableName = "TABLE8";
-        CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
-        csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
-        try {
-            csvBulkLoadTool.run(new String[] {
-                "--input", "/tmp/input4.csv",
-                "--table", tableName,
-                "--zookeeper", zkQuorum });
-            fail(String.format("Table %s not created, hence should fail",tableName));
-        } catch (Exception ex) {
-            assertTrue(ex instanceof IllegalArgumentException); 
-            assertTrue(ex.getMessage().contains(String.format("Table %s not found", tableName)));
-        }
-    }
-    
-    @Test
-    public void testAlreadyExistsOutputPath() {
-        String tableName = "TABLE9";
-        String outputPath = "/tmp/output/tabl9";
-        try {
-            Statement stmt = conn.createStatement();
-            stmt.execute("CREATE TABLE " + tableName + "(ID INTEGER NOT NULL PRIMARY KEY, "
-                    + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
-            
-            FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
-            fs.create(new Path(outputPath));
-            FSDataOutputStream outputStream = fs.create(new Path("/tmp/input9.csv"));
-            PrintWriter printWriter = new PrintWriter(outputStream);
-            printWriter.println("1,FirstName 1,LastName 1");
-            printWriter.println("2,FirstName 2,LastName 2");
-            printWriter.close();
-            
-            CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
-            csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
-            csvBulkLoadTool.run(new String[] {
-                "--input", "/tmp/input9.csv",
-                "--output", outputPath,
-                "--table", tableName,
-                "--zookeeper", zkQuorum });
-            
-            fail(String.format("Output path %s already exists. hence, should fail",outputPath));
-        } catch (Exception ex) {
-            assertTrue(ex instanceof FileAlreadyExistsException); 
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc6cd9f5/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
deleted file mode 100644
index c88a5f4..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.mapreduce;
-
-import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.jdbc.PhoenixDriver;
-import org.apache.phoenix.mapreduce.index.IndexTool;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.util.MetaDataUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.QueryUtil;
-import org.apache.phoenix.util.SchemaUtil;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import com.google.common.collect.Lists;
-
-/**
- * Tests for the {@link IndexTool}
- */
-@Category(NeedsOwnMiniClusterTest.class)
-public class IndexToolIT {
-    
-    private static HBaseTestingUtility hbaseTestUtil;
-    private static String zkQuorum;
-  
-    @BeforeClass
-    public static void setUp() throws Exception {
-        hbaseTestUtil = new HBaseTestingUtility();
-        Configuration conf = hbaseTestUtil.getConfiguration();
-        conf.setBoolean("hbase.defaults.for.version.skip", true);
-        // Since we're using the real PhoenixDriver in this test, remove the
-        // extra JDBC argument that causes the test driver to be used.
-        conf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
-        setUpConfigForMiniCluster(conf);
-        hbaseTestUtil.startMiniCluster();
-        hbaseTestUtil.startMiniMapReduceCluster();
-        Class.forName(PhoenixDriver.class.getName());
-        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
-        zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
-    }
-    
-    @Test
-    public void testImmutableGlobalIndex() throws Exception {
-        testSecondaryIndex("SCHEMA", "DATA_TABLE1", true, false);
-    }
-    
-    @Test
-    public void testImmutableLocalIndex() throws Exception {
-        testSecondaryIndex("SCHEMA", "DATA_TABLE2", true, true);
-    }
-    
-    @Test
-    public void testMutableGlobalIndex() throws Exception {
-        testSecondaryIndex("SCHEMA", "DATA_TABLE3", false, false);
-    }
-    
-    @Test
-    public void testMutableLocalIndex() throws Exception {
-        testSecondaryIndex("SCHEMA", "DATA_TABLE4", false, true);
-    }
-    
-    @Test
-    public void testImmutableGlobalIndexDirectApi() throws Exception {
-    	testSecondaryIndex("SCHEMA", "DATA_TABLE5", true, false, true);
-    }
-    
-    @Test
-    public void testImmutableLocalIndexDirectApi() throws Exception {
-    	testSecondaryIndex("SCHEMA", "DATA_TABLE6", true, true, true);
-    }
-    
-    @Test
-    public void testMutableGlobalIndexDirectApi() throws Exception {
-    	testSecondaryIndex("SCHEMA", "DATA_TABLE7", false, false, true);
-    }
-    
-    @Test
-    public void testMutableLocalIndexDirectApi() throws Exception {
-    	testSecondaryIndex("SCHEMA", "DATA_TABLE8", false, true, true);
-    }
-    
-    public void testSecondaryIndex(final String schemaName, final String dataTable, final boolean isImmutable , final boolean isLocal) throws Exception {
-    	testSecondaryIndex(schemaName, dataTable, isImmutable, isLocal, false);
-    }
-    
-    public void testSecondaryIndex(final String schemaName, final String dataTable, final boolean isImmutable , final boolean isLocal, final boolean directApi) throws Exception {
-        
-    	final String fullTableName = SchemaUtil.getTableName(schemaName, dataTable);
-        final String indxTable = String.format("%s_%s",dataTable,"INDX");
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum,props);
-        Statement stmt = conn.createStatement();
-        try {
-        
-            stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) %s", fullTableName, (isImmutable ? "IMMUTABLE_ROWS=true" :"")));
-            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName);
-            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
-            
-            int id = 1;
-            // insert two rows
-            upsertRow(stmt1, id++);
-            upsertRow(stmt1, id++);
-            conn.commit();
-            
-            stmt.execute(String.format("CREATE %s INDEX %s ON %s  (LPAD(UPPER(NAME),8,'x')||'_xyz') ASYNC ", (isLocal ? "LOCAL" : ""), indxTable, fullTableName));
-   
-            //verify rows are fetched from data table.
-            String selectSql = String.format("SELECT LPAD(UPPER(NAME),8,'x')||'_xyz',ID FROM %s", fullTableName);
-            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
-            String actualExplainPlan = QueryUtil.getExplainPlan(rs);
-            
-            //assert we are pulling from data table.
-            assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s", fullTableName), actualExplainPlan);
-            
-            rs = stmt1.executeQuery(selectSql);
-            assertTrue(rs.next());
-            assertEquals("xxUNAME1_xyz", rs.getString(1));    
-            assertTrue(rs.next());
-            assertEquals("xxUNAME2_xyz", rs.getString(1));
-           
-            //run the index MR job.
-            final IndexTool indexingTool = new IndexTool();
-            indexingTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
-            
-            final String[] cmdArgs = getArgValues(schemaName, dataTable, indxTable, directApi);
-            int status = indexingTool.run(cmdArgs);
-            assertEquals(0, status);
-            
-            // insert two more rows
-            upsertRow(stmt1, 3);
-            upsertRow(stmt1, 4);
-            conn.commit();
-            
-            rs = stmt1.executeQuery("SELECT * FROM "+SchemaUtil.getTableName(schemaName, indxTable));
-
-            //assert we are pulling from index table.
-            rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
-            actualExplainPlan = QueryUtil.getExplainPlan(rs);
-            assertExplainPlan(actualExplainPlan,schemaName,dataTable,indxTable,isLocal);
-            
-            rs = stmt.executeQuery(selectSql);
-//            assertTrue(rs.next());
-//            assertEquals("xxUNAME1_xyz", rs.getString(1));
-//            assertEquals(1, rs.getInt(2));
-//            
-//            assertTrue(rs.next());
-//            assertEquals("xxUNAME2_xyz", rs.getString(1));
-//            assertEquals(2, rs.getInt(2));
-//
-//            assertTrue(rs.next());
-//            assertEquals("xxUNAME3_xyz", rs.getString(1));
-//            assertEquals(3, rs.getInt(2));
-//            
-//            assertTrue(rs.next());
-//            assertEquals("xxUNAME4_xyz", rs.getString(1));
-//            assertEquals(4, rs.getInt(2));
-//      
-//            assertFalse(rs.next());
-            
-            conn.createStatement().execute(String.format("DROP INDEX  %s ON %s",indxTable , fullTableName));
-        } finally {
-            conn.close();
-        }
-    }
-    
-    
-    /**
-     * Asserts that updates made to rows of a mutable table after an index is created in ASYNC mode, but before
-     * the MR job runs, still show up in the index table.
-     * @throws Exception
-     */
-    @Test
-    public void testMutableIndexWithUpdates() throws Exception {
-        
-        final String dataTable = "DATA_TABLE5";
-        final String indxTable = String.format("%s_%s",dataTable,"INDX");
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum,props);
-        Statement stmt = conn.createStatement();
-        try {
-        
-            stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER)",dataTable));
-            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)",dataTable);
-            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
-            
-            int id = 1;
-            // insert two rows
-            upsertRow(stmt1, id++);
-            upsertRow(stmt1, id++);
-            conn.commit();
-            
-            stmt.execute(String.format("CREATE INDEX %s ON %s (UPPER(NAME)) ASYNC ", indxTable,dataTable));
-            
-            //update a row 
-            stmt1.setInt(1, 1);
-            stmt1.setString(2, "uname" + String.valueOf(10));
-            stmt1.setInt(3, 95050 + 1);
-            stmt1.executeUpdate();
-            conn.commit();  
-            
-            //verify rows are fetched from data table.
-            String selectSql = String.format("SELECT UPPER(NAME),ID FROM %s",dataTable);
-            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
-            String actualExplainPlan = QueryUtil.getExplainPlan(rs);
-            
-            //assert we are pulling from data table.
-            assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s",dataTable),actualExplainPlan);
-            
-            rs = stmt1.executeQuery(selectSql);
-            assertTrue(rs.next());
-            assertEquals("UNAME10", rs.getString(1));
-            assertTrue(rs.next());
-            assertEquals("UNAME2", rs.getString(1));
-           
-            //run the index MR job.
-            final IndexTool indexingTool = new IndexTool();
-            indexingTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
-            
-            final String[] cmdArgs = getArgValues(null, dataTable,indxTable);
-            int status = indexingTool.run(cmdArgs);
-            assertEquals(0, status);
-            
-            //assert we are pulling from index table.
-            rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
-            actualExplainPlan = QueryUtil.getExplainPlan(rs);
-            assertExplainPlan(actualExplainPlan,null,dataTable,indxTable,false);
-            
-            rs = stmt.executeQuery(selectSql);
-            assertTrue(rs.next());
-            assertEquals("UNAME10", rs.getString(1));
-            assertEquals(1, rs.getInt(2));
-            
-            assertTrue(rs.next());
-            assertEquals("UNAME2", rs.getString(1));
-            assertEquals(2, rs.getInt(2));
-            conn.createStatement().execute(String.format("DROP INDEX  %s ON %s",indxTable , dataTable));
-        } finally {
-            conn.close();
-        }
-    }
-    
-    private void assertExplainPlan(final String actualExplainPlan, String schemaName, String dataTable,
-            String indxTable, boolean isLocal) {
-        
-        String expectedExplainPlan = "";
-        if(isLocal) {
-            final String localIndexName = MetaDataUtil.getLocalIndexTableName(SchemaUtil.getTableName(schemaName, dataTable));
-            expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY RANGE SCAN OVER %s [-32768]"
-                + "\n    SERVER FILTER BY FIRST KEY ONLY", localIndexName);
-        } else {
-            expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s"
-                    + "\n    SERVER FILTER BY FIRST KEY ONLY",SchemaUtil.getTableName(schemaName, indxTable));
-        }
-        assertEquals(expectedExplainPlan,actualExplainPlan);
-    }
-
-    private String[] getArgValues(String schemaName, String dataTable, String indxTable) {
-        return getArgValues(schemaName, dataTable, indxTable, false);
-    }
-    
-    private String[] getArgValues(String schemaName, String dataTable, String indxTable, boolean directApi) {
-        final List<String> args = Lists.newArrayList();
-        if (schemaName!=null) {
-            args.add("-s");
-            args.add(schemaName);
-        }
-        args.add("-dt");
-        args.add(dataTable);
-        args.add("-it");
-        args.add(indxTable);
-        if(directApi) {
-            args.add("-direct");
-            // Need to run this job in foreground for the test to be deterministic
-            args.add("-runfg");
-        }
-
-        args.add("-op");
-        args.add("/tmp/"+UUID.randomUUID().toString());
-        return args.toArray(new String[0]);
-    }
-
-    private void upsertRow(PreparedStatement stmt, int i) throws SQLException {
-        // insert row
-        stmt.setInt(1, i);
-        stmt.setString(2, "uname" + String.valueOf(i));
-        stmt.setInt(3, 95050 + i);
-        stmt.executeUpdate();
-    }
-
-    @AfterClass
-    public static void tearDownAfterClass() throws Exception {
-        try {
-            DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
-        } finally {
-            try {
-                hbaseTestUtil.shutdownMiniMapReduceCluster();
-            } finally {
-                hbaseTestUtil.shutdownMiniCluster();
-            }
-        }
-    }
-}
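
For context on what the relocated IndexToolIT exercises: create an index with ASYNC, run the IndexTool MapReduce job to populate it, then query through it. Below is a minimal sketch of that flow outside the test harness, assuming a locally reachable cluster; the connection URL, table and index names, and output path are illustrative assumptions, and only the IndexTool options the test itself builds in getArgValues() (-s, -dt, -it, -direct, -runfg, -op) are used.

    import java.sql.Connection;
    import java.sql.DriverManager;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.phoenix.mapreduce.index.IndexTool;

    public class AsyncIndexBuildSketch {

        public static void main(String[] args) throws Exception {
            // Register the Phoenix driver, as the test's setup does.
            Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");

            // Assumed connection URL; substitute your own ZooKeeper quorum.
            String url = "jdbc:phoenix:localhost:2181";

            Connection conn = DriverManager.getConnection(url);
            try {
                conn.createStatement().execute(
                    "CREATE TABLE SCHEMA.DATA_TABLE (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER)");
                // ASYNC defers index population to the MapReduce job run below.
                conn.createStatement().execute(
                    "CREATE INDEX DATA_TABLE_INDX ON SCHEMA.DATA_TABLE (UPPER(NAME)) ASYNC");
                conn.commit();
            } finally {
                conn.close();
            }

            // Same arguments the test assembles in getArgValues(): schema, data table,
            // index table, direct API, foreground execution, and an output path.
            IndexTool tool = new IndexTool();
            Configuration conf = HBaseConfiguration.create(); // expects hbase-site.xml on the classpath
            tool.setConf(conf);
            int status = tool.run(new String[] {
                "-s", "SCHEMA",
                "-dt", "DATA_TABLE",
                "-it", "DATA_TABLE_INDX",
                "-direct", "-runfg",
                "-op", "/tmp/index-build-output" // scratch directory; any writable path works
            });
            if (status != 0) {
                throw new IllegalStateException("IndexTool exited with status " + status);
            }
            // Queries on UPPER(NAME) should now be served from DATA_TABLE_INDX,
            // which can be verified with EXPLAIN as the test does.
        }
    }
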

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc6cd9f5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b6a35e8..8860451 100644
--- a/pom.xml
+++ b/pom.xml
@@ -121,8 +121,8 @@
     <!-- Plugin versions -->
     <maven-eclipse-plugin.version>2.9</maven-eclipse-plugin.version>
     <maven-build-helper-plugin.version>1.9.1</maven-build-helper-plugin.version>
-    <maven-surefire-plugin.version>2.19</maven-surefire-plugin.version>
-    <maven-failsafe-plugin.version>2.19</maven-failsafe-plugin.version>
+    <maven-surefire-plugin.version>2.19.1</maven-surefire-plugin.version>
+    <maven-failsafe-plugin.version>2.19.1</maven-failsafe-plugin.version>
     
     <maven-dependency-plugin.version>2.1</maven-dependency-plugin.version>
     <maven.assembly.version>2.5.2</maven.assembly.version>
@@ -130,7 +130,7 @@
 
     <!-- Plugin options -->
     <numForkedUT>3</numForkedUT>
-    <numForkedIT>7</numForkedIT>
+    <numForkedIT>5</numForkedIT>
     
     <!-- Set default encoding so multi-byte tests work correctly on the Mac -->
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -268,6 +268,7 @@
                  <configuration>
                  <encoding>UTF-8</encoding>
                  <forkCount>${numForkedIT}</forkCount>
+                 <runOrder>alphabetical</runOrder>
                  <reuseForks>true</reuseForks>
                  <argLine>-enableassertions -Xmx2000m -XX:MaxPermSize=128m -Djava.security.egd=file:/dev/./urandom "-Djava.library.path=${hadoop.library.path}${path.separator}${java.library.path}"</argLine>
                  <redirectTestOutputToFile>${test.output.tofile}</redirectTestOutputToFile>
@@ -284,6 +285,7 @@
                  <configuration>
                  <encoding>UTF-8</encoding>
                  <forkCount>${numForkedIT}</forkCount>
+                 <runOrder>alphabetical</runOrder>
                  <reuseForks>true</reuseForks>
                  <argLine>-enableassertions -Xmx2000m -XX:MaxPermSize=128m -Djava.security.egd=file:/dev/./urandom "-Djava.library.path=${hadoop.library.path}${path.separator}${java.library.path}"</argLine>
                  <redirectTestOutputToFile>${test.output.tofile}</redirectTestOutputToFile>
@@ -300,7 +302,8 @@
                  <configuration>
                  <encoding>UTF-8</encoding>
                  <forkCount>${numForkedIT}</forkCount>
-                 <reuseForks>true</reuseForks>
+                 <runOrder>alphabetical</runOrder>
+                 <reuseForks>false</reuseForks>
                  <argLine>-enableassertions -Xmx2000m -XX:MaxPermSize=128m -Djava.security.egd=file:/dev/./urandom "-Djava.library.path=${hadoop.library.path}${path.separator}${java.library.path}"</argLine>
                  <redirectTestOutputToFile>${test.output.tofile}</redirectTestOutputToFile>
                  <testSourceDirectory>${basedir}/src/it/java</testSourceDirectory>
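
Taken together, the pom.xml changes above move surefire/failsafe to 2.19.1, drop the concurrent IT fork count from seven to five, force alphabetical run order, and stop reusing forks for the third execution group. As a rough sketch of how those settings combine (the plugin scaffolding, execution id, and goals below are placeholders, not the actual pom contents, and the argLine is shortened), the resulting failsafe configuration for that group looks like:

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-failsafe-plugin</artifactId>
      <version>${maven-failsafe-plugin.version}</version> <!-- 2.19.1 after this commit -->
      <executions>
        <execution>
          <id>ITGroupThree</id> <!-- placeholder execution id -->
          <goals>
            <goal>integration-test</goal>
            <goal>verify</goal>
          </goals>
          <configuration>
            <encoding>UTF-8</encoding>
            <forkCount>${numForkedIT}</forkCount>      <!-- now 5 -->
            <runOrder>alphabetical</runOrder>          <!-- new: deterministic class ordering -->
            <reuseForks>false</reuseForks>             <!-- fresh JVM per fork for this group -->
            <argLine>-enableassertions -Xmx2000m -XX:MaxPermSize=128m ...</argLine> <!-- shortened -->
            <redirectTestOutputToFile>${test.output.tofile}</redirectTestOutputToFile>
            <testSourceDirectory>${basedir}/src/it/java</testSourceDirectory>
          </configuration>
        </execution>
      </executions>
    </plugin>
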

