incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Updating unit tests and fixing an issue when async closing is used in the hdfs directory.
Date Fri, 17 Apr 2015 14:37:07 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master bca39998f -> 18c8db8f6


Updating unit tests and fixing an issue when async closing is used in the hdfs directory.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/18c8db8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/18c8db8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/18c8db8f

Branch: refs/heads/master
Commit: 18c8db8f6328cf8681928476f641dff7b57957a9
Parents: bca3999
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Fri Apr 17 10:37:01 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Fri Apr 17 10:37:01 2015 -0400

----------------------------------------------------------------------
 .../lib/BlurOutputFormatMiniClusterTest.java    |   3 +-
 .../mapreduce/lib/BlurOutputFormatTest.java     | 237 ++++++++++---------
 .../blur/mapreduce/lib/CsvBlurDriverTest.java   |  32 +--
 .../apache/blur/store/hdfs/HdfsDirectory.java   |  32 ++-
 .../hdfs/MultiInstancesHdfsDirectoryTest.java   |  77 ++++++
 .../store/hdfs_v2/HdfsKeyValueStoreTest.java    |   5 +-
 .../java/org/apache/blur/utils/JavaHome.java    |  31 +++
 7 files changed, 279 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/18c8db8f/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
index 22d2b56..f1820b1 100644
--- a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatMiniClusterTest.java
@@ -38,6 +38,7 @@ import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.thrift.generated.TableStats;
 import org.apache.blur.utils.GCWatcher;
+import org.apache.blur.utils.JavaHome;
 import org.apache.blur.utils.ShardUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -68,7 +69,7 @@ public class BlurOutputFormatMiniClusterTest {
   @BeforeClass
   public static void setupTest() throws Exception {
     GCWatcher.init(0.60);
-    BlurOutputFormatTest.setupJavaHome();
+    JavaHome.checkJavaHome();
     LocalFileSystem localFS = FileSystem.getLocal(new Configuration());
     File testDirectory = new File(TMPDIR, "blur-cluster-test").getAbsoluteFile();
     testDirectory.mkdirs();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/18c8db8f/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
index c3ab723..f4e7074 100644
--- a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurOutputFormatTest.java
@@ -27,7 +27,6 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
-import java.net.URI;
 import java.util.Collection;
 import java.util.TreeSet;
 
@@ -36,6 +35,7 @@ import org.apache.blur.server.TableContext;
 import org.apache.blur.store.buffer.BufferStore;
 import org.apache.blur.store.hdfs.HdfsDirectory;
 import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.JavaHome;
 import org.apache.blur.utils.ShardUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -53,52 +53,36 @@ import org.junit.Test;
 
 public class BlurOutputFormatTest {
 
-  private static Configuration conf = new Configuration();
-  private static FileSystem localFs;
-  private static Path TEST_ROOT_DIR;
-  private static MiniCluster miniCluster;
-  private Path outDir = new Path(TEST_ROOT_DIR + "/out");
-  private Path inDir = new Path(TEST_ROOT_DIR + "/in");
+  private static Configuration _conf = new Configuration();
+  private static FileSystem _fileSystem;
+  private static MiniCluster _miniCluster;
+
+  private static Path _root;
 
   @BeforeClass
   public static void setupTest() throws Exception {
-    setupJavaHome();
+    JavaHome.checkJavaHome();
     File file = new File("./target/tmp/BlurOutputFormatTest_tmp");
     String pathStr = file.getAbsoluteFile().toURI().toString();
-    System.setProperty("test.build.data", pathStr + "/data");
+    String hdfsPath = pathStr + "/hdfs";
+    System.setProperty("test.build.data", hdfsPath);
     System.setProperty("hadoop.log.dir", pathStr + "/hadoop_log");
-    try {
-      localFs = FileSystem.getLocal(conf);
-    } catch (IOException io) {
-      throw new RuntimeException("problem getting local fs", io);
-    }
-    TEST_ROOT_DIR = new Path(System.getProperty("test.build.data", pathStr));
 
-    URI uri = new URI("file:///");
-    FileSystem.setDefaultUri(conf, uri);
-    
-    miniCluster = new MiniCluster();
-    miniCluster.startMrMiniCluster(uri.toString());
-    conf = miniCluster.getMRConfiguration();
+    _miniCluster = new MiniCluster();
+    _miniCluster.startDfs(hdfsPath);
+    _fileSystem = _miniCluster.getFileSystem();
+    _root = new Path(_fileSystem.getUri() + "/testroot");
+    _miniCluster.startMrMiniCluster();
+    _conf = _miniCluster.getMRConfiguration();
 
     BufferStore.initNewBuffer(128, 128 * 128);
   }
 
-  public static void setupJavaHome() {
-    String str = System.getenv("JAVA_HOME");
-    if (str == null) {
-      String property = System.getProperty("java.home");
-      if (property != null) {
-        throw new RuntimeException("JAVA_HOME not set should probably be [" + property +
"].");
-      }
-      throw new RuntimeException("JAVA_HOME not set.");
-    }
-  }
-
   @AfterClass
   public static void teardown() throws IOException {
-    if (miniCluster != null) {
-      miniCluster.stopMrMiniCluster();
+    if (_miniCluster != null) {
+      _miniCluster.stopMrMiniCluster();
+      _miniCluster.shutdownDfs();
     }
     rm(new File("build"));
   }
@@ -122,28 +106,31 @@ public class BlurOutputFormatTest {
 
   @Test
   public void testBlurOutputFormat() throws IOException, InterruptedException, ClassNotFoundException
{
-    localFs.delete(inDir, true);
-    localFs.delete(outDir, true);
-    writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
-    writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
-
-    Job job = Job.getInstance(conf, "blur index");
+    Path input = getInDir();
+    Path output = getOutDir();
+    _fileSystem.delete(input, true);
+    _fileSystem.delete(output, true);
+    writeRecordsFile(new Path(input, "part1"), 1, 1, 1, 1, "cf1");
+    writeRecordsFile(new Path(input, "part2"), 1, 1, 2, 1, "cf1");
+
+    Job job = Job.getInstance(_conf, "blur index");
     job.setJarByClass(BlurOutputFormatTest.class);
     job.setMapperClass(CsvBlurMapper.class);
     job.setInputFormatClass(TextInputFormat.class);
 
-    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    FileInputFormat.addInputPath(job, input);
     CsvBlurMapper.addColumns(job, "cf1", "col");
 
+    Path tablePath = new Path(new Path(_root, "table"), "test");
+
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setShardCount(1);
-    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
+    tableDescriptor.setTableUri(tablePath.toString());
     tableDescriptor.setName("test");
 
-    createShardDirectories(outDir, 1);
+    createShardDirectories(tablePath, 1);
 
     BlurOutputFormat.setupJob(job, tableDescriptor);
-    Path output = new Path(TEST_ROOT_DIR + "/out");
     BlurOutputFormat.setOutputPath(job, output);
 
     assertTrue(job.waitForCompletion(true));
@@ -151,14 +138,22 @@ public class BlurOutputFormatTest {
     System.out.println("Counters: " + ctrs);
 
     Path path = new Path(output, ShardUtil.getShardName(0));
-    dump(path, conf);
+    dump(path, _conf);
     Collection<Path> commitedTasks = getCommitedTasks(path);
     assertEquals(1, commitedTasks.size());
-    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, commitedTasks.iterator().next()));
+    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(_conf, commitedTasks.iterator().next()));
     assertEquals(2, reader.numDocs());
     reader.close();
   }
 
+  private Path getOutDir() {
+    return new Path(_root, "out");
+  }
+
+  private Path getInDir() {
+    return new Path(_root, "in");
+  }
+
   private void dump(Path path, Configuration conf) throws IOException {
     FileSystem fileSystem = path.getFileSystem(conf);
     System.out.println(path);
@@ -172,7 +167,7 @@ public class BlurOutputFormatTest {
 
   private Collection<Path> getCommitedTasks(Path path) throws IOException {
     Collection<Path> result = new TreeSet<Path>();
-    FileSystem fileSystem = path.getFileSystem(conf);
+    FileSystem fileSystem = path.getFileSystem(_conf);
     FileStatus[] listStatus = fileSystem.listStatus(path);
     for (FileStatus fileStatus : listStatus) {
       Path p = fileStatus.getPath();
@@ -185,29 +180,33 @@ public class BlurOutputFormatTest {
 
   @Test
   public void testBlurOutputFormatOverFlowTest() throws IOException, InterruptedException,
ClassNotFoundException {
-    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
-    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
-
-    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
-    writeRecordsFile("in/part2", 1, 50, 2000, 100, "cf1"); // 100 * 50 = 5,000
-
-    Job job = Job.getInstance(conf, "blur index");
+    Path input = getInDir();
+    Path output = getOutDir();
+    _fileSystem.delete(input, true);
+    _fileSystem.delete(output, true);
+    // 1500 * 50 = 75,000
+    writeRecordsFile(new Path(input, "part1"), 1, 50, 1, 1500, "cf1");
+    // 100 * 50 = 5,000
+    writeRecordsFile(new Path(input, "part2"), 1, 50, 2000, 100, "cf1");
+
+    Job job = Job.getInstance(_conf, "blur index");
     job.setJarByClass(BlurOutputFormatTest.class);
     job.setMapperClass(CsvBlurMapper.class);
     job.setInputFormatClass(TextInputFormat.class);
 
-    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    FileInputFormat.addInputPath(job, input);
     CsvBlurMapper.addColumns(job, "cf1", "col");
 
+    Path tablePath = new Path(new Path(_root, "table"), "test");
+
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setShardCount(1);
-    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
+    tableDescriptor.setTableUri(tablePath.toString());
     tableDescriptor.setName("test");
 
-    createShardDirectories(outDir, 1);
+    createShardDirectories(tablePath, 1);
 
     BlurOutputFormat.setupJob(job, tableDescriptor);
-    Path output = new Path(TEST_ROOT_DIR + "/out");
     BlurOutputFormat.setOutputPath(job, output);
     BlurOutputFormat.setIndexLocally(job, true);
     BlurOutputFormat.setOptimizeInFlight(job, false);
@@ -220,7 +219,7 @@ public class BlurOutputFormatTest {
     Collection<Path> commitedTasks = getCommitedTasks(path);
     assertEquals(1, commitedTasks.size());
 
-    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, commitedTasks.iterator().next()));
+    DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(_conf, commitedTasks.iterator().next()));
     assertEquals(80000, reader.numDocs());
     reader.close();
   }
@@ -228,29 +227,33 @@ public class BlurOutputFormatTest {
   @Test
   public void testBlurOutputFormatOverFlowMultipleReducersTest() throws IOException, InterruptedException,
       ClassNotFoundException {
-    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
-    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
-
-    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
-    writeRecordsFile("in/part2", 1, 50, 2000, 100, "cf1"); // 100 * 50 = 5,000
-
-    Job job = Job.getInstance(conf, "blur index");
+    Path input = getInDir();
+    Path output = getOutDir();
+    _fileSystem.delete(input, true);
+    _fileSystem.delete(output, true);
+    // 1500 * 50 = 75,000
+    writeRecordsFile(new Path(input, "part1"), 1, 50, 1, 1500, "cf1");
+    // 100 * 50 = 5,000
+    writeRecordsFile(new Path(input, "part2"), 1, 50, 2000, 100, "cf1");
+
+    Job job = Job.getInstance(_conf, "blur index");
     job.setJarByClass(BlurOutputFormatTest.class);
     job.setMapperClass(CsvBlurMapper.class);
     job.setInputFormatClass(TextInputFormat.class);
 
-    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    FileInputFormat.addInputPath(job, input);
     CsvBlurMapper.addColumns(job, "cf1", "col");
 
+    Path tablePath = new Path(new Path(_root, "table"), "test");
+
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setShardCount(2);
-    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
+    tableDescriptor.setTableUri(tablePath.toString());
     tableDescriptor.setName("test");
 
-    createShardDirectories(outDir, 2);
+    createShardDirectories(output, 2);
 
     BlurOutputFormat.setupJob(job, tableDescriptor);
-    Path output = new Path(TEST_ROOT_DIR + "/out");
     BlurOutputFormat.setOutputPath(job, output);
     BlurOutputFormat.setIndexLocally(job, false);
     BlurOutputFormat.setDocumentBufferStrategy(job, DocumentBufferStrategyHeapSize.class);
@@ -266,7 +269,7 @@ public class BlurOutputFormatTest {
       Collection<Path> commitedTasks = getCommitedTasks(path);
       assertEquals(1, commitedTasks.size());
 
-      DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, commitedTasks.iterator().next()));
+      DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(_conf, commitedTasks.iterator().next()));
       total += reader.numDocs();
       reader.close();
     }
@@ -277,29 +280,34 @@ public class BlurOutputFormatTest {
   @Test
   public void testBlurOutputFormatOverFlowMultipleReducersWithReduceMultiplierTest() throws
IOException,
       InterruptedException, ClassNotFoundException {
-    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
-    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
+    Path input = getInDir();
+    Path output = getOutDir();
+    _fileSystem.delete(input, true);
+    _fileSystem.delete(output, true);
 
-    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
-    writeRecordsFile("in/part2", 1, 50, 2000, 100, "cf1"); // 100 * 50 = 5,000
+    // 1500 * 50 = 75,000
+    writeRecordsFile(new Path(input, "part1"), 1, 50, 1, 1500, "cf1");
+    // 100 * 50 = 5,000
+    writeRecordsFile(new Path(input, "part2"), 1, 50, 2000, 100, "cf1");
 
-    Job job = Job.getInstance(conf, "blur index");
+    Job job = Job.getInstance(_conf, "blur index");
     job.setJarByClass(BlurOutputFormatTest.class);
     job.setMapperClass(CsvBlurMapper.class);
     job.setInputFormatClass(TextInputFormat.class);
 
-    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    FileInputFormat.addInputPath(job, input);
     CsvBlurMapper.addColumns(job, "cf1", "col");
 
+    Path tablePath = new Path(new Path(_root, "table"), "test");
+
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setShardCount(7);
-    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
+    tableDescriptor.setTableUri(tablePath.toString());
     tableDescriptor.setName("test");
 
-    createShardDirectories(outDir, 7);
+    createShardDirectories(output, 7);
 
     BlurOutputFormat.setupJob(job, tableDescriptor);
-    Path output = new Path(TEST_ROOT_DIR + "/out");
     BlurOutputFormat.setOutputPath(job, output);
     int multiple = 2;
     BlurOutputFormat.setReducerMultiplier(job, multiple);
@@ -314,7 +322,7 @@ public class BlurOutputFormatTest {
       Collection<Path> commitedTasks = getCommitedTasks(path);
       assertTrue(multiple >= commitedTasks.size());
       for (Path p : commitedTasks) {
-        DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(conf, p));
+        DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(_conf, p));
         total += reader.numDocs();
         reader.close();
       }
@@ -326,28 +334,32 @@ public class BlurOutputFormatTest {
   @Test(expected = IllegalArgumentException.class)
   public void testBlurOutputFormatValidateReducerCount() throws IOException, InterruptedException,
       ClassNotFoundException {
-    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
-    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
-    writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
-    writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
 
-    Job job = Job.getInstance(conf, "blur index");
+    Path input = getInDir();
+    Path output = getOutDir();
+    _fileSystem.delete(input, true);
+    _fileSystem.delete(output, true);
+    writeRecordsFile(new Path(input, "part1"), 1, 1, 1, 1, "cf1");
+    writeRecordsFile(new Path(input, "part2"), 1, 1, 2, 1, "cf1");
+
+    Job job = Job.getInstance(_conf, "blur index");
     job.setJarByClass(BlurOutputFormatTest.class);
     job.setMapperClass(CsvBlurMapper.class);
     job.setInputFormatClass(TextInputFormat.class);
 
-    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    FileInputFormat.addInputPath(job, input);
     CsvBlurMapper.addColumns(job, "cf1", "col");
 
+    Path tablePath = new Path(new Path(_root, "table"), "test");
+
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setShardCount(1);
-    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
+    tableDescriptor.setTableUri(tablePath.toString());
     tableDescriptor.setName("test");
 
-    createShardDirectories(outDir, 1);
+    createShardDirectories(getOutDir(), 1);
 
     BlurOutputFormat.setupJob(job, tableDescriptor);
-    Path output = new Path(TEST_ROOT_DIR + "/out");
     BlurOutputFormat.setOutputPath(job, output);
     BlurOutputFormat.setReducerMultiplier(job, 2);
     job.setNumReduceTasks(4);
@@ -359,30 +371,33 @@ public class BlurOutputFormatTest {
   // @Test
   public void testBlurOutputFormatCleanupDuringJobKillTest() throws IOException, InterruptedException,
       ClassNotFoundException {
-    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
-    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
-
-    writeRecordsFile("in/part1", 1, 50, 1, 1500, "cf1"); // 1500 * 50 = 75,000
-    writeRecordsFile("in/part2", 1, 5000, 2000, 100, "cf1"); // 100 * 5000 =
-                                                             // 500,000
-
-    Job job = Job.getInstance(conf, "blur index");
+    Path input = getInDir();
+    Path output = getOutDir();
+    _fileSystem.delete(input, true);
+    _fileSystem.delete(output, true);
+    // 1500 * 50 = 75,000
+    writeRecordsFile(new Path(input, "part1"), 1, 50, 1, 1500, "cf1");
+    // 100 * 5000 = 500,000
+    writeRecordsFile(new Path(input, "part2"), 1, 5000, 2000, 100, "cf1");
+
+    Job job = Job.getInstance(_conf, "blur index");
     job.setJarByClass(BlurOutputFormatTest.class);
     job.setMapperClass(CsvBlurMapper.class);
     job.setInputFormatClass(TextInputFormat.class);
 
-    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
+    FileInputFormat.addInputPath(job, input);
     CsvBlurMapper.addColumns(job, "cf1", "col");
 
+    Path tablePath = new Path(new Path(_root, "table"), "test");
+
     TableDescriptor tableDescriptor = new TableDescriptor();
     tableDescriptor.setShardCount(2);
-    tableDescriptor.setTableUri(new Path(TEST_ROOT_DIR + "/table/test").toString());
+    tableDescriptor.setTableUri(tablePath.toString());
     tableDescriptor.setName("test");
 
-    createShardDirectories(outDir, 2);
+    createShardDirectories(getOutDir(), 2);
 
     BlurOutputFormat.setupJob(job, tableDescriptor);
-    Path output = new Path(TEST_ROOT_DIR + "/out");
     BlurOutputFormat.setOutputPath(job, output);
     BlurOutputFormat.setIndexLocally(job, false);
 
@@ -422,8 +437,8 @@ public class BlurOutputFormatTest {
     return s;
   }
 
-  public static String readFile(String name) throws IOException {
-    DataInputStream f = localFs.open(new Path(TEST_ROOT_DIR + "/" + name));
+  public static String readFile(Path file) throws IOException {
+    DataInputStream f = _fileSystem.open(file);
     BufferedReader b = new BufferedReader(new InputStreamReader(f));
     StringBuilder result = new StringBuilder();
     String line = b.readLine();
@@ -436,12 +451,10 @@ public class BlurOutputFormatTest {
     return result.toString();
   }
 
-  private Path writeRecordsFile(String name, int starintgRowId, int numberOfRows, int startRecordId,
-      int numberOfRecords, String family) throws IOException {
-    // "1,1,cf1,val1"
-    Path file = new Path(TEST_ROOT_DIR + "/" + name);
-    localFs.delete(file, false);
-    DataOutputStream f = localFs.create(file);
+  private Path writeRecordsFile(Path file, int starintgRowId, int numberOfRows, int startRecordId,
int numberOfRecords,
+      String family) throws IOException {
+    _fileSystem.delete(file, false);
+    DataOutputStream f = _fileSystem.create(file);
     PrintWriter writer = new PrintWriter(f);
     for (int row = 0; row < numberOfRows; row++) {
       for (int record = 0; record < numberOfRecords; record++) {
@@ -453,9 +466,9 @@ public class BlurOutputFormatTest {
   }
 
   private void createShardDirectories(Path outDir, int shardCount) throws IOException {
-    localFs.mkdirs(outDir);
+    _fileSystem.mkdirs(outDir);
     for (int i = 0; i < shardCount; i++) {
-      localFs.mkdirs(new Path(outDir, ShardUtil.getShardName(i)));
+      _fileSystem.mkdirs(new Path(outDir, ShardUtil.getShardName(i)));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/18c8db8f/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
index 340d2b3..7e96a70 100644
--- a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
@@ -43,17 +43,19 @@ import org.junit.Test;
 
 public class CsvBlurDriverTest {
 
-  protected String tableUri = "file:///tmp/tmppath";
+  protected Path root = new Path("./target/tmp/CsvBlurDriverTest/");
   protected int shardCount = 13;
+  protected Path _path1;
+  protected Path _path2;
 
   @Before
   public void setup() throws IOException {
     Configuration configuration = new Configuration();
-    Path path1 = new Path("file:///tmp/test1");
-    Path path2 = new Path("file:///tmp/test2");
-    FileSystem fileSystem = path1.getFileSystem(configuration);
-    fileSystem.mkdirs(path1);
-    fileSystem.mkdirs(path2);
+    _path1 = new Path(root, "test1");
+    _path2 = new Path(root, "test2");
+    FileSystem fileSystem = _path1.getFileSystem(configuration);
+    fileSystem.mkdirs(_path1);
+    fileSystem.mkdirs(_path2);
   }
 
   @Test
@@ -80,8 +82,8 @@ public class CsvBlurDriverTest {
     };
     AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
     Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, ref, "-c", "host:40010",
"-d", "family1",
-        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1",
"-i",
-        "file:///tmp/test2");
+        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", _path1.toString(),
"-i",
+        _path2.toString());
     assertNotNull(job);
     Configuration configuration = job.getConfiguration();
     TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
@@ -103,8 +105,8 @@ public class CsvBlurDriverTest {
     };
     AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
     Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, ref, "-c", "host:40010",
"-d", "family1",
-        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1",
"-i",
-        "file:///tmp/test2", "-S", "-C", "1000000", "2000000");
+        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", _path1.toString(),
"-i",
+        _path2.toString(), "-S", "-C", "1000000", "2000000");
     assertNotNull(job);
     Configuration configuration = job.getConfiguration();
     TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
@@ -126,8 +128,8 @@ public class CsvBlurDriverTest {
     };
     AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
     Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, ref, "-c", "host:40010",
"-d", "family1",
-        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1",
"-i",
-        "file:///tmp/test2", "-S", "-C", "1000000", "2000000", "-p", "SNAPPY");
+        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", _path1.toString(),
"-i",
+        _path2.toString(), "-S", "-C", "1000000", "2000000", "-p", "SNAPPY");
     assertNotNull(job);
     Configuration configuration = job.getConfiguration();
     TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
@@ -152,8 +154,8 @@ public class CsvBlurDriverTest {
     int multiplierParam = 10;
     AtomicReference<Callable<Void>> ref = new AtomicReference<Callable<Void>>();
     Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, ref, "-c", "host:40010",
"-d", "family1",
-        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1",
"-i",
-        "file:///tmp/test2", "-S", "-C", "1000000", "2000000", "-p", "SNAPPY", "-r", Integer.toString(multiplierParam));
+        "col1", "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", _path1.toString(),
"-i",
+        _path2.toString(), "-S", "-C", "1000000", "2000000", "-p", "SNAPPY", "-r", Integer.toString(multiplierParam));
     assertNotNull(job);
 
     assertEquals(multiplierParam * shardCount, job.getNumReduceTasks());
@@ -167,7 +169,7 @@ public class CsvBlurDriverTest {
         if (method.getName().equals("describe")) {
           TableDescriptor tableDescriptor = new TableDescriptor();
           tableDescriptor.setName((String) args[0]);
-          tableDescriptor.setTableUri(tableUri);
+          tableDescriptor.setTableUri(new Path(root, "tmppath").toString());
           tableDescriptor.setShardCount(shardCount);
           return tableDescriptor;
         }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/18c8db8f/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index 0ff4ab4..1e703c5 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -56,6 +56,10 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSClient.DFSInputStream;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.lucene.store.BufferedIndexOutput;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
@@ -176,18 +180,23 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
         if (!fileStatus.isDir()) {
           Path p = fileStatus.getPath();
           String name = p.getName();
+          long lastMod;
+          long length;
+          String resolvedName;
           if (name.endsWith(LNK)) {
-            String resolvedName = getRealFileName(name);
+            resolvedName = getRealFileName(name);
             Path resolvedPath = getPath(resolvedName);
             FileStatus resolvedFileStatus = _fileSystem.getFileStatus(resolvedPath);
-            _fileStatusMap.put(resolvedName, new FStat(resolvedFileStatus));
+            lastMod = resolvedFileStatus.getModificationTime();
           } else if (name.endsWith(COPY)) {
-            String resolvedName = getRealFileName(name);
-            long lastModTime = getLastModTimeFromCopyFile(name);
-            _fileStatusMap.put(resolvedName, new FStat(lastModTime, fileStatus.getLen()));
+            resolvedName = getRealFileName(name);
+            lastMod = getLastModTimeFromCopyFile(name);
           } else {
-            _fileStatusMap.put(name, new FStat(fileStatus));
+            resolvedName = name;
+            lastMod = fileStatus.getModificationTime();
           }
+          length = length(resolvedName);
+          _fileStatusMap.put(resolvedName, new FStat(lastMod, length));
         }
       }
     }
@@ -518,7 +527,16 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
     Path path = getPath(name);
     Tracer trace = Trace.trace("filesystem - length", Trace.param("path", path));
     try {
-      return _fileSystem.getFileStatus(path).getLen();
+      if (_fileSystem instanceof DistributedFileSystem) {
+        DistributedFileSystem distributedFileSystem = (DistributedFileSystem) _fileSystem;
+        DFSClient client = distributedFileSystem.getClient();
+        DFSInputStream inputStream = client.open(path.toUri().getPath());
+        long fileLength = inputStream.getFileLength();
+        inputStream.close();
+        return fileLength;
+      } else {
+        return _fileSystem.getFileStatus(path).getLen();
+      }
     } finally {
       trace.done();
     }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/18c8db8f/blur-store/src/test/java/org/apache/blur/store/hdfs/MultiInstancesHdfsDirectoryTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/hdfs/MultiInstancesHdfsDirectoryTest.java
b/blur-store/src/test/java/org/apache/blur/store/hdfs/MultiInstancesHdfsDirectoryTest.java
new file mode 100644
index 0000000..52940c6
--- /dev/null
+++ b/blur-store/src/test/java/org/apache/blur/store/hdfs/MultiInstancesHdfsDirectoryTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.hdfs;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.blur.HdfsMiniClusterUtil;
+import org.apache.blur.utils.JavaHome;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class MultiInstancesHdfsDirectoryTest {
+
+  private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir",
+      "./target/tmp_MultiInstancesHdfsDirectoryTest"));
+
+  private static Configuration _configuration = new Configuration();
+  private static MiniDFSCluster _cluster;
+  private static Path _root;
+
+  @BeforeClass
+  public static void setupClass() throws IOException {
+    JavaHome.checkJavaHome();
+    _cluster = HdfsMiniClusterUtil.startDfs(_configuration, true, TMPDIR.getAbsolutePath());
+    _root = new Path(_cluster.getFileSystem().getUri() + "/");
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    HdfsMiniClusterUtil.shutdownDfs(_cluster);
+  }
+
+  @Test
+  public void testMultiInstancesHdfsDirectoryTest1() throws IOException, InterruptedException {
+    HdfsDirectory dir1 = new HdfsDirectory(_configuration, new Path(_root, "dir"));
+    
+
+    IndexOutput output = dir1.createOutput("a", IOContext.DEFAULT);
+    output.writeInt(1234);
+    output.close();
+    
+    HdfsDirectory dir2 = new HdfsDirectory(_configuration, new Path(_root, "dir"));
+
+    IndexInput input = dir2.openInput("a", IOContext.READ);
+    assertEquals(4, input.length());
+    assertEquals(1234, input.readInt());
+    input.close();
+
+    dir1.close();
+    dir2.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/18c8db8f/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
index 0122c63..64bf7a2 100644
--- a/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
+++ b/blur-store/src/test/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStoreTest.java
@@ -41,7 +41,7 @@ public class HdfsKeyValueStoreTest {
 
   private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir", "./target/tmp_HdfsKeyValueStoreTest"));
 
-  private Configuration _configuration = new Configuration();
+  private static Configuration _configuration = new Configuration();
   private static MiniDFSCluster _cluster;
 
   private static Timer _timer;
@@ -49,8 +49,7 @@ public class HdfsKeyValueStoreTest {
 
   @BeforeClass
   public static void startCluster() {
-    Configuration conf = new Configuration();
-    _cluster = HdfsMiniClusterUtil.startDfs(conf, true, TMPDIR.getAbsolutePath());
+    _cluster = HdfsMiniClusterUtil.startDfs(_configuration, true, TMPDIR.getAbsolutePath());
     _timer = new Timer("IndexImporter", true);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/18c8db8f/blur-util/src/main/java/org/apache/blur/utils/JavaHome.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/utils/JavaHome.java b/blur-util/src/main/java/org/apache/blur/utils/JavaHome.java
new file mode 100644
index 0000000..17b6061
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/utils/JavaHome.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.utils;
+
+public class JavaHome {
+
+  public static void checkJavaHome() {
+    String str = System.getenv("JAVA_HOME");
+    if (str == null) {
+      String property = System.getProperty("java.home");
+      if (property != null) {
+        throw new RuntimeException("JAVA_HOME not set should probably be [" + property + "].");
+      }
+      throw new RuntimeException("JAVA_HOME not set.");
+    }
+  }
+}


Mime
View raw message