incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [4/5] git commit: Fixing BLUR-397.
Date Mon, 15 Dec 2014 15:38:45 GMT
Fixing BLUR-397.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/22200a3a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/22200a3a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/22200a3a

Branch: refs/heads/master
Commit: 22200a3a9008f614d7216c8ffaef55fe3e43c7c5
Parents: 25ad5d4
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Dec 15 10:38:17 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Dec 15 10:38:17 2014 -0500

----------------------------------------------------------------------
 .../apache/blur/server/FilteredBlurServer.java  |  5 ++
 .../org/apache/blur/server/TableContext.java    | 82 ++++++++++++++++++++
 .../java/org/apache/blur/thrift/TableAdmin.java | 10 +++
 .../blur/mapreduce/lib/BlurOutputFormat.java    |  7 +-
 .../blur/mapreduce/lib/CheckOutputSpecs.java    | 19 ++---
 .../lib/BlurOutputFormatMiniClusterTest.java    |  5 ++
 .../mapreduce/lib/BlurOutputFormatTest.java     | 33 ++++----
 .../blur/mapreduce/lib/BlurOutputFormat.java    |  7 +-
 .../blur/mapreduce/lib/CheckOutputSpecs.java    | 15 ++--
 .../lib/BlurOutputFormatMiniClusterTest.java    |  5 ++
 .../mapreduce/lib/BlurOutputFormatTest.java     | 69 ++++++++++------
 docs/Blur.html                                  | 11 +++
 12 files changed, 203 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/22200a3a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
index 9318f95..d561560 100644
--- a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
+++ b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
@@ -286,4 +286,9 @@ public class FilteredBlurServer implements Iface {
     _iface.commandCancel(commandExecutionId);
   }
 
+  @Override
+  public void loadData(String table, String location) throws BlurException, TException {
+    _iface.loadData(table, location);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/22200a3a/blur-core/src/main/java/org/apache/blur/server/TableContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/TableContext.java b/blur-core/src/main/java/org/apache/blur/server/TableContext.java
index b56bf40..5dbd012 100644
--- a/blur-core/src/main/java/org/apache/blur/server/TableContext.java
+++ b/blur-core/src/main/java/org/apache/blur/server/TableContext.java
@@ -51,12 +51,18 @@ import org.apache.blur.manager.writer.BlurIndexCloser;
 import org.apache.blur.manager.writer.BlurIndexSimpleWriter;
 //import org.apache.blur.manager.writer.BlurNRTIndex;
 import org.apache.blur.manager.writer.SharedMergeScheduler;
+import org.apache.blur.store.hdfs.HdfsDirectory;
 import org.apache.blur.thrift.generated.ScoreType;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexDeletionPolicy;
 import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
 import org.apache.lucene.index.Term;
@@ -371,4 +377,80 @@ public class TableContext implements Cloneable {
     }
   }
 
+  public void loadData(String location) throws IOException {
+    Path path = new Path(location);
+    FileSystem fileSystem = path.getFileSystem(_configuration);
+
+    validateLoad(path, fileSystem);
+
+    FileStatus[] listStatus = fileSystem.listStatus(path);
+    for (FileStatus fileStatus : listStatus) {
+      loadShard(fileStatus.getPath(), fileSystem);
+    }
+
+    // printFS(path, fileSystem);
+
+  }
+
+  private void validateLoad(Path path, FileSystem fileSystem) throws IOException {
+    TableDescriptor descriptor = getDescriptor();
+    int shardCount = descriptor.getShardCount();
+    FileStatus[] listStatus = fileSystem.listStatus(path);
+    int count = 0;
+    for (FileStatus fileStatus : listStatus) {
+      Path shardPath = fileStatus.getPath();
+      String shardId = shardPath.getName();
+      int shardIndex = BlurUtil.getShardIndex(shardId);
+      if (shardIndex >= shardCount) {
+        throw new IOException("Too many shards [" + shardIndex + "].");
+      }
+      count++;
+      validateIndexesExist(shardPath, fileSystem);
+    }
+    if (shardCount != count) {
+      throw new IOException("Not enough shards [" + count + "] should be [" + shardCount + "].");
+    }
+  }
+
+  private void validateIndexesExist(Path shardPath, FileSystem fileSystem) throws IOException {
+    FileStatus[] listStatus = fileSystem.listStatus(shardPath, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith(".commit");
+      }
+    });
+    for (FileStatus fileStatus : listStatus) {
+      Path path = fileStatus.getPath();
+      HdfsDirectory directory = new HdfsDirectory(_configuration, path);
+      try {
+        if (!DirectoryReader.indexExists(directory)) {
+          throw new IOException("Path [" + path + "] is not a valid index.");
+        }
+      } finally {
+        directory.close();
+      }
+    }
+  }
+
+  private void loadShard(Path newLoadShardPath, FileSystem fileSystem) throws IOException {
+    Path tablePath = getTablePath();
+    Path shardPath = new Path(tablePath, newLoadShardPath.getName());
+    FileStatus[] listStatus = fileSystem.listStatus(newLoadShardPath, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().endsWith(".commit");
+      }
+    });
+
+    for (FileStatus fileStatus : listStatus) {
+      Path src = fileStatus.getPath();
+      Path dst = new Path(shardPath, src.getName());
+      if (fileSystem.rename(src, dst)) {
+        LOG.info("Successfully moved [{0}] to [{1}].", src, dst);
+      } else {
+        LOG.info("Could not move [{0}] to [{1}].", src, dst);
+        throw new IOException("Could not move [" + src + "] to [" + dst + "].");
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/22200a3a/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java b/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
index d221774..7583160 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/TableAdmin.java
@@ -696,4 +696,14 @@ public abstract class TableAdmin implements Iface {
     }
   }
 
+  @Override
+  public void loadData(String table, String location) throws BlurException, TException {
+    TableContext tableContext = getTableContext(table);
+    try {
+      tableContext.loadData(location);
+    } catch (IOException e) {
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/22200a3a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
index bf68e66..da1c3cf 100644
--- a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
@@ -204,7 +204,6 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
     }
     transport.close();
     configuration.set(BLUR_TABLE_DESCRIPTOR, new String(outputStream.toByteArray()));
-    setOutputPath(configuration, new Path(tableDescriptor.getTableUri()));
   }
 
   /**
@@ -279,7 +278,11 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
   }
 
   public static Path getOutputPath(Configuration configuration) {
-    return new Path(configuration.get(BLUR_OUTPUT_PATH));
+    String pathString = configuration.get(BLUR_OUTPUT_PATH);
+    if (pathString == null) {
+      return null;
+    }
+    return new Path(pathString);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/22200a3a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CheckOutputSpecs.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CheckOutputSpecs.java b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CheckOutputSpecs.java
index 6bbaad4..175274d 100644
--- a/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CheckOutputSpecs.java
+++ b/blur-mapred-hadoop1/src/main/java/org/apache/blur/mapreduce/lib/CheckOutputSpecs.java
@@ -21,26 +21,21 @@ import java.io.IOException;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.utils.BlurUtil;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 public class CheckOutputSpecs {
-  
+
   public static void checkOutputSpecs(Configuration config, int reducers) throws IOException, InterruptedException {
     TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(config);
     if (tableDescriptor == null) {
       throw new IOException("setTableDescriptor needs to be called first.");
     }
-    int shardCount = tableDescriptor.getShardCount();
-    FileSystem fileSystem = BlurOutputFormat.getOutputPath(config).getFileSystem(config);
-    Path tablePath = new Path(tableDescriptor.getTableUri());
-    if (fileSystem.exists(tablePath)) {
-      BlurUtil.validateShardCount(shardCount, fileSystem, tablePath);
-    } else {
-      throw new IOException("Table path [ " + tablePath + " ] doesn't exist for table [ " + tableDescriptor.getName()
-          + " ].");
+    Path outputPath = BlurOutputFormat.getOutputPath(config);
+    if (outputPath == null) {
+      throw new IOException("Output path is not set.");
     }
-    BlurUtil.validateWritableDirectory(fileSystem, tablePath);
+    BlurUtil.validateWritableDirectory(outputPath.getFileSystem(config), outputPath);
+    int shardCount = tableDescriptor.getShardCount();
     int reducerMultiplier = BlurOutputFormat.getReducerMultiplier(config);
     int validNumberOfReducers = reducerMultiplier * shardCount;
     if (reducers > 0 && reducers != validNumberOfReducers) {
@@ -48,5 +43,5 @@ public class CheckOutputSpecs {
           + " Number of Reducers should be [ " + validNumberOfReducers + " ].");
     }
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/22200a3a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java b/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
index c14e86e..f2a53d7 100644
--- a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
+++ b/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
@@ -158,6 +158,9 @@ public class BlurOutputFormatMiniClusterTest {
     client.createTable(tableDescriptor);
 
     BlurOutputFormat.setupJob(job, tableDescriptor);
+    Path output = new Path(TEST_ROOT_DIR + "/out");
+    BlurOutputFormat.setOutputPath(job, output);
+    
     Path tablePath = new Path(tableUri);
     Path shardPath = new Path(tablePath, BlurUtil.getShardName(0));
     FileStatus[] listStatus = fileSystem.listStatus(shardPath);
@@ -170,6 +173,8 @@ public class BlurOutputFormatMiniClusterTest {
     assertTrue(job.waitForCompletion(true));
     Counters ctrs = job.getCounters();
     System.out.println("Counters: " + ctrs);
+    
+    client.loadData(tableName, output.toString());
 
     while (true) {
       TableStats tableStats = client.tableStats(tableName);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/22200a3a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java b/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
index d097e6c..a2937ec 100644
--- a/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
+++ b/blur-mapred-hadoop1/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
@@ -125,23 +125,24 @@ public class BlurOutputFormatTest {
     job.setInputFormatClass(TextInputFormat.class);
 
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
     CsvBlurMapper.addColumns(job, "cf1", "col");
 
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setShardCount(1);
-    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
     tableDescriptor.setName("test");
 
     createShardDirectories(outDir, 1);
 
     BlurOutputFormat.setupJob(job, tableDescriptor);
+    Path output = new Path(TEST_ROOT_DIR + "/out");
+    BlurOutputFormat.setOutputPath(job, output);
 
     assertTrue(job.waitForCompletion(true));
     Counters ctrs = job.getCounters();
     System.out.println("Counters: " + ctrs);
 
-    Path path = new Path(tableUri, BlurUtil.getShardName(0));
+    Path path = new Path(output, BlurUtil.getShardName(0));
     Collection<Path> commitedTasks = getCommitedTasks(path);
     assertEquals(1, commitedTasks.size());
     DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, commitedTasks.iterator().next()));
@@ -174,17 +175,18 @@ public class BlurOutputFormatTest {
     job.setInputFormatClass(TextInputFormat.class);
 
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
     CsvBlurMapper.addColumns(job, "cf1", "col");
 
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setShardCount(1);
-    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
     tableDescriptor.setName("test");
 
     createShardDirectories(outDir, 1);
 
     BlurOutputFormat.setupJob(job, tableDescriptor);
+    Path output = new Path(TEST_ROOT_DIR + "/out");
+    BlurOutputFormat.setOutputPath(job, output);
     BlurOutputFormat.setIndexLocally(job, true);
     BlurOutputFormat.setOptimizeInFlight(job, false);
 
@@ -192,7 +194,7 @@ public class BlurOutputFormatTest {
     Counters ctrs = job.getCounters();
     System.out.println("Counters: " + ctrs);
 
-    Path path = new Path(tableUri, BlurUtil.getShardName(0));
+    Path path = new Path(output, BlurUtil.getShardName(0));
     Collection<Path> commitedTasks = getCommitedTasks(path);
     assertEquals(1, commitedTasks.size());
 
@@ -243,17 +245,18 @@ public class BlurOutputFormatTest {
     job.setInputFormatClass(TextInputFormat.class);
 
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
     CsvBlurMapper.addColumns(job, "cf1", "col");
 
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setShardCount(2);
-    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
     tableDescriptor.setName("test");
 
     createShardDirectories(outDir, 2);
 
     BlurOutputFormat.setupJob(job, tableDescriptor);
+    Path output = new Path(TEST_ROOT_DIR + "/out");
+    BlurOutputFormat.setOutputPath(job, output);
     BlurOutputFormat.setIndexLocally(job, false);
     BlurOutputFormat.setDocumentBufferStrategy(job, DocumentBufferStrategyHeapSize.class);
     BlurOutputFormat.setMaxDocumentBufferHeapSize(job, 128 * 1024);
@@ -264,7 +267,7 @@ public class BlurOutputFormatTest {
 
     long total = 0;
     for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
-      Path path = new Path(tableUri, BlurUtil.getShardName(i));
+      Path path = new Path(output, BlurUtil.getShardName(i));
       Collection<Path> commitedTasks = getCommitedTasks(path);
       assertEquals(1, commitedTasks.size());
 
@@ -289,17 +292,18 @@ public class BlurOutputFormatTest {
     job.setInputFormatClass(TextInputFormat.class);
 
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
     CsvBlurMapper.addColumns(job, "cf1", "col");
 
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setShardCount(7);
-    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
     tableDescriptor.setName("test");
 
     createShardDirectories(outDir, 7);
 
     BlurOutputFormat.setupJob(job, tableDescriptor);
+    Path output = new Path(TEST_ROOT_DIR + "/out");
+    BlurOutputFormat.setOutputPath(job, output);
     int multiple = 2;
     BlurOutputFormat.setReducerMultiplier(job, multiple);
 
@@ -309,7 +313,7 @@ public class BlurOutputFormatTest {
 
     long total = 0;
     for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
-      Path path = new Path(tableUri, BlurUtil.getShardName(i));
+      Path path = new Path(output, BlurUtil.getShardName(i));
       Collection<Path> commitedTasks = getCommitedTasks(path);
       assertTrue(multiple >= commitedTasks.size());
       for (Path p : commitedTasks) {
@@ -335,17 +339,18 @@ public class BlurOutputFormatTest {
     job.setInputFormatClass(TextInputFormat.class);
 
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
     CsvBlurMapper.addColumns(job, "cf1", "col");
 
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setShardCount(1);
-    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
     tableDescriptor.setName("test");
 
     createShardDirectories(outDir, 1);
 
     BlurOutputFormat.setupJob(job, tableDescriptor);
+    Path output = new Path(TEST_ROOT_DIR + "/out");
+    BlurOutputFormat.setOutputPath(job, output);
     BlurOutputFormat.setReducerMultiplier(job, 2);
     job.setNumReduceTasks(4);
     job.submit();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/22200a3a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
index 00c163a..52900cf 100644
--- a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
+++ b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/BlurOutputFormat.java
@@ -205,7 +205,6 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
     }
     transport.close();
     configuration.set(BLUR_TABLE_DESCRIPTOR, new String(outputStream.toByteArray()));
-    setOutputPath(configuration, new Path(tableDescriptor.getTableUri()));
   }
 
   /**
@@ -280,7 +279,11 @@ public class BlurOutputFormat extends OutputFormat<Text, BlurMutate> {
   }
 
   public static Path getOutputPath(Configuration configuration) {
-    return new Path(configuration.get(BLUR_OUTPUT_PATH));
+    String pathString = configuration.get(BLUR_OUTPUT_PATH);
+    if (pathString == null) {
+      return null;
+    }
+    return new Path(pathString);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/22200a3a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/CheckOutputSpecs.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/CheckOutputSpecs.java b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/CheckOutputSpecs.java
index 6bbaad4..0dc0459 100644
--- a/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/CheckOutputSpecs.java
+++ b/blur-mapred-hadoop2/src/main/java/org/apache/blur/mapreduce/lib/CheckOutputSpecs.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.utils.BlurUtil;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 public class CheckOutputSpecs {
@@ -31,16 +30,12 @@ public class CheckOutputSpecs {
     if (tableDescriptor == null) {
       throw new IOException("setTableDescriptor needs to be called first.");
     }
-    int shardCount = tableDescriptor.getShardCount();
-    FileSystem fileSystem = BlurOutputFormat.getOutputPath(config).getFileSystem(config);
-    Path tablePath = new Path(tableDescriptor.getTableUri());
-    if (fileSystem.exists(tablePath)) {
-      BlurUtil.validateShardCount(shardCount, fileSystem, tablePath);
-    } else {
-      throw new IOException("Table path [ " + tablePath + " ] doesn't exist for table [ " + tableDescriptor.getName()
-          + " ].");
+    Path outputPath = BlurOutputFormat.getOutputPath(config);
+    if (outputPath == null) {
+      throw new IOException("Output path is not set.");
     }
-    BlurUtil.validateWritableDirectory(fileSystem, tablePath);
+    BlurUtil.validateWritableDirectory(outputPath.getFileSystem(config), outputPath);
+    int shardCount = tableDescriptor.getShardCount();
     int reducerMultiplier = BlurOutputFormat.getReducerMultiplier(config);
     int validNumberOfReducers = reducerMultiplier * shardCount;
     if (reducers > 0 && reducers != validNumberOfReducers) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/22200a3a/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java b/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
index a7ee0f0..14daa27 100644
--- a/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
+++ b/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
@@ -165,6 +165,9 @@ public class BlurOutputFormatMiniClusterTest {
     client.createTable(tableDescriptor);
 
     BlurOutputFormat.setupJob(job, tableDescriptor);
+    Path output = new Path(TEST_ROOT_DIR + "/out");
+    BlurOutputFormat.setOutputPath(job, output);
+    
     Path tablePath = new Path(tableUri);
     Path shardPath = new Path(tablePath, BlurUtil.getShardName(0));
     FileStatus[] listStatus = fileSystem.listStatus(shardPath);
@@ -177,6 +180,8 @@ public class BlurOutputFormatMiniClusterTest {
     assertTrue(job.waitForCompletion(true));
     Counters ctrs = job.getCounters();
     System.out.println("Counters: " + ctrs);
+    
+    client.loadData(tableName, output.toString());
 
     while (true) {
       TableStats tableStats = client.tableStats(tableName);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/22200a3a/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java b/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
index 8d25470..fbcee34 100644
--- a/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
+++ b/blur-mapred-hadoop2/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
@@ -65,14 +65,16 @@ public class BlurOutputFormatTest {
   @BeforeClass
   public static void setupTest() throws Exception {
     setupJavaHome();
-    System.setProperty("test.build.data", "./target/BlurOutputFormatTest/data");
-    TEST_ROOT_DIR = new Path(System.getProperty("test.build.data", "target/tmp/BlurOutputFormatTest_tmp"));
-    System.setProperty("hadoop.log.dir", "./target/BlurOutputFormatTest/hadoop_log");
+    File file = new File("./target/tmp/BlurOutputFormatTest_tmp");
+    String pathStr = file.getAbsoluteFile().toURI().toString();
+    System.setProperty("test.build.data", pathStr + "/data");
+    System.setProperty("hadoop.log.dir", pathStr + "/hadoop_log");
     try {
       localFs = FileSystem.getLocal(conf);
     } catch (IOException io) {
       throw new RuntimeException("problem getting local fs", io);
     }
+    TEST_ROOT_DIR = new Path(System.getProperty("test.build.data", pathStr));
 
     FileSystem.setDefaultUri(conf, new URI("file:///"));
     mr = (MiniMRYarnClusterAdapter) MiniMRClientClusterFactory.create(BlurOutputFormatTest.class, 1, conf);
@@ -85,6 +87,10 @@ public class BlurOutputFormatTest {
   public static void setupJavaHome() {
     String str = System.getenv("JAVA_HOME");
     if (str == null) {
+      String property = System.getProperty("java.home");
+      if (property != null) {
+        throw new RuntimeException("JAVA_HOME not set should probably be [" + property + "].");
+      }
       throw new RuntimeException("JAVA_HOME not set.");
     }
   }
@@ -127,24 +133,25 @@ public class BlurOutputFormatTest {
     job.setInputFormatClass(TextInputFormat.class);
 
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    String tableUri = new Path(TEST_ROOT_DIR + "/out").makeQualified(localFs.getUri(), localFs.getWorkingDirectory())
-        .toString();
     CsvBlurMapper.addColumns(job, "cf1", "col");
 
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setShardCount(1);
-    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
     tableDescriptor.setName("test");
 
     createShardDirectories(outDir, 1);
 
     BlurOutputFormat.setupJob(job, tableDescriptor);
+    Path output = new Path(TEST_ROOT_DIR + "/out");
+    BlurOutputFormat.setOutputPath(job, output);
 
     assertTrue(job.waitForCompletion(true));
     Counters ctrs = job.getCounters();
     System.out.println("Counters: " + ctrs);
 
-    Path path = new Path(tableUri, BlurUtil.getShardName(0));
+    Path path = new Path(output, BlurUtil.getShardName(0));
+    dump(path, conf);
     Collection<Path> commitedTasks = getCommitedTasks(path);
     assertEquals(1, commitedTasks.size());
     DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, commitedTasks.iterator().next()));
@@ -152,6 +159,17 @@ public class BlurOutputFormatTest {
     reader.close();
   }
 
+  private void dump(Path path, Configuration conf) throws IOException {
+    FileSystem fileSystem = path.getFileSystem(conf);
+    System.out.println(path);
+    if (!fileSystem.isFile(path)) {
+      FileStatus[] listStatus = fileSystem.listStatus(path);
+      for (FileStatus fileStatus : listStatus) {
+        dump(fileStatus.getPath(), conf);
+      }
+    }
+  }
+
   private Collection<Path> getCommitedTasks(Path path) throws IOException {
     Collection<Path> result = new TreeSet<Path>();
     FileSystem fileSystem = path.getFileSystem(conf);
@@ -179,18 +197,18 @@ public class BlurOutputFormatTest {
     job.setInputFormatClass(TrackingTextInputFormat.class);
 
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    String tableUri = new Path(TEST_ROOT_DIR + "/out").makeQualified(localFs.getUri(), localFs.getWorkingDirectory())
-        .toString();
     CsvBlurMapper.addColumns(job, "cf1", "col");
 
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setShardCount(1);
-    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
     tableDescriptor.setName("test");
 
     createShardDirectories(outDir, 1);
 
     BlurOutputFormat.setupJob(job, tableDescriptor);
+    Path output = new Path(TEST_ROOT_DIR + "/out");
+    BlurOutputFormat.setOutputPath(job, output);
     BlurOutputFormat.setIndexLocally(job, true);
     BlurOutputFormat.setOptimizeInFlight(job, false);
 
@@ -198,7 +216,7 @@ public class BlurOutputFormatTest {
     Counters ctrs = job.getCounters();
     System.out.println("Counters: " + ctrs);
 
-    Path path = new Path(tableUri, BlurUtil.getShardName(0));
+    Path path = new Path(output, BlurUtil.getShardName(0));
     Collection<Path> commitedTasks = getCommitedTasks(path);
     assertEquals(1, commitedTasks.size());
 
@@ -222,18 +240,18 @@ public class BlurOutputFormatTest {
     job.setInputFormatClass(TrackingTextInputFormat.class);
 
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    String tableUri = new Path(TEST_ROOT_DIR + "/out").makeQualified(localFs.getUri(), localFs.getWorkingDirectory())
-        .toString();
     CsvBlurMapper.addColumns(job, "cf1", "col");
 
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setShardCount(2);
-    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
     tableDescriptor.setName("test");
 
     createShardDirectories(outDir, 2);
 
     BlurOutputFormat.setupJob(job, tableDescriptor);
+    Path output = new Path(TEST_ROOT_DIR + "/out");
+    BlurOutputFormat.setOutputPath(job, output);
     BlurOutputFormat.setIndexLocally(job, false);
     BlurOutputFormat.setDocumentBufferStrategy(job, DocumentBufferStrategyHeapSize.class);
     BlurOutputFormat.setMaxDocumentBufferHeapSize(job, 128 * 1024);
@@ -244,7 +262,7 @@ public class BlurOutputFormatTest {
 
     long total = 0;
     for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
-      Path path = new Path(tableUri, BlurUtil.getShardName(i));
+      Path path = new Path(output, BlurUtil.getShardName(i));
       Collection<Path> commitedTasks = getCommitedTasks(path);
       assertEquals(1, commitedTasks.size());
 
@@ -271,18 +289,18 @@ public class BlurOutputFormatTest {
     job.setInputFormatClass(TrackingTextInputFormat.class);
 
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    String tableUri = new Path(TEST_ROOT_DIR + "/out").makeQualified(localFs.getUri(), localFs.getWorkingDirectory())
-        .toString();
     CsvBlurMapper.addColumns(job, "cf1", "col");
 
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setShardCount(7);
-    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
     tableDescriptor.setName("test");
 
     createShardDirectories(outDir, 7);
 
     BlurOutputFormat.setupJob(job, tableDescriptor);
+    Path output = new Path(TEST_ROOT_DIR + "/out");
+    BlurOutputFormat.setOutputPath(job, output);
     int multiple = 2;
     BlurOutputFormat.setReducerMultiplier(job, multiple);
 
@@ -292,7 +310,7 @@ public class BlurOutputFormatTest {
 
     long total = 0;
     for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
-      Path path = new Path(tableUri, BlurUtil.getShardName(i));
+      Path path = new Path(output, BlurUtil.getShardName(i));
       Collection<Path> commitedTasks = getCommitedTasks(path);
       assertTrue(multiple >= commitedTasks.size());
       for (Path p : commitedTasks) {
@@ -319,17 +337,18 @@ public class BlurOutputFormatTest {
     job.setInputFormatClass(TrackingTextInputFormat.class);
 
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    String tableUri = new Path(TEST_ROOT_DIR + "/out").toString();
     CsvBlurMapper.addColumns(job, "cf1", "col");
 
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setShardCount(1);
-    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
     tableDescriptor.setName("test");
 
     createShardDirectories(outDir, 1);
 
     BlurOutputFormat.setupJob(job, tableDescriptor);
+    Path output = new Path(TEST_ROOT_DIR + "/out");
+    BlurOutputFormat.setOutputPath(job, output);
     BlurOutputFormat.setReducerMultiplier(job, 2);
     job.setNumReduceTasks(4);
     job.submit();
@@ -353,18 +372,18 @@ public class BlurOutputFormatTest {
     job.setInputFormatClass(TrackingTextInputFormat.class);
 
     FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
-    String tableUri = new Path(TEST_ROOT_DIR + "/out").makeQualified(localFs.getUri(), localFs.getWorkingDirectory())
-        .toString();
     CsvBlurMapper.addColumns(job, "cf1", "col");
 
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setShardCount(2);
-    tableDescriptor.setTableUri(tableUri);
+    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
     tableDescriptor.setName("test");
 
     createShardDirectories(outDir, 2);
 
     BlurOutputFormat.setupJob(job, tableDescriptor);
+    Path output = new Path(TEST_ROOT_DIR + "/out");
+    BlurOutputFormat.setOutputPath(job, output);
     BlurOutputFormat.setIndexLocally(job, false);
 
     job.submit();
@@ -382,7 +401,7 @@ public class BlurOutputFormatTest {
     assertFalse(job.isSuccessful());
 
     for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
-      Path path = new Path(tableUri, BlurUtil.getShardName(i));
+      Path path = new Path(output, BlurUtil.getShardName(i));
       FileSystem fileSystem = path.getFileSystem(job.getConfiguration());
       FileStatus[] listStatus = fileSystem.listStatus(path);
       assertEquals(toString(listStatus), 0, listStatus.length);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/22200a3a/docs/Blur.html
----------------------------------------------------------------------
diff --git a/docs/Blur.html b/docs/Blur.html
index ab7b47d..68eb633 100644
--- a/docs/Blur.html
+++ b/docs/Blur.html
@@ -140,6 +140,7 @@ limitations under the License.
 <li><a href="#Fn_Blur_query">&nbsp;&nbsp;query</a></li>
 <li><a href="#Fn_Blur_fetchRow">&nbsp;&nbsp;fetchRow</a></li>
 <li><a href="#Fn_Blur_fetchRowBatch">&nbsp;&nbsp;fetchRowBatch</a></li>
+<li><a href="#Fn_Blur_loadData">&nbsp;&nbsp;loadData</a></li>
 <li><a href="#Fn_Blur_mutate">&nbsp;&nbsp;mutate</a></li>
 <li><a href="#Fn_Blur_enqueueMutate">&nbsp;&nbsp;enqueueMutate</a></li>
 <li><a href="#Fn_Blur_mutateBatch">&nbsp;&nbsp;mutateBatch</a></li>
@@ -922,6 +923,16 @@ throws <code><a href="Blur.html#Struct_BlurException">BlurException</a></code>
 </td></tr>
 <tr><td>selectors</td><td>the Selector to use to fetch the Row or Record.
 </td></tr>
+</table></p></section><section><div class="page-header"><h4 id="Fn_Blur_loadData">Function: Blur.loadData</h4></div><p class="lead">
+<pre><code>void</code> loadData(<code>string</code> table,
+<code>string</code> location)
+throws <code><a href="Blur.html#Struct_BlurException">BlurException</a></code>
+</pre>Loads data from external location.
+<br/><br/><h4 id="Parameters_Blur_loadData">Parameters</h4>
+<table class="table-bordered table-striped table-condensed"><thead><th>Name</th><th>Description</th></thead><tr><td>table</td><td>The table name.
+</td></tr>
+<tr><td>location</td><td>Location of bulk data load.
+</td></tr>
 </table></p></section><section><div class="page-header"><h4 id="Fn_Blur_mutate">Function: Blur.mutate</h4></div><p class="lead">
 <pre><code>void</code> mutate(<code><a href="Blur.html#Struct_RowMutation">RowMutation</a></code> mutation)
 throws <code><a href="Blur.html#Struct_BlurException">BlurException</a></code>


Mime
View raw message