incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [3/5] git commit: Refactoring all integration tests to run as a test suite in their own project. This is an effort to get the tests to run faster.
Date Mon, 08 Jun 2015 12:41:51 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d8756092/blur-hive/src/test/java/org/apache/blur/hive/BlurSerDeTest.java
----------------------------------------------------------------------
diff --git a/blur-hive/src/test/java/org/apache/blur/hive/BlurSerDeTest.java b/blur-hive/src/test/java/org/apache/blur/hive/BlurSerDeTest.java
deleted file mode 100644
index 53209a1..0000000
--- a/blur-hive/src/test/java/org/apache/blur/hive/BlurSerDeTest.java
+++ /dev/null
@@ -1,579 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.hive;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.lang.reflect.Field;
-import java.net.ServerSocket;
-import java.sql.Connection;
-import java.sql.Date;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.blur.MiniCluster;
-import org.apache.blur.mapreduce.lib.BlurColumn;
-import org.apache.blur.mapreduce.lib.BlurRecord;
-import org.apache.blur.thirdparty.thrift_0_9_0.TException;
-import org.apache.blur.thrift.BlurClient;
-import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.thrift.generated.BlurQuery;
-import org.apache.blur.thrift.generated.BlurResults;
-import org.apache.blur.thrift.generated.ColumnDefinition;
-import org.apache.blur.thrift.generated.Query;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.blur.utils.GCWatcher;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hive.jdbc.HiveDriver;
-import org.apache.hive.service.server.HiveServer2;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.google.common.base.Splitter;
-
-public class BlurSerDeTest {
-
-  public static final File WAREHOUSE = new File("./target/tmp/warehouse");
-  public static final String COLUMN_SEP = new String(new char[] { 1 });
-  public static final String ITEM_SEP = new String(new char[] { 2 });
-  public static final File DERBY_FILE = new File("derby.log");
-  public static final File METASTORE_DB_FILE = new File("metastore_db");
-
-  private static final String FAM = "fam0";
-  private static final String YYYYMMDD = "yyyyMMdd";
-  private static final String YYYY_MM_DD = "yyyy-MM-dd";
-  private static final String TEST = "test";
-  private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir", "./target/tmp_BlurSerDeTest"));
-  private static MiniCluster miniCluster;
-  private static boolean externalProcesses = false;
-
-  @BeforeClass
-  public static void startCluster() throws IOException {
-    System.setProperty("hadoop.log.dir", "./target/tmp_BlurSerDeTest_hadoop_log");
-    GCWatcher.init(0.60);
-    LocalFileSystem localFS = FileSystem.getLocal(new Configuration());
-    File testDirectory = new File(TMPDIR, "blur-SerDe-test").getAbsoluteFile();
-    testDirectory.mkdirs();
-
-    Path directory = new Path(testDirectory.getPath());
-    FsPermission dirPermissions = localFS.getFileStatus(directory).getPermission();
-    FsAction userAction = dirPermissions.getUserAction();
-    FsAction groupAction = dirPermissions.getGroupAction();
-    FsAction otherAction = dirPermissions.getOtherAction();
-
-    StringBuilder builder = new StringBuilder();
-    builder.append(userAction.ordinal());
-    builder.append(groupAction.ordinal());
-    builder.append(otherAction.ordinal());
-    String dirPermissionNum = builder.toString();
-    System.setProperty("dfs.datanode.data.dir.perm", dirPermissionNum);
-    testDirectory.delete();
-    miniCluster = new MiniCluster();
-    miniCluster.startBlurCluster(new File(testDirectory, "cluster").getAbsolutePath(), 2, 3, true, externalProcesses);
-    miniCluster.startMrMiniCluster();
-  }
-
-  @AfterClass
-  public static void shutdownCluster() throws IOException {
-    miniCluster.stopMrMiniCluster();
-    miniCluster.shutdownBlurCluster();
-  }
-
-  private String _mrWorkingPath;
-
-  @Before
-  public void setup() throws BlurException, TException, IOException {
-    _mrWorkingPath = miniCluster.getFileSystemUri().toString() + "/mrworkingpath";
-    String controllerConnectionStr = miniCluster.getControllerConnectionStr();
-    Iface client = BlurClient.getClient(controllerConnectionStr);
-    List<String> tableList = client.tableList();
-    if (!tableList.contains(TEST)) {
-      TableDescriptor tableDescriptor = new TableDescriptor();
-      tableDescriptor.setName(TEST);
-      tableDescriptor.setShardCount(1);
-      tableDescriptor.setTableUri(miniCluster.getFileSystemUri().toString() + "/blur/tables/test");
-      tableDescriptor.putToTableProperties(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH, _mrWorkingPath);
-
-      client.createTable(tableDescriptor);
-
-      Map<String, String> props = new HashMap<String, String>();
-      props.put("dateFormat", YYYYMMDD);
-
-      client.addColumnDefinition(TEST, cd(false, FAM, "string-col-single", "string"));
-      client.addColumnDefinition(TEST, cd(false, FAM, "text-col-single", "text"));
-      client.addColumnDefinition(TEST, cd(false, FAM, "stored-col-single", "stored"));
-      client.addColumnDefinition(TEST, cd(false, FAM, "double-col-single", "double"));
-      client.addColumnDefinition(TEST, cd(false, FAM, "float-col-single", "float"));
-      client.addColumnDefinition(TEST, cd(false, FAM, "long-col-single", "long"));
-      client.addColumnDefinition(TEST, cd(false, FAM, "int-col-single", "int"));
-      client.addColumnDefinition(TEST, cd(false, FAM, "date-col-single", "date", props));
-
-      client.addColumnDefinition(TEST, cd(false, FAM, "geo-col-single", "geo-pointvector"));
-
-      client.addColumnDefinition(TEST, cd(true, FAM, "string-col-multi", "string"));
-      client.addColumnDefinition(TEST, cd(true, FAM, "text-col-multi", "text"));
-      client.addColumnDefinition(TEST, cd(true, FAM, "stored-col-multi", "stored"));
-      client.addColumnDefinition(TEST, cd(true, FAM, "double-col-multi", "double"));
-      client.addColumnDefinition(TEST, cd(true, FAM, "float-col-multi", "float"));
-      client.addColumnDefinition(TEST, cd(true, FAM, "long-col-multi", "long"));
-      client.addColumnDefinition(TEST, cd(true, FAM, "int-col-multi", "int"));
-      client.addColumnDefinition(TEST, cd(true, FAM, "date-col-multi", "date", props));
-    }
-    rmr(WAREHOUSE);
-    rmr(METASTORE_DB_FILE);
-    rmr(DERBY_FILE);
-  }
-
-  @After
-  public void teardown() {
-    rmr(WAREHOUSE);
-    rmr(METASTORE_DB_FILE);
-    rmr(DERBY_FILE);
-  }
-
-  public static void rmr(File file) {
-    if (!file.exists()) {
-      return;
-    }
-    if (file.isDirectory()) {
-      for (File f : file.listFiles()) {
-        rmr(f);
-      }
-    }
-    file.delete();
-  }
-
-  private ColumnDefinition cd(boolean multiValue, String family, String columnName, String type) {
-    return cd(multiValue, family, columnName, type, null);
-  }
-
-  private ColumnDefinition cd(boolean multiValue, String family, String columnName, String type,
-      Map<String, String> props) {
-    ColumnDefinition columnDefinition = new ColumnDefinition(family, columnName, null, false, type, props, false);
-    columnDefinition.setMultiValueField(multiValue);
-    return columnDefinition;
-  }
-
-  @Test
-  public void test1() throws SerDeException {
-    long now = System.currentTimeMillis();
-    Date date = new Date(now);
-    SimpleDateFormat simpleDateFormat = new SimpleDateFormat(YYYYMMDD);
-    BlurSerDe blurSerDe = new BlurSerDe();
-
-    Configuration conf = new Configuration();
-    Properties tbl = new Properties();
-    tbl.put(BlurSerDe.TABLE, TEST);
-    tbl.put(BlurSerDe.FAMILY, FAM);
-    tbl.put(BlurSerDe.ZK, miniCluster.getZkConnectionString());
-
-    blurSerDe.initialize(conf, tbl);
-
-    ObjectInspector objectInspector = blurSerDe.getObjectInspector();
-    Object[] row = new Object[19];
-    int c = 0;
-    row[c++] = "rowid";
-    row[c++] = "recordid";
-    row[c++] = new Object[] { date, date };
-    row[c++] = date;
-    row[c++] = new Object[] { 1234.5678, 4321.5678 };
-    row[c++] = 1234.5678;
-    row[c++] = new Object[] { 1234.567f, 4321.567f };
-    row[c++] = 1234.567f;
-    row[c++] = new Object[] { 1.0f, 2.0f };
-    row[c++] = new Object[] { 12345678, 87654321 };
-    row[c++] = 12345678;
-    row[c++] = new Object[] { 12345678l, 87654321l };
-    row[c++] = 12345678l;
-    row[c++] = new Object[] { "stored input1", "stored input2" };
-    row[c++] = "stored input";
-    row[c++] = new Object[] { "string input1", "string input2" };
-    row[c++] = "string input";
-    row[c++] = new Object[] { "text input1", "text input2" };
-    row[c++] = "text input";
-
-    BlurRecord blurRecord = (BlurRecord) blurSerDe.serialize(row, objectInspector);
-    assertEquals("rowid", blurRecord.getRowId());
-    assertEquals("recordid", blurRecord.getRecordId());
-
-    Map<String, List<String>> columns = toMap(blurRecord.getColumns());
-
-    assertEquals(list("string input"), columns.get("string-col-single"));
-    assertEquals(list("string input1", "string input2"), columns.get("string-col-multi"));
-
-    assertEquals(list("text input"), columns.get("text-col-single"));
-    assertEquals(list("text input1", "text input2"), columns.get("text-col-multi"));
-
-    assertEquals(list("stored input"), columns.get("stored-col-single"));
-    assertEquals(list("stored input1", "stored input2"), columns.get("stored-col-multi"));
-
-    assertEquals(list("1234.5678"), columns.get("double-col-single"));
-    assertEquals(list("1234.5678", "4321.5678"), columns.get("double-col-multi"));
-
-    assertEquals(list("1234.567"), columns.get("float-col-single"));
-    assertEquals(list("1234.567", "4321.567"), columns.get("float-col-multi"));
-
-    assertEquals(list("12345678"), columns.get("long-col-single"));
-    assertEquals(list("12345678", "87654321"), columns.get("long-col-multi"));
-
-    assertEquals(list("12345678"), columns.get("int-col-single"));
-    assertEquals(list("12345678", "87654321"), columns.get("int-col-multi"));
-
-    assertEquals(list(simpleDateFormat.format(date)), columns.get("date-col-single"));
-    assertEquals(list(simpleDateFormat.format(date), simpleDateFormat.format(date)), columns.get("date-col-multi"));
-
-    assertEquals(list("1.0,2.0"), columns.get("geo-col-single"));
-  }
-
-  @Test
-  public void test2() throws SQLException, ClassNotFoundException, IOException, BlurException, TException,
-      InterruptedException {
-    int totalRecords = runLoad(true);
-    Iface client = BlurClient.getClientFromZooKeeperConnectionStr(miniCluster.getZkConnectionString());
-    BlurQuery blurQuery = new BlurQuery();
-    Query query = new Query();
-    query.setQuery("*");
-    blurQuery.setQuery(query);
-    BlurResults results = client.query(TEST, blurQuery);
-    assertEquals(totalRecords, results.getTotalResults());
-  }
-
-  @Test
-  public void test3() throws Exception {
-    int totalRecords = runLoad(false);
-    Iface client = BlurClient.getClientFromZooKeeperConnectionStr(miniCluster.getZkConnectionString());
-    BlurQuery blurQuery = new BlurQuery();
-    Query query = new Query();
-    query.setQuery("*");
-    blurQuery.setQuery(query);
-    BlurResults results = client.query(TEST, blurQuery);
-    assertEquals(totalRecords, results.getTotalResults());
-  }
-
-  private int runLoad(boolean disableMrUpdate) throws IOException, InterruptedException, ClassNotFoundException,
-      SQLException {
-
-    Configuration configuration = miniCluster.getMRConfiguration();
-    writeSiteFiles(configuration);
-    HiveConf hiveConf = new HiveConf(configuration, getClass());
-    hiveConf.set("hive.server2.thrift.port", "0");
-    HiveServer2 hiveServer2 = new HiveServer2();
-    hiveServer2.init(hiveConf);
-    hiveServer2.start();
-
-    int port = waitForStartupAndGetPort(hiveServer2);
-
-    Class.forName(HiveDriver.class.getName());
-    String userName = UserGroupInformation.getCurrentUser().getShortUserName();
-    Connection connection = DriverManager.getConnection("jdbc:hive2://localhost:" + port, userName, "");
-
-    run(connection, "set blur.mr.update.disabled=" + disableMrUpdate);
-    run(connection, "set hive.metastore.warehouse.dir=" + WAREHOUSE.toURI().toString());
-    run(connection, "create database if not exists testdb");
-    run(connection, "use testdb");
-
-    run(connection, "CREATE TABLE if not exists testtable ROW FORMAT SERDE 'org.apache.blur.hive.BlurSerDe' "
-        + "WITH SERDEPROPERTIES ( 'blur.zookeeper.connection'='" + miniCluster.getZkConnectionString() + "', "
-        + "'blur.table'='" + TEST + "', 'blur.family'='" + FAM + "' ) "
-        + "STORED BY 'org.apache.blur.hive.BlurHiveStorageHandler'");
-
-    run(connection, "desc testtable");
-
-    String createLoadTable = buildCreateLoadTable(connection);
-    run(connection, createLoadTable);
-    File dbDir = new File(WAREHOUSE, "testdb.db");
-    File tableDir = new File(dbDir, "loadtable");
-    int totalRecords = 100;
-    generateData(tableDir, totalRecords);
-
-    run(connection, "select * from loadtable");
-    run(connection, "set " + BlurSerDe.BLUR_BLOCKING_APPLY + "=true");
-    run(connection, "insert into table testtable select * from loadtable");
-    connection.close();
-    hiveServer2.stop();
-    return totalRecords;
-  }
-
-  private void writeSiteFiles(Configuration configuration) throws FileNotFoundException, IOException {
-    String name = BlurHiveMRLoaderOutputCommitter.MAPRED_SITE_XML;
-    if (miniCluster.useYarn()) {
-      name = BlurHiveMRLoaderOutputCommitter.YARN_SITE_XML;
-    }
-    String classPath = System.getProperty("java.class.path");
-    for (String path : Splitter.on(":").split(classPath)) {
-      File file = new File(path);
-      if (file.getName().equals("test-classes")) {
-        writeFile(new File(file, name), configuration);
-        return;
-      }
-    }
-  }
-
-  private void writeFile(File file, Configuration configuration) throws FileNotFoundException, IOException {
-    FileOutputStream outputStream = new FileOutputStream(file);
-    configuration.writeXml(outputStream);
-    outputStream.close();
-  }
-
-  private void generateData(File file, int totalRecords) throws IOException {
-    SimpleDateFormat simpleDateFormat = new SimpleDateFormat(YYYY_MM_DD);
-    file.mkdirs();
-    PrintWriter print = new PrintWriter(new File(file, "data"));
-    Date date = new Date(System.currentTimeMillis());
-    for (int i = 0; i < totalRecords; i++) {
-      // rowid
-      print.print("rowid" + i);
-      print.print(COLUMN_SEP);
-      // recordid
-      print.print("recordid" + i);
-      print.print(COLUMN_SEP);
-      {
-        // date_col_multi
-        print.print(simpleDateFormat.format(date));
-        print.print(ITEM_SEP);
-        print.print(simpleDateFormat.format(date));
-      }
-      print.print(COLUMN_SEP);
-      // date_col_single
-      print.print(simpleDateFormat.format(date));
-      print.print(COLUMN_SEP);
-      {
-        // double_col_multi
-        print.print("1.0");
-        print.print(ITEM_SEP);
-        print.print("2.0");
-      }
-      print.print(COLUMN_SEP);
-      // double_col_single
-      print.print("3.0");
-      print.print(COLUMN_SEP);
-
-      {
-        // float_col_multi
-        print.print("4.0");
-        print.print(ITEM_SEP);
-        print.print("5.0");
-      }
-      print.print(COLUMN_SEP);
-      // float_col_single
-      print.print("6.0");
-      print.print(COLUMN_SEP);
-
-      // geo_col_single
-      print.print("10.0");
-      print.print(ITEM_SEP);
-      print.print("10.0");
-      print.print(COLUMN_SEP);
-
-      {
-        // int_col_multi
-        print.print("1");
-        print.print(ITEM_SEP);
-        print.print("2");
-      }
-      print.print(COLUMN_SEP);
-      // int_col_single
-      print.print("3");
-      print.print(COLUMN_SEP);
-
-      {
-        // long_col_multi
-        print.print("4");
-        print.print(ITEM_SEP);
-        print.print("5");
-      }
-      print.print(COLUMN_SEP);
-      // long_col_single
-      print.print("6");
-      print.print(COLUMN_SEP);
-
-      {
-        // stored_col_multi
-        print.print("stored_1");
-        print.print(ITEM_SEP);
-        print.print("stored_2");
-      }
-      print.print(COLUMN_SEP);
-      // stored_col_single
-      print.print("stored_3");
-      print.print(COLUMN_SEP);
-
-      {
-        // string_col_multi
-        print.print("string_1");
-        print.print(ITEM_SEP);
-        print.print("string_2");
-      }
-      print.print(COLUMN_SEP);
-      // string_col_single
-      print.print("string_3");
-      print.print(COLUMN_SEP);
-
-      {
-        // text_col_multi
-        print.print("text_1");
-        print.print(ITEM_SEP);
-        print.print("text_2");
-      }
-      print.print(COLUMN_SEP);
-      // text_col_single
-      print.print("text_3");
-      print.println();
-    }
-    print.close();
-
-  }
-
-  private String buildCreateLoadTable(Connection connection) throws SQLException {
-    StringBuilder builder = new StringBuilder("create TABLE if not exists loadtable (");
-    Statement statement = connection.createStatement();
-    if (statement.execute("desc testtable")) {
-      ResultSet resultSet = statement.getResultSet();
-      boolean first = true;
-      while (resultSet.next()) {
-        if (!first) {
-          builder.append(", ");
-        }
-        Object name = resultSet.getObject(1);
-        Object type = resultSet.getObject(2);
-        builder.append(name.toString());
-        builder.append(' ');
-        builder.append(type.toString());
-        first = false;
-      }
-      builder.append(")");
-      return builder.toString();
-    }
-    throw new RuntimeException("Can't build create table script.");
-  }
-
-  public static void run(Connection connection, String sql) throws SQLException {
-    System.out.println("Running:" + sql);
-    Statement statement = connection.createStatement();
-    if (statement.execute(sql)) {
-      ResultSet resultSet = statement.getResultSet();
-      while (resultSet.next()) {
-        ResultSetMetaData metaData = resultSet.getMetaData();
-        int columnCount = metaData.getColumnCount();
-        for (int i = 1; i <= columnCount; i++) {
-          System.out.print(resultSet.getObject(i) + "\t");
-        }
-        System.out.println();
-      }
-    }
-    statement.close();
-  }
-
-  private List<String> list(String... sarray) {
-    List<String> list = new ArrayList<String>();
-    for (String s : sarray) {
-      list.add(s);
-    }
-    return list;
-  }
-
-  private Map<String, List<String>> toMap(List<BlurColumn> columns) {
-    Map<String, List<String>> map = new HashMap<String, List<String>>();
-    for (BlurColumn blurColumn : columns) {
-      String name = blurColumn.getName();
-      List<String> list = map.get(name);
-      if (list == null) {
-        map.put(name, list = new ArrayList<String>());
-      }
-      list.add(blurColumn.getValue());
-    }
-    return map;
-  }
-
-  @SuppressWarnings("resource")
-  private int waitForStartupAndGetPort(HiveServer2 hiveServer2) throws InterruptedException {
-    while (true) {
-      // thriftCLIService->server->serverTransport_->serverSocket_
-      Thread.sleep(100);
-      Object o1 = getObject(hiveServer2, "thriftCLIService");
-      if (o1 == null) {
-        continue;
-      }
-      Object o2 = getObject(o1, "server");
-      if (o2 == null) {
-        continue;
-      }
-      Object o3 = getObject(o2, "serverTransport_");
-      if (o3 == null) {
-        continue;
-      }
-      Object o4 = getObject(o3, "serverSocket_");
-      if (o4 == null) {
-        continue;
-      }
-      ServerSocket socket = (ServerSocket) o4;
-      return socket.getLocalPort();
-    }
-  }
-
-  private Object getObject(Object o, String field) {
-    return getObject(o, field, o.getClass());
-  }
-
-  private Object getObject(Object o, String field, Class<? extends Object> clazz) {
-    try {
-      Field declaredField = clazz.getDeclaredField(field);
-      return getObject(o, declaredField);
-    } catch (NoSuchFieldException e) {
-      return getObject(o, field, clazz.getSuperclass());
-    } catch (SecurityException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private Object getObject(Object o, Field field) {
-    field.setAccessible(true);
-    try {
-      return field.get(o);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d8756092/blur-hive/src/test/java/org/apache/blur/hive/BlurSerDeTestIT.java
----------------------------------------------------------------------
diff --git a/blur-hive/src/test/java/org/apache/blur/hive/BlurSerDeTestIT.java b/blur-hive/src/test/java/org/apache/blur/hive/BlurSerDeTestIT.java
new file mode 100644
index 0000000..4e2484c
--- /dev/null
+++ b/blur-hive/src/test/java/org/apache/blur/hive/BlurSerDeTestIT.java
@@ -0,0 +1,526 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.hive;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.lang.reflect.Field;
+import java.net.ServerSocket;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.blur.MiniCluster;
+import org.apache.blur.mapreduce.lib.BlurColumn;
+import org.apache.blur.mapreduce.lib.BlurRecord;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.SuiteCluster;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.BlurQuery;
+import org.apache.blur.thrift.generated.BlurResults;
+import org.apache.blur.thrift.generated.ColumnDefinition;
+import org.apache.blur.thrift.generated.Query;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.jdbc.HiveDriver;
+import org.apache.hive.service.server.HiveServer2;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class BlurSerDeTestIT {
+
+  public static final File WAREHOUSE = new File("./target/tmp/warehouse");
+  public static final String COLUMN_SEP = new String(new char[] { 1 });
+  public static final String ITEM_SEP = new String(new char[] { 2 });
+  public static final File DERBY_FILE = new File("derby.log");
+  public static final File METASTORE_DB_FILE = new File("metastore_db");
+
+  private static final String FAM = "fam0";
+  private static final String YYYYMMDD = "yyyyMMdd";
+  private static final String YYYY_MM_DD = "yyyy-MM-dd";
+  private static final String TEST = "test";
+
+  private static MiniCluster miniCluster;
+
+  @BeforeClass
+  public static void startup() throws IOException, BlurException, TException {
+    SuiteCluster.setupMiniCluster(BlurSerDeTestIT.class);
+    miniCluster = SuiteCluster.getMiniCluster();
+  }
+
+  @AfterClass
+  public static void shutdown() throws IOException {
+    SuiteCluster.shutdownMiniCluster(BlurSerDeTestIT.class);
+  }
+
+  private String _mrWorkingPath;
+
+  @Before
+  public void setup() throws BlurException, TException, IOException {
+    _mrWorkingPath = miniCluster.getFileSystemUri().toString() + "/mrworkingpath";
+    String controllerConnectionStr = miniCluster.getControllerConnectionStr();
+    Iface client = BlurClient.getClient(controllerConnectionStr);
+    List<String> tableList = client.tableList();
+    if (!tableList.contains(TEST)) {
+      TableDescriptor tableDescriptor = new TableDescriptor();
+      tableDescriptor.setName(TEST);
+      tableDescriptor.setShardCount(1);
+      tableDescriptor.setTableUri(miniCluster.getFileSystemUri().toString() + "/blur/tables/test");
+      tableDescriptor.putToTableProperties(BlurConstants.BLUR_BULK_UPDATE_WORKING_PATH, _mrWorkingPath);
+
+      client.createTable(tableDescriptor);
+
+      Map<String, String> props = new HashMap<String, String>();
+      props.put("dateFormat", YYYYMMDD);
+
+      client.addColumnDefinition(TEST, cd(false, FAM, "string-col-single", "string"));
+      client.addColumnDefinition(TEST, cd(false, FAM, "text-col-single", "text"));
+      client.addColumnDefinition(TEST, cd(false, FAM, "stored-col-single", "stored"));
+      client.addColumnDefinition(TEST, cd(false, FAM, "double-col-single", "double"));
+      client.addColumnDefinition(TEST, cd(false, FAM, "float-col-single", "float"));
+      client.addColumnDefinition(TEST, cd(false, FAM, "long-col-single", "long"));
+      client.addColumnDefinition(TEST, cd(false, FAM, "int-col-single", "int"));
+      client.addColumnDefinition(TEST, cd(false, FAM, "date-col-single", "date", props));
+
+      client.addColumnDefinition(TEST, cd(false, FAM, "geo-col-single", "geo-pointvector"));
+
+      client.addColumnDefinition(TEST, cd(true, FAM, "string-col-multi", "string"));
+      client.addColumnDefinition(TEST, cd(true, FAM, "text-col-multi", "text"));
+      client.addColumnDefinition(TEST, cd(true, FAM, "stored-col-multi", "stored"));
+      client.addColumnDefinition(TEST, cd(true, FAM, "double-col-multi", "double"));
+      client.addColumnDefinition(TEST, cd(true, FAM, "float-col-multi", "float"));
+      client.addColumnDefinition(TEST, cd(true, FAM, "long-col-multi", "long"));
+      client.addColumnDefinition(TEST, cd(true, FAM, "int-col-multi", "int"));
+      client.addColumnDefinition(TEST, cd(true, FAM, "date-col-multi", "date", props));
+    }
+    rmr(WAREHOUSE);
+    rmr(METASTORE_DB_FILE);
+    rmr(DERBY_FILE);
+  }
+
+  @After
+  public void teardown() {
+    rmr(WAREHOUSE);
+    rmr(METASTORE_DB_FILE);
+    rmr(DERBY_FILE);
+  }
+
+  public static void rmr(File file) {
+    if (!file.exists()) {
+      return;
+    }
+    if (file.isDirectory()) {
+      for (File f : file.listFiles()) {
+        rmr(f);
+      }
+    }
+    file.delete();
+  }
+
+  private ColumnDefinition cd(boolean multiValue, String family, String columnName, String type) {
+    return cd(multiValue, family, columnName, type, null);
+  }
+
+  private ColumnDefinition cd(boolean multiValue, String family, String columnName, String type,
+      Map<String, String> props) {
+    ColumnDefinition columnDefinition = new ColumnDefinition(family, columnName, null, false, type, props, false);
+    columnDefinition.setMultiValueField(multiValue);
+    return columnDefinition;
+  }
+
+  @Test
+  public void test1() throws SerDeException {
+    long now = System.currentTimeMillis();
+    Date date = new Date(now);
+    SimpleDateFormat simpleDateFormat = new SimpleDateFormat(YYYYMMDD);
+    BlurSerDe blurSerDe = new BlurSerDe();
+
+    Configuration conf = new Configuration();
+    Properties tbl = new Properties();
+    tbl.put(BlurSerDe.TABLE, TEST);
+    tbl.put(BlurSerDe.FAMILY, FAM);
+    tbl.put(BlurSerDe.ZK, miniCluster.getZkConnectionString());
+
+    blurSerDe.initialize(conf, tbl);
+
+    ObjectInspector objectInspector = blurSerDe.getObjectInspector();
+    Object[] row = new Object[19];
+    int c = 0;
+    row[c++] = "rowid";
+    row[c++] = "recordid";
+    row[c++] = new Object[] { date, date };
+    row[c++] = date;
+    row[c++] = new Object[] { 1234.5678, 4321.5678 };
+    row[c++] = 1234.5678;
+    row[c++] = new Object[] { 1234.567f, 4321.567f };
+    row[c++] = 1234.567f;
+    row[c++] = new Object[] { 1.0f, 2.0f };
+    row[c++] = new Object[] { 12345678, 87654321 };
+    row[c++] = 12345678;
+    row[c++] = new Object[] { 12345678l, 87654321l };
+    row[c++] = 12345678l;
+    row[c++] = new Object[] { "stored input1", "stored input2" };
+    row[c++] = "stored input";
+    row[c++] = new Object[] { "string input1", "string input2" };
+    row[c++] = "string input";
+    row[c++] = new Object[] { "text input1", "text input2" };
+    row[c++] = "text input";
+
+    BlurRecord blurRecord = (BlurRecord) blurSerDe.serialize(row, objectInspector);
+    assertEquals("rowid", blurRecord.getRowId());
+    assertEquals("recordid", blurRecord.getRecordId());
+
+    Map<String, List<String>> columns = toMap(blurRecord.getColumns());
+
+    assertEquals(list("string input"), columns.get("string-col-single"));
+    assertEquals(list("string input1", "string input2"), columns.get("string-col-multi"));
+
+    assertEquals(list("text input"), columns.get("text-col-single"));
+    assertEquals(list("text input1", "text input2"), columns.get("text-col-multi"));
+
+    assertEquals(list("stored input"), columns.get("stored-col-single"));
+    assertEquals(list("stored input1", "stored input2"), columns.get("stored-col-multi"));
+
+    assertEquals(list("1234.5678"), columns.get("double-col-single"));
+    assertEquals(list("1234.5678", "4321.5678"), columns.get("double-col-multi"));
+
+    assertEquals(list("1234.567"), columns.get("float-col-single"));
+    assertEquals(list("1234.567", "4321.567"), columns.get("float-col-multi"));
+
+    assertEquals(list("12345678"), columns.get("long-col-single"));
+    assertEquals(list("12345678", "87654321"), columns.get("long-col-multi"));
+
+    assertEquals(list("12345678"), columns.get("int-col-single"));
+    assertEquals(list("12345678", "87654321"), columns.get("int-col-multi"));
+
+    assertEquals(list(simpleDateFormat.format(date)), columns.get("date-col-single"));
+    assertEquals(list(simpleDateFormat.format(date), simpleDateFormat.format(date)), columns.get("date-col-multi"));
+
+    assertEquals(list("1.0,2.0"), columns.get("geo-col-single"));
+  }
+
+  @Test
+  public void test2() throws SQLException, ClassNotFoundException, IOException, BlurException, TException,
+      InterruptedException {
+    int totalRecords = runLoad(true);
+    Iface client = BlurClient.getClientFromZooKeeperConnectionStr(miniCluster.getZkConnectionString());
+    BlurQuery blurQuery = new BlurQuery();
+    Query query = new Query();
+    query.setQuery("*");
+    blurQuery.setQuery(query);
+    BlurResults results = client.query(TEST, blurQuery);
+    assertEquals(totalRecords, results.getTotalResults());
+  }
+
+  @Test
+  public void test3() throws Exception {
+    int totalRecords = runLoad(false);
+    Iface client = BlurClient.getClientFromZooKeeperConnectionStr(miniCluster.getZkConnectionString());
+    BlurQuery blurQuery = new BlurQuery();
+    Query query = new Query();
+    query.setQuery("*");
+    blurQuery.setQuery(query);
+    BlurResults results = client.query(TEST, blurQuery);
+    assertEquals(totalRecords, results.getTotalResults());
+  }
+
+  private int runLoad(boolean disableMrUpdate) throws IOException, InterruptedException, ClassNotFoundException,
+      SQLException {
+
+    Configuration configuration = miniCluster.getMRConfiguration();
+    HiveConf hiveConf = new HiveConf(configuration, getClass());
+    hiveConf.set("hive.server2.thrift.port", "0");
+    HiveServer2 hiveServer2 = new HiveServer2();
+    hiveServer2.init(hiveConf);
+    hiveServer2.start();
+
+    int port = waitForStartupAndGetPort(hiveServer2);
+
+    Class.forName(HiveDriver.class.getName());
+    String userName = UserGroupInformation.getCurrentUser().getShortUserName();
+    Connection connection = DriverManager.getConnection("jdbc:hive2://localhost:" + port, userName, "");
+
+    run(connection, "set blur.mr.update.disabled=" + disableMrUpdate);
+    run(connection, "set hive.metastore.warehouse.dir=" + WAREHOUSE.toURI().toString());
+    run(connection, "create database if not exists testdb");
+    run(connection, "use testdb");
+
+    run(connection, "CREATE TABLE if not exists testtable ROW FORMAT SERDE 'org.apache.blur.hive.BlurSerDe' "
+        + "WITH SERDEPROPERTIES ( 'blur.zookeeper.connection'='" + miniCluster.getZkConnectionString() + "', "
+        + "'blur.table'='" + TEST + "', 'blur.family'='" + FAM + "' ) "
+        + "STORED BY 'org.apache.blur.hive.BlurHiveStorageHandler'");
+
+    run(connection, "desc testtable");
+
+    String createLoadTable = buildCreateLoadTable(connection);
+    run(connection, createLoadTable);
+    File dbDir = new File(WAREHOUSE, "testdb.db");
+    File tableDir = new File(dbDir, "loadtable");
+    int totalRecords = 100;
+    generateData(tableDir, totalRecords);
+
+    run(connection, "select * from loadtable");
+    run(connection, "set " + BlurSerDe.BLUR_BLOCKING_APPLY + "=true");
+    run(connection, "insert into table testtable select * from loadtable");
+    connection.close();
+    hiveServer2.stop();
+    return totalRecords;
+  }
+
+  private void generateData(File file, int totalRecords) throws IOException {
+    SimpleDateFormat simpleDateFormat = new SimpleDateFormat(YYYY_MM_DD);
+    file.mkdirs();
+    PrintWriter print = new PrintWriter(new File(file, "data"));
+    Date date = new Date(System.currentTimeMillis());
+    for (int i = 0; i < totalRecords; i++) {
+      // rowid
+      print.print("rowid" + i);
+      print.print(COLUMN_SEP);
+      // recordid
+      print.print("recordid" + i);
+      print.print(COLUMN_SEP);
+      {
+        // date_col_multi
+        print.print(simpleDateFormat.format(date));
+        print.print(ITEM_SEP);
+        print.print(simpleDateFormat.format(date));
+      }
+      print.print(COLUMN_SEP);
+      // date_col_single
+      print.print(simpleDateFormat.format(date));
+      print.print(COLUMN_SEP);
+      {
+        // double_col_multi
+        print.print("1.0");
+        print.print(ITEM_SEP);
+        print.print("2.0");
+      }
+      print.print(COLUMN_SEP);
+      // double_col_single
+      print.print("3.0");
+      print.print(COLUMN_SEP);
+
+      {
+        // float_col_multi
+        print.print("4.0");
+        print.print(ITEM_SEP);
+        print.print("5.0");
+      }
+      print.print(COLUMN_SEP);
+      // float_col_single
+      print.print("6.0");
+      print.print(COLUMN_SEP);
+
+      // geo_col_single
+      print.print("10.0");
+      print.print(ITEM_SEP);
+      print.print("10.0");
+      print.print(COLUMN_SEP);
+
+      {
+        // int_col_multi
+        print.print("1");
+        print.print(ITEM_SEP);
+        print.print("2");
+      }
+      print.print(COLUMN_SEP);
+      // int_col_single
+      print.print("3");
+      print.print(COLUMN_SEP);
+
+      {
+        // long_col_multi
+        print.print("4");
+        print.print(ITEM_SEP);
+        print.print("5");
+      }
+      print.print(COLUMN_SEP);
+      // long_col_single
+      print.print("6");
+      print.print(COLUMN_SEP);
+
+      {
+        // stored_col_multi
+        print.print("stored_1");
+        print.print(ITEM_SEP);
+        print.print("stored_2");
+      }
+      print.print(COLUMN_SEP);
+      // stored_col_single
+      print.print("stored_3");
+      print.print(COLUMN_SEP);
+
+      {
+        // string_col_multi
+        print.print("string_1");
+        print.print(ITEM_SEP);
+        print.print("string_2");
+      }
+      print.print(COLUMN_SEP);
+      // string_col_single
+      print.print("string_3");
+      print.print(COLUMN_SEP);
+
+      {
+        // text_col_multi
+        print.print("text_1");
+        print.print(ITEM_SEP);
+        print.print("text_2");
+      }
+      print.print(COLUMN_SEP);
+      // text_col_single
+      print.print("text_3");
+      print.println();
+    }
+    print.close();
+
+  }
+
+  private String buildCreateLoadTable(Connection connection) throws SQLException {
+    StringBuilder builder = new StringBuilder("create TABLE if not exists loadtable (");
+    Statement statement = connection.createStatement();
+    if (statement.execute("desc testtable")) {
+      ResultSet resultSet = statement.getResultSet();
+      boolean first = true;
+      while (resultSet.next()) {
+        if (!first) {
+          builder.append(", ");
+        }
+        Object name = resultSet.getObject(1);
+        Object type = resultSet.getObject(2);
+        builder.append(name.toString());
+        builder.append(' ');
+        builder.append(type.toString());
+        first = false;
+      }
+      builder.append(")");
+      return builder.toString();
+    }
+    throw new RuntimeException("Can't build create table script.");
+  }
+
+  public static void run(Connection connection, String sql) throws SQLException {
+    System.out.println("Running:" + sql);
+    Statement statement = connection.createStatement();
+    if (statement.execute(sql)) {
+      ResultSet resultSet = statement.getResultSet();
+      while (resultSet.next()) {
+        ResultSetMetaData metaData = resultSet.getMetaData();
+        int columnCount = metaData.getColumnCount();
+        for (int i = 1; i <= columnCount; i++) {
+          System.out.print(resultSet.getObject(i) + "\t");
+        }
+        System.out.println();
+      }
+    }
+    statement.close();
+  }
+
+  private List<String> list(String... sarray) {
+    List<String> list = new ArrayList<String>();
+    for (String s : sarray) {
+      list.add(s);
+    }
+    return list;
+  }
+
+  private Map<String, List<String>> toMap(List<BlurColumn> columns) {
+    Map<String, List<String>> map = new HashMap<String, List<String>>();
+    for (BlurColumn blurColumn : columns) {
+      String name = blurColumn.getName();
+      List<String> list = map.get(name);
+      if (list == null) {
+        map.put(name, list = new ArrayList<String>());
+      }
+      list.add(blurColumn.getValue());
+    }
+    return map;
+  }
+
+  @SuppressWarnings("resource")
+  private int waitForStartupAndGetPort(HiveServer2 hiveServer2) throws InterruptedException {
+    while (true) {
+      // thriftCLIService->server->serverTransport_->serverSocket_
+      Thread.sleep(100);
+      Object o1 = getObject(hiveServer2, "thriftCLIService");
+      if (o1 == null) {
+        continue;
+      }
+      Object o2 = getObject(o1, "server");
+      if (o2 == null) {
+        continue;
+      }
+      Object o3 = getObject(o2, "serverTransport_");
+      if (o3 == null) {
+        continue;
+      }
+      Object o4 = getObject(o3, "serverSocket_");
+      if (o4 == null) {
+        continue;
+      }
+      ServerSocket socket = (ServerSocket) o4;
+      return socket.getLocalPort();
+    }
+  }
+
+  private Object getObject(Object o, String field) {
+    return getObject(o, field, o.getClass());
+  }
+
+  private Object getObject(Object o, String field, Class<? extends Object> clazz) {
+    try {
+      Field declaredField = clazz.getDeclaredField(field);
+      return getObject(o, declaredField);
+    } catch (NoSuchFieldException e) {
+      return getObject(o, field, clazz.getSuperclass());
+    } catch (SecurityException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Object getObject(Object o, Field field) {
+    field.setAccessible(true);
+    try {
+      return field.get(o);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d8756092/blur-integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/blur-integration-tests/pom.xml b/blur-integration-tests/pom.xml
new file mode 100644
index 0000000..f48eb48
--- /dev/null
+++ b/blur-integration-tests/pom.xml
@@ -0,0 +1,331 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+	license agreements. See the NOTICE file distributed with this work for additional 
+	information regarding copyright ownership. The ASF licenses this file to 
+	you under the Apache License, Version 2.0 (the "License"); you may not use 
+	this file except in compliance with the License. You may obtain a copy of 
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+	by applicable law or agreed to in writing, software distributed under the 
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+	OF ANY KIND, either express or implied. See the License for the specific 
+	language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.blur</groupId>
+		<artifactId>blur</artifactId>
+		<version>0.2.4-incubating-SNAPSHOT</version>
+		<relativePath>../pom.xml</relativePath>
+	</parent>
+	<groupId>org.apache.blur</groupId>
+	<artifactId>blur-integration-tests</artifactId>
+	<version>${projectVersion}</version>
+	<packaging>jar</packaging>
+	<name>Blur Integration Tests</name>
+	<description>The Blur integration tests module contains integration tests that use the mini-cluster.</description>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-command</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-command</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-console</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-console</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-core</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-mapred</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-mapred</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-query</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-query</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-shell</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-shell</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-status</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-status</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-thrift</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-thrift</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-util</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.blur</groupId>
+			<artifactId>blur-util</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<repositories>
+		<repository>
+			<id>libdir</id>
+			<url>file://${basedir}/../lib</url>
+		</repository>
+	</repositories>
+
+	<build>
+		<pluginManagement>
+			<plugins>
+				<plugin>
+					<groupId>org.apache.maven.plugins</groupId>
+					<artifactId>maven-jar-plugin</artifactId>
+					<executions>
+						<execution>
+							<goals>
+								<goal>test-jar</goal>
+							</goals>
+						</execution>
+					</executions>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-source-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>attach-sources</id>
+						<goals>
+							<goal>jar</goal>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-help-plugin</artifactId>
+				<version>2.2</version>
+				<executions>
+					<execution>
+						<phase>generate-resources</phase>
+						<goals>
+							<goal>effective-pom</goal>
+						</goals>
+						<configuration>
+							<output>${project.build.directory}/effective-pom.xml</output>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-install-plugin</artifactId>
+				<version>2.3.1</version>
+				<executions>
+					<execution>
+						<phase>install</phase>
+						<goals>
+							<goal>install-file</goal>
+						</goals>
+						<configuration>
+							<file>${project.build.directory}/${artifactId}-${project.version}.jar</file>
+							<pomFile>${project.build.directory}/effective-pom.xml</pomFile>
+							<!-- sources></sources -->
+							<!-- javadoc></javadoc -->
+							<groupId>${project.groupId}</groupId>
+							<artifactId>${project.artifactId}</artifactId>
+							<version>${project.version}</version>
+							<packaging>jar</packaging>
+							<!--classifier></classifier -->
+							<generatePom>true</generatePom>
+							<createChecksum>true</createChecksum>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+			    <groupId>org.apache.maven.plugins</groupId>
+			    <artifactId>maven-surefire-plugin</artifactId>
+			    <configuration>
+			      <includes>
+			        <include>**/RunIntegrationTests.java</include>
+			      </includes>
+			    </configuration>
+			  </plugin>
+		</plugins>
+	</build>
+	
+	<profiles>
+		<profile>
+			<id>hadoop1</id>
+			<activation>
+				<property>
+					<name>hadoop1</name>
+				</property>
+			</activation>
+			<properties>
+				<projectHiveVersion>hadoop1-${hadoop.version}-${hive.version}-${project.parent.version}</projectHiveVersion>
+			</properties>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.blur</groupId>
+					<artifactId>blur-hive</artifactId>
+					<version>${projectHiveVersion}</version>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.blur</groupId>
+					<artifactId>blur-hive</artifactId>
+					<version>${projectHiveVersion}</version>
+					<type>test-jar</type>
+					<scope>test</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-test</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+		<profile>
+			<id>hadoop2-mr1</id>
+			<activation>
+				<property>
+					<name>hadoop2-mr1</name>
+				</property>
+			</activation>
+			<properties>
+				<projectHiveVersion>hadoop2-mr1-${hadoop.version}-${hive.version}-${project.parent.version}</projectHiveVersion>
+			</properties>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.blur</groupId>
+					<artifactId>blur-hive</artifactId>
+					<version>${projectHiveVersion}</version>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.blur</groupId>
+					<artifactId>blur-hive</artifactId>
+					<version>${projectHiveVersion}</version>
+					<type>test-jar</type>
+					<scope>test</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-client</artifactId>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-minicluster</artifactId>
+					<scope>test</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+		<profile>
+			<id>hadoop2</id>
+			<activation>
+				<property>
+					<name>hadoop2</name>
+				</property>
+			</activation>
+			<properties>
+				<projectHiveVersion>hadoop2-${hadoop.version}-${hive.version}-${project.parent.version}</projectHiveVersion>
+			</properties>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.blur</groupId>
+					<artifactId>blur-hive</artifactId>
+					<version>${projectHiveVersion}</version>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.blur</groupId>
+					<artifactId>blur-hive</artifactId>
+					<version>${projectHiveVersion}</version>
+					<type>test-jar</type>
+					<scope>test</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-client</artifactId>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-minicluster</artifactId>
+					<scope>test</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d8756092/blur-integration-tests/src/test/java/org/apache/blur/tests/RunIntegrationTests.java
----------------------------------------------------------------------
diff --git a/blur-integration-tests/src/test/java/org/apache/blur/tests/RunIntegrationTests.java b/blur-integration-tests/src/test/java/org/apache/blur/tests/RunIntegrationTests.java
new file mode 100644
index 0000000..c6d7df4
--- /dev/null
+++ b/blur-integration-tests/src/test/java/org/apache/blur/tests/RunIntegrationTests.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.tests;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.blur.analysis.ThriftFieldManagerTestIT;
+import org.apache.blur.command.ReconnectWhileCommandIsRunningIntTestsIT;
+import org.apache.blur.hive.BlurSerDeTestIT;
+import org.apache.blur.manager.clusterstatus.ZookeeperClusterStatusTestIT;
+import org.apache.blur.manager.indexserver.MasterBasedDistributedLayoutFactoryTestIT;
+import org.apache.blur.manager.indexserver.SafeModeTestIT;
+import org.apache.blur.manager.writer.SharedMergeSchedulerThroughputTestIT;
+import org.apache.blur.mapreduce.lib.BlurInputFormatTestIT;
+import org.apache.blur.mapreduce.lib.BlurOutputFormatMiniClusterTestIT;
+import org.apache.blur.mapreduce.lib.BlurOutputFormatTestIT;
+import org.apache.blur.mapreduce.lib.update.DriverTestIT;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClientTestMultipleQuorumsIT;
+import org.apache.blur.thrift.BlurClusterTestNoSecurityIT;
+import org.apache.blur.thrift.BlurClusterTestSecurityIT;
+import org.apache.blur.thrift.FacetTestsIT;
+import org.apache.blur.thrift.SuiteCluster;
+import org.apache.blur.thrift.TermsTestsIT;
+import org.apache.blur.thrift.ThriftServerTestIT;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+import org.junit.runners.Suite.SuiteClasses;
+
+@RunWith(Suite.class)
+@SuiteClasses({ ThriftFieldManagerTestIT.class, ReconnectWhileCommandIsRunningIntTestsIT.class,
+    ZookeeperClusterStatusTestIT.class, MasterBasedDistributedLayoutFactoryTestIT.class, SafeModeTestIT.class,
+    SharedMergeSchedulerThroughputTestIT.class, BlurClientTestMultipleQuorumsIT.class,
+    BlurClusterTestNoSecurityIT.class, BlurClusterTestSecurityIT.class, FacetTestsIT.class, TermsTestsIT.class,
+    ThriftServerTestIT.class, BlurSerDeTestIT.class, DriverTestIT.class, BlurInputFormatTestIT.class,
+    BlurOutputFormatMiniClusterTestIT.class, BlurOutputFormatTestIT.class })
+public class RunIntegrationTests {
+
+  @BeforeClass
+  public static void startup() throws IOException, BlurException, TException {
+    SuiteCluster.setupMiniCluster(RunIntegrationTests.class);
+  }
+
+  @AfterClass
+  public static void shutdown() throws IOException {
+    SuiteCluster.shutdownMiniCluster(RunIntegrationTests.class);
+  }
+  
+  @After
+  public void tearDown() throws BlurException, TException, IOException {
+    Iface client = SuiteCluster.getClient();
+    List<String> tableList = client.tableList();
+    for (String table : tableList) {
+      client.disableTable(table);
+      client.removeTable(table, true);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d8756092/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
deleted file mode 100644
index f7d5d1e..0000000
--- a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.blur.mapreduce.lib;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import org.apache.blur.MiniCluster;
-import org.apache.blur.store.buffer.BufferStore;
-import org.apache.blur.thirdparty.thrift_0_9_0.TException;
-import org.apache.blur.thrift.BlurClient;
-import org.apache.blur.thrift.generated.Blur.Iface;
-import org.apache.blur.thrift.generated.BlurException;
-import org.apache.blur.thrift.generated.Column;
-import org.apache.blur.thrift.generated.ColumnDefinition;
-import org.apache.blur.thrift.generated.Record;
-import org.apache.blur.thrift.generated.RecordMutation;
-import org.apache.blur.thrift.generated.RecordMutationType;
-import org.apache.blur.thrift.generated.RowMutation;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class BlurInputFormatTest {
-
-  private static Configuration conf = new Configuration();
-  private static MiniCluster miniCluster;
-
-  @BeforeClass
-  public static void setupTest() throws Exception {
-    setupJavaHome();
-    File file = new File("./target/tmp/BlurInputFormatTest_tmp");
-    String pathStr = file.getAbsoluteFile().toURI().toString();
-    System.setProperty("test.build.data", pathStr + "/data");
-    System.setProperty("hadoop.log.dir", pathStr + "/hadoop_log");
-    miniCluster = new MiniCluster();
-    miniCluster.startBlurCluster(pathStr + "/blur", 2, 2);
-    miniCluster.startMrMiniCluster();
-    conf = miniCluster.getMRConfiguration();
-
-    BufferStore.initNewBuffer(128, 128 * 128);
-  }
-
-  public static void setupJavaHome() {
-    String str = System.getenv("JAVA_HOME");
-    if (str == null) {
-      String property = System.getProperty("java.home");
-      if (property != null) {
-        throw new RuntimeException("JAVA_HOME not set should probably be [" + property + "].");
-      }
-      throw new RuntimeException("JAVA_HOME not set.");
-    }
-  }
-
-  @AfterClass
-  public static void teardown() throws IOException {
-    if (miniCluster != null) {
-      miniCluster.stopMrMiniCluster();
-    }
-    rm(new File("build"));
-  }
-
-  private static void rm(File file) {
-    if (!file.exists()) {
-      return;
-    }
-    if (file.isDirectory()) {
-      for (File f : file.listFiles()) {
-        rm(f);
-      }
-    }
-    file.delete();
-  }
-
-  @Test
-  public void testBlurInputFormatFastDisabledNoFileCache() throws IOException, BlurException, TException,
-      ClassNotFoundException, InterruptedException {
-    String tableName = "testBlurInputFormatFastDisabledNoFileCache";
-    runTest(tableName, true, null);
-  }
-
-  @Test
-  public void testBlurInputFormatFastEnabledNoFileCache() throws IOException, BlurException, TException,
-      ClassNotFoundException, InterruptedException {
-    String tableName = "testBlurInputFormatFastEnabledNoFileCache";
-    runTest(tableName, false, null);
-  }
-
-  @Test
-  public void testBlurInputFormatFastDisabledFileCache() throws IOException, BlurException, TException,
-      ClassNotFoundException, InterruptedException {
-    String tableName = "testBlurInputFormatFastDisabledFileCache";
-    Path fileCache = new Path(miniCluster.getFileSystemUri() + "/filecache");
-    runTest(tableName, true, fileCache);
-    FileSystem fileSystem = miniCluster.getFileSystem();
-    // @TODO write some assertions.
-    // RemoteIterator<LocatedFileStatus> listFiles =
-    // fileSystem.listFiles(fileCache, true);
-    // while (listFiles.hasNext()) {
-    // LocatedFileStatus locatedFileStatus = listFiles.next();
-    // System.out.println(locatedFileStatus.getPath());
-    // }
-  }
-
-  @Test
-  public void testBlurInputFormatFastEnabledFileCache() throws IOException, BlurException, TException,
-      ClassNotFoundException, InterruptedException {
-    String tableName = "testBlurInputFormatFastEnabledFileCache";
-    Path fileCache = new Path(miniCluster.getFileSystemUri() + "/filecache");
-    runTest(tableName, false, fileCache);
-    FileSystem fileSystem = miniCluster.getFileSystem();
-    // @TODO write some assertions.
-    // RemoteIterator<LocatedFileStatus> listFiles =
-    // fileSystem.listFiles(fileCache, true);
-    // while (listFiles.hasNext()) {
-    // LocatedFileStatus locatedFileStatus = listFiles.next();
-    // System.out.println(locatedFileStatus.getPath());
-    // }
-  }
-
-  private void runTest(String tableName, boolean disableFast, Path fileCache) throws IOException, BlurException,
-      TException, InterruptedException, ClassNotFoundException {
-    FileSystem fileSystem = miniCluster.getFileSystem();
-    Path root = new Path(fileSystem.getUri() + "/");
-
-    creatTable(tableName, new Path(root, "tables"), disableFast);
-    loadTable(tableName, 100, 100);
-
-    Iface client = getClient();
-
-    TableDescriptor tableDescriptor = client.describe(tableName);
-
-    Job job = Job.getInstance(conf, "Read Data");
-    job.setJarByClass(BlurInputFormatTest.class);
-    job.setMapperClass(TestMapper.class);
-    job.setInputFormatClass(BlurInputFormat.class);
-    job.setOutputFormatClass(SequenceFileOutputFormat.class);
-    job.setNumReduceTasks(0);
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(TableBlurRecord.class);
-
-    Path output = new Path(new Path(root, "output"), tableName);
-
-    String snapshot = UUID.randomUUID().toString();
-    client.createSnapshot(tableName, snapshot);
-
-    if (fileCache != null) {
-      BlurInputFormat.setLocalCachePath(job, fileCache);
-    }
-
-    BlurInputFormat.addTable(job, tableDescriptor, snapshot);
-    FileOutputFormat.setOutputPath(job, output);
-
-    try {
-      assertTrue(job.waitForCompletion(true));
-    } finally {
-      client.removeSnapshot(tableName, snapshot);
-    }
-
-    final Map<Text, TableBlurRecord> results = new TreeMap<Text, TableBlurRecord>();
-    walkOutput(output, conf, new ResultReader() {
-      @Override
-      public void read(Text rowId, TableBlurRecord tableBlurRecord) {
-        results.put(new Text(rowId), new TableBlurRecord(tableBlurRecord));
-      }
-    });
-    int rowId = 100;
-    for (Entry<Text, TableBlurRecord> e : results.entrySet()) {
-      Text r = e.getKey();
-      assertEquals(new Text("row-" + rowId), r);
-      BlurRecord blurRecord = new BlurRecord();
-      blurRecord.setRowId("row-" + rowId);
-      blurRecord.setRecordId("record-" + rowId);
-      blurRecord.setFamily("fam0");
-      blurRecord.addColumn("col0", "value-" + rowId);
-      TableBlurRecord tableBlurRecord = new TableBlurRecord(new Text(tableName), blurRecord);
-      assertEquals(tableBlurRecord, e.getValue());
-
-      rowId++;
-    }
-    assertEquals(200, rowId);
-  }
-
-  public interface ResultReader {
-
-    void read(Text rowId, TableBlurRecord tableBlurRecord);
-
-  }
-
-  private void walkOutput(Path output, Configuration conf, ResultReader resultReader) throws IOException {
-    FileSystem fileSystem = output.getFileSystem(conf);
-    FileStatus fileStatus = fileSystem.getFileStatus(output);
-    if (fileStatus.isDir()) {
-      FileStatus[] listStatus = fileSystem.listStatus(output, new PathFilter() {
-        @Override
-        public boolean accept(Path path) {
-          return !path.getName().startsWith("_");
-        }
-      });
-      for (FileStatus fs : listStatus) {
-        walkOutput(fs.getPath(), conf, resultReader);
-      }
-    } else {
-      Reader reader = new SequenceFile.Reader(fileSystem, output, conf);
-      Text rowId = new Text();
-      TableBlurRecord tableBlurRecord = new TableBlurRecord();
-      while (reader.next(rowId, tableBlurRecord)) {
-        resultReader.read(rowId, tableBlurRecord);
-      }
-      reader.close();
-    }
-  }
-
-  private Iface getClient() {
-    return BlurClient.getClientFromZooKeeperConnectionStr(miniCluster.getZkConnectionString());
-  }
-
-  private void loadTable(String tableName, int startId, int numb) throws BlurException, TException {
-    Iface client = getClient();
-    List<RowMutation> batch = new ArrayList<RowMutation>();
-    for (int i = 0; i < numb; i++) {
-      int id = startId + i;
-      RowMutation rowMutation = new RowMutation();
-      rowMutation.setTable(tableName);
-      rowMutation.setRowId("row-" + Integer.toString(id));
-      Record record = new Record();
-      record.setFamily("fam0");
-      record.setRecordId("record-" + id);
-      record.addToColumns(new Column("col0", "value-" + id));
-      rowMutation.addToRecordMutations(new RecordMutation(RecordMutationType.REPLACE_ENTIRE_RECORD, record));
-      batch.add(rowMutation);
-    }
-    client.mutateBatch(batch);
-  }
-
-  private void creatTable(String tableName, Path tables, boolean fastDisable) throws BlurException, TException {
-    Path tablePath = new Path(tables, tableName);
-    Iface client = getClient();
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setTableUri(tablePath.toString());
-    tableDescriptor.setName(tableName);
-    tableDescriptor.setShardCount(2);
-    tableDescriptor.putToTableProperties(BlurConstants.BLUR_TABLE_DISABLE_FAST_DIR, Boolean.toString(fastDisable));
-    client.createTable(tableDescriptor);
-
-    ColumnDefinition colDef = new ColumnDefinition();
-    colDef.setFamily("fam0");
-    colDef.setColumnName("col0");
-    colDef.setFieldType("string");
-    client.addColumnDefinition(tableName, colDef);
-  }
-
-  public static class TestMapper extends Mapper<Text, TableBlurRecord, Text, TableBlurRecord> {
-    @Override
-    protected void map(Text key, TableBlurRecord value, Context context) throws IOException, InterruptedException {
-      context.write(key, value);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/d8756092/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTestIT.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTestIT.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTestIT.java
new file mode 100644
index 0000000..b052731
--- /dev/null
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTestIT.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.blur.MiniCluster;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.SuiteCluster;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.ColumnDefinition;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.RecordMutation;
+import org.apache.blur.thrift.generated.RecordMutationType;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class BlurInputFormatTestIT {
+
+  private static MiniCluster miniCluster;
+
+  @BeforeClass
+  public static void startup() throws IOException, BlurException, TException {
+    SuiteCluster.setupMiniCluster(BlurInputFormatTestIT.class);
+    miniCluster = SuiteCluster.getMiniCluster();
+  }
+
+  @AfterClass
+  public static void shutdown() throws IOException {
+    SuiteCluster.shutdownMiniCluster(BlurInputFormatTestIT.class);
+  }
+
+  public static void setupJavaHome() {
+    String str = System.getenv("JAVA_HOME");
+    if (str == null) {
+      String property = System.getProperty("java.home");
+      if (property != null) {
+        throw new RuntimeException("JAVA_HOME not set should probably be [" + property + "].");
+      }
+      throw new RuntimeException("JAVA_HOME not set.");
+    }
+  }
+
+  @AfterClass
+  public static void teardown() throws IOException {
+    if (miniCluster != null) {
+      miniCluster.shutdownMrMiniCluster();
+    }
+    rm(new File("build"));
+  }
+
+  private static void rm(File file) {
+    if (!file.exists()) {
+      return;
+    }
+    if (file.isDirectory()) {
+      for (File f : file.listFiles()) {
+        rm(f);
+      }
+    }
+    file.delete();
+  }
+
+  @Test
+  public void testBlurInputFormatFastDisabledNoFileCache() throws IOException, BlurException, TException,
+      ClassNotFoundException, InterruptedException {
+    String tableName = "testBlurInputFormatFastDisabledNoFileCache";
+    runTest(tableName, true, null);
+  }
+
+  @Test
+  public void testBlurInputFormatFastEnabledNoFileCache() throws IOException, BlurException, TException,
+      ClassNotFoundException, InterruptedException {
+    String tableName = "testBlurInputFormatFastEnabledNoFileCache";
+    runTest(tableName, false, null);
+  }
+
+  @Test
+  public void testBlurInputFormatFastDisabledFileCache() throws IOException, BlurException, TException,
+      ClassNotFoundException, InterruptedException {
+    String tableName = "testBlurInputFormatFastDisabledFileCache";
+    Path fileCache = new Path(miniCluster.getFileSystemUri() + "/filecache");
+    runTest(tableName, true, fileCache);
+    FileSystem fileSystem = miniCluster.getFileSystem();
+    // @TODO write some assertions.
+    // RemoteIterator<LocatedFileStatus> listFiles =
+    // fileSystem.listFiles(fileCache, true);
+    // while (listFiles.hasNext()) {
+    // LocatedFileStatus locatedFileStatus = listFiles.next();
+    // System.out.println(locatedFileStatus.getPath());
+    // }
+  }
+
+  @Test
+  public void testBlurInputFormatFastEnabledFileCache() throws IOException, BlurException, TException,
+      ClassNotFoundException, InterruptedException {
+    String tableName = "testBlurInputFormatFastEnabledFileCache";
+    Path fileCache = new Path(miniCluster.getFileSystemUri() + "/filecache");
+    runTest(tableName, false, fileCache);
+    FileSystem fileSystem = miniCluster.getFileSystem();
+    // @TODO write some assertions.
+    // RemoteIterator<LocatedFileStatus> listFiles =
+    // fileSystem.listFiles(fileCache, true);
+    // while (listFiles.hasNext()) {
+    // LocatedFileStatus locatedFileStatus = listFiles.next();
+    // System.out.println(locatedFileStatus.getPath());
+    // }
+  }
+
+  private void runTest(String tableName, boolean disableFast, Path fileCache) throws IOException, BlurException,
+      TException, InterruptedException, ClassNotFoundException {
+    FileSystem fileSystem = miniCluster.getFileSystem();
+    Path root = new Path(fileSystem.getUri() + "/");
+
+    creatTable(tableName, new Path(root, "tables"), disableFast);
+    loadTable(tableName, 100, 100);
+
+    Iface client = getClient();
+
+    TableDescriptor tableDescriptor = client.describe(tableName);
+
+    Job job = Job.getInstance(getMrConf(), "Read Data");
+    job.setJarByClass(BlurInputFormatTestIT.class);
+    job.setMapperClass(TestMapper.class);
+    job.setInputFormatClass(BlurInputFormat.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setNumReduceTasks(0);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(TableBlurRecord.class);
+
+    Path output = new Path(new Path(root, "output"), tableName);
+
+    String snapshot = UUID.randomUUID().toString();
+    client.createSnapshot(tableName, snapshot);
+
+    if (fileCache != null) {
+      BlurInputFormat.setLocalCachePath(job, fileCache);
+    }
+
+    BlurInputFormat.addTable(job, tableDescriptor, snapshot);
+    FileOutputFormat.setOutputPath(job, output);
+
+    try {
+      assertTrue(job.waitForCompletion(true));
+    } finally {
+      client.removeSnapshot(tableName, snapshot);
+    }
+
+    final Map<Text, TableBlurRecord> results = new TreeMap<Text, TableBlurRecord>();
+    walkOutput(output, getMrConf(), new ResultReader() {
+      @Override
+      public void read(Text rowId, TableBlurRecord tableBlurRecord) {
+        results.put(new Text(rowId), new TableBlurRecord(tableBlurRecord));
+      }
+    });
+    int rowId = 100;
+    for (Entry<Text, TableBlurRecord> e : results.entrySet()) {
+      Text r = e.getKey();
+      assertEquals(new Text("row-" + rowId), r);
+      BlurRecord blurRecord = new BlurRecord();
+      blurRecord.setRowId("row-" + rowId);
+      blurRecord.setRecordId("record-" + rowId);
+      blurRecord.setFamily("fam0");
+      blurRecord.addColumn("col0", "value-" + rowId);
+      TableBlurRecord tableBlurRecord = new TableBlurRecord(new Text(tableName), blurRecord);
+      assertEquals(tableBlurRecord, e.getValue());
+
+      rowId++;
+    }
+    assertEquals(200, rowId);
+  }
+
+  private Configuration getMrConf() {
+    return miniCluster.getMRConfiguration();
+  }
+
+  public interface ResultReader {
+
+    void read(Text rowId, TableBlurRecord tableBlurRecord);
+
+  }
+
+  private void walkOutput(Path output, Configuration conf, ResultReader resultReader) throws IOException {
+    FileSystem fileSystem = output.getFileSystem(conf);
+    FileStatus fileStatus = fileSystem.getFileStatus(output);
+    if (fileStatus.isDir()) {
+      FileStatus[] listStatus = fileSystem.listStatus(output, new PathFilter() {
+        @Override
+        public boolean accept(Path path) {
+          return !path.getName().startsWith("_");
+        }
+      });
+      for (FileStatus fs : listStatus) {
+        walkOutput(fs.getPath(), conf, resultReader);
+      }
+    } else {
+      Reader reader = new SequenceFile.Reader(fileSystem, output, conf);
+      Text rowId = new Text();
+      TableBlurRecord tableBlurRecord = new TableBlurRecord();
+      while (reader.next(rowId, tableBlurRecord)) {
+        resultReader.read(rowId, tableBlurRecord);
+      }
+      reader.close();
+    }
+  }
+
+  private Iface getClient() {
+    return BlurClient.getClientFromZooKeeperConnectionStr(miniCluster.getZkConnectionString());
+  }
+
+  private void loadTable(String tableName, int startId, int numb) throws BlurException, TException {
+    Iface client = getClient();
+    List<RowMutation> batch = new ArrayList<RowMutation>();
+    for (int i = 0; i < numb; i++) {
+      int id = startId + i;
+      RowMutation rowMutation = new RowMutation();
+      rowMutation.setTable(tableName);
+      rowMutation.setRowId("row-" + Integer.toString(id));
+      Record record = new Record();
+      record.setFamily("fam0");
+      record.setRecordId("record-" + id);
+      record.addToColumns(new Column("col0", "value-" + id));
+      rowMutation.addToRecordMutations(new RecordMutation(RecordMutationType.REPLACE_ENTIRE_RECORD, record));
+      batch.add(rowMutation);
+    }
+    client.mutateBatch(batch);
+  }
+
+  private void creatTable(String tableName, Path tables, boolean fastDisable) throws BlurException, TException {
+    Path tablePath = new Path(tables, tableName);
+    Iface client = getClient();
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setTableUri(tablePath.toString());
+    tableDescriptor.setName(tableName);
+    tableDescriptor.setShardCount(2);
+    tableDescriptor.putToTableProperties(BlurConstants.BLUR_TABLE_DISABLE_FAST_DIR, Boolean.toString(fastDisable));
+    client.createTable(tableDescriptor);
+
+    ColumnDefinition colDef = new ColumnDefinition();
+    colDef.setFamily("fam0");
+    colDef.setColumnName("col0");
+    colDef.setFieldType("string");
+    client.addColumnDefinition(tableName, colDef);
+  }
+
+  public static class TestMapper extends Mapper<Text, TableBlurRecord, Text, TableBlurRecord> {
+    @Override
+    protected void map(Text key, TableBlurRecord value, Context context) throws IOException, InterruptedException {
+      context.write(key, value);
+    }
+  }
+
+}


Mime
View raw message