incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject [1/6] git commit: Adding softlink directory.
Date Sun, 22 Sep 2013 00:08:02 GMT
Updated Branches:
  refs/heads/master 0ee24620c -> cde28869b


Adding softlink directory.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/862691aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/862691aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/862691aa

Branch: refs/heads/master
Commit: 862691aa0c97944dfe398c6d092dc565e2452fb7
Parents: 0ee2462
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Fri Sep 20 13:25:28 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Fri Sep 20 13:27:16 2013 -0400

----------------------------------------------------------------------
 .../apache/blur/store/hdfs/HdfsDirectory.java   | 294 +++++++------------
 .../apache/blur/store/hdfs/HdfsIndexInput.java  | 107 +++++++
 .../apache/blur/store/hdfs/MetricsGroup.java    |  35 +++
 .../blur/store/hdfs/SoftlinkHdfsDirectory.java  | 201 +++++++++++++
 .../apache/blur/store/BaseDirectoryTest.java    | 273 +++++++++++++++++
 .../apache/blur/store/HdfsDirectoryTest.java    | 175 +----------
 .../blur/store/SoftlinkHdfsDirectoryTest.java   |  43 +++
 7 files changed, 777 insertions(+), 351 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/862691aa/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index 94686e2..2b3f5b0 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.store.blockcache.LastModified;
-import org.apache.blur.store.buffer.ReusedBufferedIndexInput;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -58,186 +57,91 @@ public class HdfsDirectory extends Directory implements LastModified {
 
   public static AtomicInteger fetchImpl = new AtomicInteger(1);
 
-//  static {
-//    Thread thread = new Thread(new Runnable() {
-//      @Override
-//      public void run() {
-//        while (true) {
-//          File file = new File("/tmp/fetch.impl");
-//          if (file.exists()) {
-//            try {
-//              BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
-//              String line = reader.readLine();
-//              String trim = line.trim();
-//              int i = Integer.parseInt(trim);
-//              if (i != fetchImpl.get()) {
-//                LOG.info("Changing fetch impl [" + i + "]");
-//                fetchImpl.set(i);
-//              }
-//              reader.close();
-//            } catch (Exception e) {
-//              LOG.error("Unknown error", e);
-//            }
-//          }
-//          try {
-//            Thread.sleep(5000);
-//          } catch (InterruptedException e) {
-//            return;
-//          }
-//        }
-//      }
-//    });
-//    thread.setDaemon(true);
-//    thread.start();
-//  }
-
-  private final Path path;
-  private final FileSystem fileSystem;
-  private final MetricsGroup metricsGroup;
-
-  static class MetricsGroup {
-    final Histogram readAccess;
-    final Histogram writeAccess;
-    final Meter writeThroughput;
-    final Meter readThroughput;
-
-    MetricsGroup(Histogram readAccess, Histogram writeAccess, Meter readThroughput, Meter writeThroughput) {
-      this.readAccess = readAccess;
-      this.writeAccess = writeAccess;
-      this.readThroughput = readThroughput;
-      this.writeThroughput = writeThroughput;
-    }
-  }
+  // static {
+  // Thread thread = new Thread(new Runnable() {
+  // @Override
+  // public void run() {
+  // while (true) {
+  // File file = new File("/tmp/fetch.impl");
+  // if (file.exists()) {
+  // try {
+  // BufferedReader reader = new BufferedReader(new InputStreamReader(new
+  // FileInputStream(file)));
+  // String line = reader.readLine();
+  // String trim = line.trim();
+  // int i = Integer.parseInt(trim);
+  // if (i != fetchImpl.get()) {
+  // LOG.info("Changing fetch impl [" + i + "]");
+  // fetchImpl.set(i);
+  // }
+  // reader.close();
+  // } catch (Exception e) {
+  // LOG.error("Unknown error", e);
+  // }
+  // }
+  // try {
+  // Thread.sleep(5000);
+  // } catch (InterruptedException e) {
+  // return;
+  // }
+  // }
+  // }
+  // });
+  // thread.setDaemon(true);
+  // thread.start();
+  // }
 
   /**
    * We keep the metrics separate per filesystem.
    */
-  private static Map<URI, MetricsGroup> metricsGroupMap = new WeakHashMap<URI, MetricsGroup>();
+  protected static Map<URI, MetricsGroup> _metricsGroupMap = new WeakHashMap<URI, MetricsGroup>();
+  
+  protected final Path _path;
+  protected final FileSystem _fileSystem;
+  protected final MetricsGroup _metricsGroup;
 
   public HdfsDirectory(Configuration configuration, Path path) throws IOException {
-    this.path = path;
-    fileSystem = path.getFileSystem(configuration);
-    fileSystem.mkdirs(path);
+    this._path = path;
+    _fileSystem = path.getFileSystem(configuration);
+    _fileSystem.mkdirs(path);
     setLockFactory(NoLockFactory.getNoLockFactory());
-    synchronized (metricsGroupMap) {
-      URI uri = fileSystem.getUri();
-      MetricsGroup metricsGroup = metricsGroupMap.get(uri);
+    synchronized (_metricsGroupMap) {
+      URI uri = _fileSystem.getUri();
+      MetricsGroup metricsGroup = _metricsGroupMap.get(uri);
       if (metricsGroup == null) {
         String scope = uri.toString();
-
-        Histogram readAccess = Metrics.newHistogram(new MetricName(ORG_APACHE_BLUR, HDFS, "Read Latency in \u00B5s",
-            scope));
-        Histogram writeAccess = Metrics.newHistogram(new MetricName(ORG_APACHE_BLUR, HDFS, "Write Latency in \u00B5s",
-            scope));
-        Meter readThroughput = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, HDFS, "Read Throughput", scope),
-            "Read Bytes", TimeUnit.SECONDS);
-        Meter writeThroughput = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, HDFS, "Write Throughput", scope),
-            "Write Bytes", TimeUnit.SECONDS);
-        metricsGroup = new MetricsGroup(readAccess, writeAccess, readThroughput, writeThroughput);
-        metricsGroupMap.put(uri, metricsGroup);
+        metricsGroup = createNewMetricsGroup(scope);
+        _metricsGroupMap.put(uri, metricsGroup);
       }
-      this.metricsGroup = metricsGroup;
+      _metricsGroup = metricsGroup;
     }
   }
 
-  @Override
-  public String toString() {
-    return "HdfsDirectory path=[" + path + "]";
+  private MetricsGroup createNewMetricsGroup(String scope) {
+    MetricName readAccessName = new MetricName(ORG_APACHE_BLUR, HDFS, "Read Latency in \u00B5s", scope);
+    MetricName writeAcccessName = new MetricName(ORG_APACHE_BLUR, HDFS, "Write Latency in \u00B5s", scope);
+    MetricName readThroughputName = new MetricName(ORG_APACHE_BLUR, HDFS, "Read Throughput", scope);
+    MetricName writeThroughputName = new MetricName(ORG_APACHE_BLUR, HDFS, "Write Throughput", scope);
+    
+    Histogram readAccess = Metrics.newHistogram(readAccessName);
+    Histogram writeAccess = Metrics.newHistogram(writeAcccessName);
+    Meter readThroughput = Metrics.newMeter(readThroughputName, "Read Bytes", TimeUnit.SECONDS);
+    Meter writeThroughput = Metrics.newMeter(writeThroughputName, "Write Bytes", TimeUnit.SECONDS);
+    return new MetricsGroup(readAccess, writeAccess, readThroughput, writeThroughput);
   }
 
-  public static class HdfsIndexInput extends ReusedBufferedIndexInput {
-
-    private final long len;
-    private FSDataInputStream inputStream;
-    private boolean isClone;
-    private final MetricsGroup metricsGroup;
-    private int _readVersion;
-
-    public HdfsIndexInput(FileSystem fileSystem, Path filePath, MetricsGroup metricsGroup) throws IOException {
-      super(filePath.toString());
-      inputStream = fileSystem.open(filePath);
-      FileStatus fileStatus = fileSystem.getFileStatus(filePath);
-      len = fileStatus.getLen();
-      this.metricsGroup = metricsGroup;
-      _readVersion = fetchImpl.get();
-    }
-
-    @Override
-    public long length() {
-      return len;
-    }
-
-    @Override
-    protected void seekInternal(long pos) throws IOException {
-
-    }
-
-    @Override
-    protected void readInternal(byte[] b, int offset, int length) throws IOException {
-      long start = System.nanoTime();
-      long filePointer = getFilePointer();
-      switch (_readVersion) {
-      case 0:
-        synchronized (inputStream) {
-          inputStream.seek(getFilePointer());
-          inputStream.readFully(b, offset, length);
-        }
-        break;
-      case 1:
-        while (length > 0) {
-          int amount;
-          synchronized (inputStream) {
-            inputStream.seek(filePointer);
-            amount = inputStream.read(b, offset, length);
-          }
-          length -= amount;
-          offset += amount;
-          filePointer += amount;
-        }
-        break;
-      case 2:
-        inputStream.readFully(filePointer, b, offset, length);
-        break;
-      case 3:
-        while (length > 0) {
-          int amount;
-          amount = inputStream.read(filePointer, b, offset, length);
-          length -= amount;
-          offset += amount;
-          filePointer += amount;
-        }
-        break;
-      default:
-        break;
-      }
-      long end = System.nanoTime();
-      metricsGroup.readAccess.update((end - start) / 1000);
-      metricsGroup.readThroughput.mark(length);
-    }
-
-    @Override
-    protected void closeInternal() throws IOException {
-      if (!isClone) {
-        inputStream.close();
-      }
-    }
-
-    @Override
-    public ReusedBufferedIndexInput clone() {
-      HdfsIndexInput clone = (HdfsIndexInput) super.clone();
-      clone.isClone = true;
-      clone._readVersion = fetchImpl.get();
-      return clone;
-    }
+  @Override
+  public String toString() {
+    return "HdfsDirectory path=[" + _path + "]";
   }
 
   @Override
   public IndexOutput createOutput(String name, IOContext context) throws IOException {
-    LOG.debug("createOutput [{0}] [{1}] [{2}]", name, context, path);
+    LOG.debug("createOutput [{0}] [{1}] [{2}]", name, context, _path);
     if (fileExists(name)) {
       throw new IOException("File [" + name + "] already exists found.");
     }
-    final FSDataOutputStream outputStream = fileSystem.create(getPath(name));
+    final FSDataOutputStream outputStream = openForOutput(name);
     return new BufferedIndexOutput() {
 
       @Override
@@ -250,8 +154,8 @@ public class HdfsDirectory extends Directory implements LastModified {
         long start = System.nanoTime();
         outputStream.write(b, offset, len);
         long end = System.nanoTime();
-        metricsGroup.writeAccess.update((end - start) / 1000);
-        metricsGroup.writeThroughput.mark(len);
+        _metricsGroup.writeAccess.update((end - start) / 1000);
+        _metricsGroup.writeThroughput.mark(len);
       }
 
       @Override
@@ -267,24 +171,33 @@ public class HdfsDirectory extends Directory implements LastModified {
     };
   }
 
+  protected FSDataOutputStream openForOutput(String name) throws IOException {
+    return _fileSystem.create(getPath(name));
+  }
+
   @Override
   public IndexInput openInput(String name, IOContext context) throws IOException {
-    LOG.debug("openInput [{0}] [{1}] [{2}]", name, context, path);
+    LOG.debug("openInput [{0}] [{1}] [{2}]", name, context, _path);
     if (!fileExists(name)) {
       throw new FileNotFoundException("File [" + name + "] not found.");
     }
-    Path filePath = getPath(name);
-    return new HdfsIndexInput(fileSystem, filePath, metricsGroup);
+    FSDataInputStream inputStream = openForInput(name);
+    long fileLength = fileLength(name);
+    return new HdfsIndexInput(name, inputStream, fileLength, _metricsGroup, fetchImpl.get());
+  }
+
+  protected FSDataInputStream openForInput(String name) throws IOException {
+    return _fileSystem.open(getPath(name));
   }
 
   @Override
   public String[] listAll() throws IOException {
-    LOG.debug("listAll [{0}]", path);
-    FileStatus[] files = fileSystem.listStatus(path, new PathFilter() {
+    LOG.debug("listAll [{0}]", _path);
+    FileStatus[] files = _fileSystem.listStatus(_path, new PathFilter() {
       @Override
       public boolean accept(Path path) {
         try {
-          return fileSystem.isFile(path);
+          return _fileSystem.isFile(path);
         } catch (IOException e) {
           throw new RuntimeException(e);
         }
@@ -299,25 +212,36 @@ public class HdfsDirectory extends Directory implements LastModified {
 
   @Override
   public boolean fileExists(String name) throws IOException {
-    LOG.debug("fileExists [{0}] [{1}]", name, path);
-    return fileSystem.exists(getPath(name));
+    LOG.debug("fileExists [{0}] [{1}]", name, _path);
+    return exists(name);
+  }
+
+  protected boolean exists(String name) throws IOException {
+    return _fileSystem.exists(getPath(name));
   }
 
   @Override
   public void deleteFile(String name) throws IOException {
-    LOG.debug("deleteFile [{0}] [{1}]", name, path);
+    LOG.debug("deleteFile [{0}] [{1}]", name, _path);
     if (fileExists(name)) {
-      fileSystem.delete(getPath(name), true);
+      delete(name);
     } else {
       throw new FileNotFoundException("File [" + name + "] not found");
     }
   }
 
+  protected void delete(String name) throws IOException {
+    _fileSystem.delete(getPath(name), true);
+  }
+
   @Override
   public long fileLength(String name) throws IOException {
-    LOG.debug("fileLength [{0}] [{1}]", name, path);
-    FileStatus fileStatus = fileSystem.getFileStatus(getPath(name));
-    return fileStatus.getLen();
+    LOG.debug("fileLength [{0}] [{1}]", name, _path);
+    return length(name);
+  }
+
+  protected long length(String name) throws IOException {
+    return _fileSystem.getFileStatus(getPath(name)).getLen();
   }
 
   @Override
@@ -331,24 +255,27 @@ public class HdfsDirectory extends Directory implements LastModified {
   }
 
   public Path getPath() {
-    return path;
+    return _path;
   }
 
   private Path getPath(String name) {
-    return new Path(path, name);
+    return new Path(_path, name);
   }
 
   public long getFileModified(String name) throws IOException {
     if (!fileExists(name)) {
       throw new FileNotFoundException("File [" + name + "] not found");
     }
-    Path file = getPath(name);
-    return fileSystem.getFileStatus(file).getModificationTime();
+    return fileModified(name);
+  }
+
+  protected long fileModified(String name) throws IOException {
+    return _fileSystem.getFileStatus(getPath(name)).getModificationTime();
   }
 
   @Override
   public void copy(Directory to, String src, String dest, IOContext context) throws IOException {
-    LOG.debug("copy [{0}] [{1}] [{2}] [{3}] [{4}]", to, src, dest, context, path);
+    LOG.warn("DANGEROUS copy [{0}] [{1}] [{2}] [{3}] [{4}]", to, src, dest, context, _path);
     if (to instanceof DirectoryDecorator) {
       copy(((DirectoryDecorator) to).getOriginalDirectory(), src, dest, context);
     } else if (to instanceof HdfsDirectory) {
@@ -356,16 +283,21 @@ public class HdfsDirectory extends Directory implements LastModified {
         return;
       }
     } else {
-      super.copy(to, src, dest, context);
+      slowCopy(to, src, dest, context);
+      
     }
   }
 
+  protected void slowCopy(Directory to, String src, String dest, IOContext context) throws IOException {
+    super.copy(to, src, dest, context);
+  }
+
   private boolean quickMove(Directory to, String src, String dest, IOContext context) throws IOException {
     HdfsDirectory simpleTo = (HdfsDirectory) to;
     if (ifSameCluster(simpleTo, this)) {
       Path newDest = simpleTo.getPath(dest);
       Path oldSrc = getPath(src);
-      return fileSystem.rename(oldSrc, newDest);
+      return _fileSystem.rename(oldSrc, newDest);
     }
     return false;
   }
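
The net effect of the HdfsDirectory changes above is that the formerly private state is now protected (_path, _fileSystem, _metricsGroup) and every filesystem touch goes through an overridable hook: openForOutput, openForInput, exists, delete, length, fileModified and slowCopy. A minimal sketch, assuming only what the diff shows, of a subclass that intercepts two of those hooks; the class name and logging are hypothetical, and the real subclass is SoftlinkHdfsDirectory further below:

package org.apache.blur.store.hdfs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;

// Hypothetical example only; not part of this commit.
public class LoggingHdfsDirectory extends HdfsDirectory {

  public LoggingHdfsDirectory(Configuration configuration, Path path) throws IOException {
    super(configuration, path);
  }

  @Override
  protected FSDataOutputStream openForOutput(String name) throws IOException {
    // Runs for every createOutput call; buffering and write metrics stay in the base class.
    System.out.println("creating [" + name + "] under [" + _path + "]");
    return super.openForOutput(name);
  }

  @Override
  protected FSDataInputStream openForInput(String name) throws IOException {
    // Runs for every openInput call before the stream is wrapped in HdfsIndexInput.
    System.out.println("opening [" + name + "] under [" + _path + "]");
    return super.openForInput(name);
  }
}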

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/862691aa/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
new file mode 100644
index 0000000..1c454a9
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.hdfs;
+
+import java.io.IOException;
+
+import org.apache.blur.store.buffer.ReusedBufferedIndexInput;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+public class HdfsIndexInput extends ReusedBufferedIndexInput {
+
+  private final long _length;
+  private FSDataInputStream _inputStream;
+  private boolean _isClone;
+  private final MetricsGroup _metricsGroup;
+  private int _readVersion;
+
+  public HdfsIndexInput(String name, FSDataInputStream inputStream, long length, MetricsGroup metricsGroup, int readVersion) throws IOException {
+    super(name);
+    _inputStream = inputStream;
+    _length = length;
+    _metricsGroup = metricsGroup;
+    _readVersion = readVersion;
+  }
+
+  @Override
+  public long length() {
+    return _length;
+  }
+
+  @Override
+  protected void seekInternal(long pos) throws IOException {
+
+  }
+
+  @Override
+  protected void readInternal(byte[] b, int offset, int length) throws IOException {
+    long start = System.nanoTime();
+    long filePointer = getFilePointer();
+    switch (_readVersion) {
+    case 0:
+      synchronized (_inputStream) {
+        _inputStream.seek(getFilePointer());
+        _inputStream.readFully(b, offset, length);
+      }
+      break;
+    case 1:
+      while (length > 0) {
+        int amount;
+        synchronized (_inputStream) {
+          _inputStream.seek(filePointer);
+          amount = _inputStream.read(b, offset, length);
+        }
+        length -= amount;
+        offset += amount;
+        filePointer += amount;
+      }
+      break;
+    case 2:
+      _inputStream.readFully(filePointer, b, offset, length);
+      break;
+    case 3:
+      while (length > 0) {
+        int amount;
+        amount = _inputStream.read(filePointer, b, offset, length);
+        length -= amount;
+        offset += amount;
+        filePointer += amount;
+      }
+      break;
+    default:
+      break;
+    }
+    long end = System.nanoTime();
+    _metricsGroup.readAccess.update((end - start) / 1000);
+    _metricsGroup.readThroughput.mark(length);
+  }
+
+  @Override
+  protected void closeInternal() throws IOException {
+    if (!_isClone) {
+      _inputStream.close();
+    }
+  }
+
+  @Override
+  public ReusedBufferedIndexInput clone() {
+    HdfsIndexInput clone = (HdfsIndexInput) super.clone();
+    clone._isClone = true;
+    clone._readVersion = HdfsDirectory.fetchImpl.get();
+    return clone;
+  }
+}
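
HdfsIndexInput now receives its read strategy as a constructor argument instead of consulting the static field itself. The switch in readInternal covers four strategies: 0 seeks and calls readFully under a lock on the stream, 1 seeks and loops over read under the same lock, 2 uses the positional readFully(position, ...), and 3 loops over the positional read(position, ...). A small sketch, assuming only the public fetchImpl counter shown in the HdfsDirectory diff (default 1), of flipping the strategy at runtime; inputs opened, or cloned, afterwards pick up the new value:

import org.apache.blur.store.hdfs.HdfsDirectory;

public class FetchImplExample {
  public static void main(String[] args) {
    // Sketch only: selects the positional readFully strategy (case 2).
    HdfsDirectory.fetchImpl.set(2);
    System.out.println("fetch impl [" + HdfsDirectory.fetchImpl.get() + "]");
  }
}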

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/862691aa/blur-store/src/main/java/org/apache/blur/store/hdfs/MetricsGroup.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/MetricsGroup.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/MetricsGroup.java
new file mode 100644
index 0000000..1088504
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/MetricsGroup.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.blur.store.hdfs;
+
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+
+public class MetricsGroup {
+  final Histogram readAccess;
+  final Histogram writeAccess;
+  final Meter writeThroughput;
+  final Meter readThroughput;
+
+  MetricsGroup(Histogram readAccess, Histogram writeAccess, Meter readThroughput, Meter writeThroughput) {
+    this.readAccess = readAccess;
+    this.writeAccess = writeAccess;
+    this.readThroughput = readThroughput;
+    this.writeThroughput = writeThroughput;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/862691aa/blur-store/src/main/java/org/apache/blur/store/hdfs/SoftlinkHdfsDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/SoftlinkHdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/SoftlinkHdfsDirectory.java
new file mode 100644
index 0000000..deee52f
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/SoftlinkHdfsDirectory.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.hdfs;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.UUID;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+
+public class SoftlinkHdfsDirectory extends HdfsDirectory {
+
+  private static final Log LOG = LogFactory.getLog(HdfsDirectory.class);
+
+  private static final String UTF_8 = "UTF-8";
+  private static final String EXT = ".blur_lnk";
+  private final Path _storePath;
+  private final Path _linkPath;
+
+  /**
+   * Creates a new SoftlinkHdfsDirectory.
+   * 
+   * @param configuration
+   *          the {@link Configuration} object.
+   * @param storePath
+   *          the path where the data is actually stored.
+   * @param linkPath
+   *          the path where the links are stored.
+   * @throws IOException
+   */
+  public SoftlinkHdfsDirectory(Configuration configuration, Path storePath, Path linkPath) throws IOException {
+    super(configuration, linkPath);
+    FileSystem fileSystem = storePath.getFileSystem(configuration);
+    _storePath = fileSystem.makeQualified(storePath);
+    _linkPath = fileSystem.makeQualified(linkPath);
+  }
+
+  @Override
+  protected FSDataOutputStream openForOutput(String name) throws IOException {
+    createLinkForNewFile(name);
+    return _fileSystem.create(getDataPath(name));
+  }
+
+  private void createLinkForNewFile(String name) throws IOException {
+    String uuid = UUID.randomUUID().toString();
+    Path dataPath = new Path(_storePath, uuid);
+    createLinkForNewFile(name, dataPath.toUri());
+  }
+
+  private void createLinkForNewFile(String name, URI uri) throws IOException, UnsupportedEncodingException {
+    String uriStr = uri.toString();
+    Path linkPath = createLinkPath(name);
+    FSDataOutputStream outputStream = _fileSystem.create(linkPath, false);
+    outputStream.write(uriStr.getBytes(UTF_8));
+    outputStream.close();
+  }
+
+  private Path getDataPath(String name) throws IOException {
+    Path linkPath = createLinkPath(name);
+    boolean exists = _fileSystem.exists(linkPath);
+    if (exists) {
+      return new Path(getUri(linkPath));
+    } else {
+      return new Path(_linkPath, name);
+    }
+  }
+
+  private URI getUri(Path linkPath) throws IOException {
+    FileStatus fileStatus = _fileSystem.getFileStatus(linkPath);
+    byte[] buf = new byte[(int) fileStatus.getLen()];
+    FSDataInputStream inputStream = _fileSystem.open(linkPath);
+    inputStream.readFully(buf);
+    inputStream.close();
+    try {
+      return new URI(new String(buf, UTF_8));
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private Path getLinkPath(String name) throws IOException {
+    Path linkPath = createLinkPath(name);
+    boolean exists = _fileSystem.exists(linkPath);
+    if (exists) {
+      return new Path(_linkPath, name + EXT);
+    } else {
+      return new Path(_linkPath, name);
+    }
+  }
+
+  private Path createLinkPath(String name) {
+    return new Path(_linkPath, name + EXT);
+  }
+
+  private String removeLinkExtensionSuffix(String name) {
+    if (name.endsWith(EXT)) {
+      return name.substring(0, name.length() - EXT.length());
+    }
+    return name;
+  }
+
+  @Override
+  public String[] listAll() throws IOException {
+    LOG.debug("listAll [{0}]", _path);
+    FileStatus[] files = _fileSystem.listStatus(_path, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        try {
+          return _fileSystem.isFile(path);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    });
+    String[] result = new String[files.length];
+    for (int i = 0; i < result.length; i++) {
+      result[i] = removeLinkExtensionSuffix(files[i].getPath().getName());
+    }
+    return result;
+  }
+
+  @Override
+  protected FSDataInputStream openForInput(String name) throws IOException {
+    return _fileSystem.open(getDataPath(name));
+  }
+
+  @Override
+  protected boolean exists(String name) throws IOException {
+    return _fileSystem.exists(getLinkPath(name));
+  }
+
+  @Override
+  protected void delete(String name) throws IOException {
+    _fileSystem.delete(getLinkPath(name), true);
+  }
+
+  @Override
+  protected long length(String name) throws IOException {
+    return _fileSystem.getFileStatus(getDataPath(name)).getLen();
+  }
+
+  @Override
+  protected long fileModified(String name) throws IOException {
+    return _fileSystem.getFileStatus(getDataPath(name)).getModificationTime();
+  }
+
+  @Override
+  public void copy(Directory to, String src, String dest, IOContext context) throws IOException {
+    if (to instanceof DirectoryDecorator) {
+      copy(((DirectoryDecorator) to).getOriginalDirectory(), src, dest, context);
+      return;
+    } else if (to instanceof SoftlinkHdfsDirectory) {
+      LOG.warn("This needs to be tested....");
+      SoftlinkHdfsDirectory softlinkHdfsDirectory = (SoftlinkHdfsDirectory) to;
+      if (canQuickCopy(softlinkHdfsDirectory, src)) {
+        Path linkPath = getLinkPath(src);
+        softlinkHdfsDirectory.quickCopy(linkPath, dest);
+      }
+    }
+    slowCopy(to, src, dest, context);
+  }
+
+  private void quickCopy(Path linkPath, String dest) throws IOException {
+    URI uri = getUri(linkPath);
+    createLinkForNewFile(dest, uri);
+  }
+
+  private boolean canQuickCopy(SoftlinkHdfsDirectory to, String src) throws IOException {
+    if (to._storePath.equals(_storePath)) {
+      return _fileSystem.exists(createLinkPath(src));
+    }
+    return false;
+  }
+
+}
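
For orientation, a hedged sketch of constructing the new directory, assuming only the constructor shown above; the paths are hypothetical. Data files are written under the store path with UUID names, while small ".blur_lnk" files under the link path record the URI of the backing data file, which is what lets the copy path above short-circuit by writing a new link instead of moving bytes:

import org.apache.blur.store.hdfs.SoftlinkHdfsDirectory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.lucene.store.Directory;

public class SoftlinkExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical locations; a real deployment would point these at HDFS.
    Path store = new Path("hdfs://namenode/blur/store");
    Path link = new Path("hdfs://namenode/blur/tables/table1/shard-0");
    // Index files appear under the link path as ".blur_lnk" pointers; the
    // bytes themselves land under the store path.
    Directory directory = new SoftlinkHdfsDirectory(conf, store, link);
    System.out.println(directory);
  }
}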

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/862691aa/blur-store/src/test/java/org/apache/blur/store/BaseDirectoryTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/BaseDirectoryTest.java b/blur-store/src/test/java/org/apache/blur/store/BaseDirectoryTest.java
new file mode 100644
index 0000000..666dcfe
--- /dev/null
+++ b/blur-store/src/test/java/org/apache/blur/store/BaseDirectoryTest.java
@@ -0,0 +1,273 @@
+package org.apache.blur.store;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.blur.lucene.LuceneVersionConstant;
+import org.apache.blur.store.buffer.BufferStore;
+import org.apache.lucene.analysis.core.KeywordAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.IntField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.NumericRangeQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.RAMDirectory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public abstract class BaseDirectoryTest {
+  protected static final File TMPDIR = new File(System.getProperty("blur.tmp.dir", "/tmp"));
+
+  protected static final int MAX_NUMBER_OF_WRITES = 10000;
+  protected static final int MIN_FILE_SIZE = 100;
+  protected static final int MAX_FILE_SIZE = 100000;
+  protected static final int MIN_BUFFER_SIZE = 1;
+  protected static final int MAX_BUFFER_SIZE = 5000;
+  protected static final int MAX_NUMBER_OF_READS = 10000;
+  protected Directory directory;
+  protected File file;
+  protected long seed;
+  protected Random random;
+
+  @Before
+  public void setUp() throws IOException {
+    BufferStore.init(128, 128);
+    file = new File(TMPDIR, "hdfsdirectorytest");
+    rm(file);
+    seed = new Random().nextLong();
+    random = new Random(seed);
+    setupDirectory();
+  }
+
+  @After
+  public void tearDown() {
+    print(file, "");
+  }
+
+  private void print(File f, String buf) {
+    if (f.isDirectory()) {
+      System.out.println(buf + "\\" + f.getName());
+      for (File fl : f.listFiles()) {
+        if (fl.getName().startsWith(".")) {
+          continue;
+        }
+        print(fl, buf + " ");
+      }
+    } else {
+      System.out.println(buf + f.getName() + " " + f.length());
+    }
+  }
+
+  protected abstract void setupDirectory() throws IOException;
+
+  @Test
+  public void testWritingAndReadingAFile() throws IOException {
+
+    IndexOutput output = directory.createOutput("testing.test", IOContext.DEFAULT);
+    output.writeInt(12345);
+    output.flush();
+    output.close();
+
+    IndexInput input = directory.openInput("testing.test", IOContext.DEFAULT);
+    assertEquals(12345, input.readInt());
+    input.close();
+
+    String[] listAll = directory.listAll();
+    assertEquals(1, listAll.length);
+    assertEquals("testing.test", listAll[0]);
+
+    assertEquals(4, directory.fileLength("testing.test"));
+
+    IndexInput input1 = directory.openInput("testing.test", IOContext.DEFAULT);
+
+    IndexInput input2 = (IndexInput) input1.clone();
+    assertEquals(12345, input2.readInt());
+    input2.close();
+
+    assertEquals(12345, input1.readInt());
+    input1.close();
+
+    assertFalse(directory.fileExists("testing.test.other"));
+    assertTrue(directory.fileExists("testing.test"));
+    directory.deleteFile("testing.test");
+    assertFalse(directory.fileExists("testing.test"));
+  }
+
+  @Test
+  public void testEOF() throws IOException {
+    Directory fsDir = new RAMDirectory();
+    String name = "test.eof";
+    createFile(name, fsDir, directory);
+    long fsLength = fsDir.fileLength(name);
+    long hdfsLength = directory.fileLength(name);
+    assertEquals(fsLength, hdfsLength);
+    testEof(name, fsDir, fsLength);
+    testEof(name, directory, hdfsLength);
+  }
+
+  private void testEof(String name, Directory directory, long length) throws IOException {
+    IndexInput input = directory.openInput(name, IOContext.DEFAULT);
+    input.seek(length);
+    try {
+      input.readByte();
+      fail("should throw eof");
+    } catch (IOException e) {
+    }
+  }
+
+  @Test
+  public void testWrites() throws IOException {
+    int i = 0;
+    try {
+      Set<String> names = new HashSet<String>();
+      for (; i < 10; i++) {
+        Directory fsDir = new RAMDirectory();
+        String name = getName();
+        System.out.println("Working on pass [" + i + "] seed [" + seed + "] contains [" +
names.contains(name) + "]");
+        names.add(name);
+        createFile(name, fsDir, directory);
+        assertInputsEquals(name, fsDir, directory);
+        fsDir.close();
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail("Test failed with seed [" + seed + "] on pass [" + i + "]");
+    }
+  }
+
+  @Test
+  public void testCreateIndex() throws IOException {
+    IndexWriterConfig conf = new IndexWriterConfig(LuceneVersionConstant.LUCENE_VERSION, new KeywordAnalyzer());
+    IndexWriter writer = new IndexWriter(directory, conf);
+    int numDocs = 1000;
+    DirectoryReader reader = null;
+    for (int i = 0; i < 100; i++) {
+      if (reader == null) {
+        reader = DirectoryReader.open(writer, true);
+      } else {
+        DirectoryReader old = reader;
+        reader = DirectoryReader.openIfChanged(old, writer, true);
+        if (reader == null) {
+          reader = old;
+        } else {
+          old.close();
+        }
+      }
+      assertEquals(i * numDocs, reader.numDocs());
+      IndexSearcher searcher = new IndexSearcher(reader);
+      NumericRangeQuery<Integer> query = NumericRangeQuery.newIntRange("id", 42, 42, true, true);
+      TopDocs topDocs = searcher.search(query, 10);
+      assertEquals(i, topDocs.totalHits);
+      addDocuments(writer, numDocs);
+    }
+    writer.close(false);
+    reader.close();
+  }
+
+  private void addDocuments(IndexWriter writer, int numDocs) throws IOException {
+    for (int i = 0; i < numDocs; i++) {
+      writer.addDocument(getDoc(i));
+    }
+  }
+
+  private Document getDoc(int i) {
+    Document document = new Document();
+    document.add(new IntField("id", i, Store.YES));
+    return document;
+  }
+
+  private void assertInputsEquals(String name, Directory fsDir, Directory hdfs) throws IOException {
+    int reads = random.nextInt(MAX_NUMBER_OF_READS);
+    IndexInput fsInput = fsDir.openInput(name, IOContext.DEFAULT);
+    IndexInput hdfsInput = hdfs.openInput(name, IOContext.DEFAULT);
+    assertEquals(fsInput.length(), hdfsInput.length());
+    int fileLength = (int) fsInput.length();
+    for (int i = 0; i < reads; i++) {
+      byte[] fsBuf = new byte[random.nextInt(Math.min(MAX_BUFFER_SIZE - MIN_BUFFER_SIZE, fileLength)) + MIN_BUFFER_SIZE];
+      byte[] hdfsBuf = new byte[fsBuf.length];
+      int offset = random.nextInt(fsBuf.length);
+      int length = random.nextInt(fsBuf.length - offset);
+      int pos = random.nextInt(fileLength - length);
+      fsInput.seek(pos);
+      fsInput.readBytes(fsBuf, offset, length);
+      hdfsInput.seek(pos);
+      hdfsInput.readBytes(hdfsBuf, offset, length);
+      for (int f = offset; f < length; f++) {
+        if (fsBuf[f] != hdfsBuf[f]) {
+          fail();
+        }
+      }
+    }
+    fsInput.close();
+    hdfsInput.close();
+  }
+
+  private void createFile(String name, Directory fsDir, Directory hdfs) throws IOException {
+    int writes = random.nextInt(MAX_NUMBER_OF_WRITES);
+    int fileLength = random.nextInt(MAX_FILE_SIZE - MIN_FILE_SIZE) + MIN_FILE_SIZE;
+    IndexOutput fsOutput = fsDir.createOutput(name, IOContext.DEFAULT);
+    fsOutput.setLength(fileLength);
+    IndexOutput hdfsOutput = hdfs.createOutput(name, IOContext.DEFAULT);
+    hdfsOutput.setLength(fileLength);
+    for (int i = 0; i < writes; i++) {
+      byte[] buf = new byte[random.nextInt(Math.min(MAX_BUFFER_SIZE - MIN_BUFFER_SIZE, fileLength)) + MIN_BUFFER_SIZE];
+      random.nextBytes(buf);
+      int offset = random.nextInt(buf.length);
+      int length = random.nextInt(buf.length - offset);
+      fsOutput.writeBytes(buf, offset, length);
+      hdfsOutput.writeBytes(buf, offset, length);
+    }
+    fsOutput.close();
+    hdfsOutput.close();
+  }
+
+  private String getName() {
+    return Long.toString(Math.abs(random.nextLong()));
+  }
+
+  public static void rm(File file) {
+    if (!file.exists()) {
+      return;
+    }
+    if (file.isDirectory()) {
+      for (File f : file.listFiles()) {
+        rm(f);
+      }
+    }
+    file.delete();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/862691aa/blur-store/src/test/java/org/apache/blur/store/HdfsDirectoryTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/HdfsDirectoryTest.java b/blur-store/src/test/java/org/apache/blur/store/HdfsDirectoryTest.java
index fba2a42..26501c8 100644
--- a/blur-store/src/test/java/org/apache/blur/store/HdfsDirectoryTest.java
+++ b/blur-store/src/test/java/org/apache/blur/store/HdfsDirectoryTest.java
@@ -16,191 +16,26 @@ package org.apache.blur.store;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
-import java.util.HashSet;
-import java.util.Random;
-import java.util.Set;
 
-import org.apache.blur.store.buffer.BufferStore;
 import org.apache.blur.store.hdfs.HdfsDirectory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IOContext;
-import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.store.RAMDirectory;
-import org.junit.Before;
 import org.junit.Test;
 
-public class HdfsDirectoryTest {
-  private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir", "/tmp"));
-
-  private static final int MAX_NUMBER_OF_WRITES = 10000;
-  private static final int MIN_FILE_SIZE = 100;
-  private static final int MAX_FILE_SIZE = 100000;
-  private static final int MIN_BUFFER_SIZE = 1;
-  private static final int MAX_BUFFER_SIZE = 5000;
-  private static final int MAX_NUMBER_OF_READS = 10000;
-  private HdfsDirectory directory;
-  private File file;
-  private long seed;
-  private Random random;
+public class HdfsDirectoryTest extends BaseDirectoryTest {
 
-  @Before
-  public void setUp() throws IOException {
-    BufferStore.init(128, 128);
-    file = new File(TMPDIR, "hdfsdirectorytest");
-    rm(file);
+  @Override
+  protected void setupDirectory() throws IOException {
     URI uri = new File(file, "hdfs").toURI();
     Path hdfsDirPath = new Path(uri.toString());
     Configuration conf = new Configuration();
     directory = new HdfsDirectory(conf, hdfsDirPath);
-    seed = new Random().nextLong();
-    random = new Random(seed);
-  }
-
-  @Test
-  public void testWritingAndReadingAFile() throws IOException {
-
-    IndexOutput output = directory.createOutput("testing.test", IOContext.DEFAULT);
-    output.writeInt(12345);
-    output.flush();
-    output.close();
-
-    IndexInput input = directory.openInput("testing.test", IOContext.DEFAULT);
-    assertEquals(12345, input.readInt());
-    input.close();
-
-    String[] listAll = directory.listAll();
-    assertEquals(1, listAll.length);
-    assertEquals("testing.test", listAll[0]);
-
-    assertEquals(4, directory.fileLength("testing.test"));
-
-    IndexInput input1 = directory.openInput("testing.test", IOContext.DEFAULT);
-
-    IndexInput input2 = (IndexInput) input1.clone();
-    assertEquals(12345, input2.readInt());
-    input2.close();
-
-    assertEquals(12345, input1.readInt());
-    input1.close();
-
-    assertFalse(directory.fileExists("testing.test.other"));
-    assertTrue(directory.fileExists("testing.test"));
-    directory.deleteFile("testing.test");
-    assertFalse(directory.fileExists("testing.test"));
-  }
-
-  @Test
-  public void testEOF() throws IOException {
-    Directory fsDir = new RAMDirectory();
-    String name = "test.eof";
-    createFile(name, fsDir, directory);
-    long fsLength = fsDir.fileLength(name);
-    long hdfsLength = directory.fileLength(name);
-    assertEquals(fsLength, hdfsLength);
-    testEof(name, fsDir, fsLength);
-    testEof(name, directory, hdfsLength);
-  }
-
-  private void testEof(String name, Directory directory, long length) throws IOException {
-    IndexInput input = directory.openInput(name, IOContext.DEFAULT);
-    input.seek(length);
-    try {
-      input.readByte();
-      fail("should throw eof");
-    } catch (IOException e) {
-    }
   }
 
   @Test
-  public void testWrites() throws IOException {
-    int i = 0;
-    try {
-      Set<String> names = new HashSet<String>();
-      for (; i < 10; i++) {
-        Directory fsDir = new RAMDirectory();
-        String name = getName();
-        System.out.println("Working on pass [" + i + "] seed [" + seed + "] contains [" +
names.contains(name) + "]");
-        names.add(name);
-        createFile(name, fsDir, directory);
-        assertInputsEquals(name, fsDir, directory);
-        fsDir.close();
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
-      fail("Test failed with seed [" + seed + "] on pass [" + i + "]");
-    }
-  }
-
-  private void assertInputsEquals(String name, Directory fsDir, HdfsDirectory hdfs) throws IOException {
-    int reads = random.nextInt(MAX_NUMBER_OF_READS);
-    IndexInput fsInput = fsDir.openInput(name, IOContext.DEFAULT);
-    IndexInput hdfsInput = hdfs.openInput(name, IOContext.DEFAULT);
-    assertEquals(fsInput.length(), hdfsInput.length());
-    int fileLength = (int) fsInput.length();
-    for (int i = 0; i < reads; i++) {
-      byte[] fsBuf = new byte[random.nextInt(Math.min(MAX_BUFFER_SIZE - MIN_BUFFER_SIZE, fileLength)) + MIN_BUFFER_SIZE];
-      byte[] hdfsBuf = new byte[fsBuf.length];
-      int offset = random.nextInt(fsBuf.length);
-      int length = random.nextInt(fsBuf.length - offset);
-      int pos = random.nextInt(fileLength - length);
-      fsInput.seek(pos);
-      fsInput.readBytes(fsBuf, offset, length);
-      hdfsInput.seek(pos);
-      hdfsInput.readBytes(hdfsBuf, offset, length);
-      for (int f = offset; f < length; f++) {
-        if (fsBuf[f] != hdfsBuf[f]) {
-          fail();
-        }
-      }
-    }
-    fsInput.close();
-    hdfsInput.close();
-  }
-
-  private void createFile(String name, Directory fsDir, HdfsDirectory hdfs) throws IOException {
-    int writes = random.nextInt(MAX_NUMBER_OF_WRITES);
-    int fileLength = random.nextInt(MAX_FILE_SIZE - MIN_FILE_SIZE) + MIN_FILE_SIZE;
-    IndexOutput fsOutput = fsDir.createOutput(name, IOContext.DEFAULT);
-    fsOutput.setLength(fileLength);
-    IndexOutput hdfsOutput = hdfs.createOutput(name, IOContext.DEFAULT);
-    hdfsOutput.setLength(fileLength);
-    for (int i = 0; i < writes; i++) {
-      byte[] buf = new byte[random.nextInt(Math.min(MAX_BUFFER_SIZE - MIN_BUFFER_SIZE, fileLength)) + MIN_BUFFER_SIZE];
-      random.nextBytes(buf);
-      int offset = random.nextInt(buf.length);
-      int length = random.nextInt(buf.length - offset);
-      fsOutput.writeBytes(buf, offset, length);
-      hdfsOutput.writeBytes(buf, offset, length);
-    }
-    fsOutput.close();
-    hdfsOutput.close();
-  }
-
-  private String getName() {
-    return Long.toString(Math.abs(random.nextLong()));
-  }
-
-  public static void rm(File file) {
-    if (!file.exists()) {
-      return;
-    }
-    if (file.isDirectory()) {
-      for (File f : file.listFiles()) {
-        rm(f);
-      }
-    }
-    file.delete();
-  }
-
+  public void runsTheTests() {}
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/862691aa/blur-store/src/test/java/org/apache/blur/store/SoftlinkHdfsDirectoryTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/SoftlinkHdfsDirectoryTest.java b/blur-store/src/test/java/org/apache/blur/store/SoftlinkHdfsDirectoryTest.java
new file mode 100644
index 0000000..0ac2735
--- /dev/null
+++ b/blur-store/src/test/java/org/apache/blur/store/SoftlinkHdfsDirectoryTest.java
@@ -0,0 +1,43 @@
+package org.apache.blur.store;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.blur.store.hdfs.SoftlinkHdfsDirectory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+public class SoftlinkHdfsDirectoryTest extends BaseDirectoryTest {
+
+  @Override
+  protected void setupDirectory() throws IOException {
+    URI uri = new File(file, "hdfs").toURI();
+    Path hdfsDirPath = new Path(uri.toString());
+    Path store = new Path(hdfsDirPath, "store");
+    Path link = new Path(hdfsDirPath, "link");
+    Configuration conf = new Configuration();
+    directory = new SoftlinkHdfsDirectory(conf, store, link);
+  }
+
+  @Test
+  public void runsTheTests() {}
+  
+}

