incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject incubator-blur git commit: Changes to allow for different read phases based on either normal usage (search) or stream usage (merge). Currently the feature is disabled.
Date Thu, 13 Nov 2014 03:59:04 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master ac7aaacd2 -> ffc39d19d


Changes to allow for different read phases based on either normal usage (search) or stream
usage (merge).  Currently the feature is disabled.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/ffc39d19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/ffc39d19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/ffc39d19

Branch: refs/heads/master
Commit: ffc39d19dd2285df735f20f831c5e26226113b35
Parents: ac7aaac
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Wed Nov 12 22:58:53 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Wed Nov 12 22:58:53 2014 -0500

----------------------------------------------------------------------
 .../writer/BlurIndexSimpleWriterTest.java       | 23 ++++--
 .../SharedMergeSchedulerThroughputTest.java     | 83 ++++++++++++++++++++
 .../apache/blur/store/hdfs/HdfsDirectory.java   | 62 +++++++++++----
 .../store/hdfs/HdfsRandomAccessIndexInput.java  | 20 ++++-
 .../blur/store/hdfs/HdfsStreamIndexInput.java   | 26 ++----
 5 files changed, 170 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ffc39d19/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
index c5efdb4..c2d3b15 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
@@ -36,6 +36,8 @@ import org.apache.blur.concurrent.Executors;
 import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
+import org.apache.blur.store.hdfs.BlurLockFactory;
+import org.apache.blur.store.hdfs.HdfsDirectory;
 import org.apache.blur.thrift.generated.Column;
 import org.apache.blur.thrift.generated.Record;
 import org.apache.blur.thrift.generated.RecordMutation;
@@ -49,11 +51,13 @@ import org.apache.blur.trace.Trace;
 import org.apache.blur.trace.TraceCollector;
 import org.apache.blur.trace.TraceStorage;
 import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.LockFactory;
 import org.json.JSONException;
 import org.junit.After;
 import org.junit.Before;
@@ -115,7 +119,14 @@ public class BlurIndexSimpleWriterTest {
     TableContext tableContext = TableContext.create(tableDescriptor);
     File path = new File(_base, "index_" + uuid);
     path.mkdirs();
-    FSDirectory directory = FSDirectory.open(path);
+
+    Path hdfsPath = new Path(path.toURI());
+    HdfsDirectory directory = new HdfsDirectory(_configuration, hdfsPath);
+    BlurLockFactory lockFactory = new BlurLockFactory(_configuration, hdfsPath, "unit-test", BlurUtil.getPid());
+    directory.setLockFactory(lockFactory);
+
+    // FSDirectory directory = FSDirectory.open(path);
+
     ShardContext shardContext = ShardContext.create(tableContext, "test-shard-" + uuid);
    _writer = new BlurIndexSimpleWriter(shardContext, directory, _mergeScheduler, _service, _closer, _timer);
   }
@@ -192,11 +203,11 @@ public class BlurIndexSimpleWriterTest {
 
       @Override
       public void store(TraceCollector collector) {
-        try {
-          System.out.println(collector.toJsonObject());
-        } catch (JSONException e) {
-          e.printStackTrace();
-        }
+//        try {
+//          System.out.println(collector.toJsonObject().toString(1));
+//        } catch (JSONException e) {
+//          e.printStackTrace();
+//        }
       }
     });
     Trace.setupTrace("test");

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ffc39d19/blur-core/src/test/java/org/apache/blur/manager/writer/SharedMergeSchedulerThroughputTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/SharedMergeSchedulerThroughputTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/SharedMergeSchedulerThroughputTest.java
new file mode 100644
index 0000000..630cda2
--- /dev/null
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/SharedMergeSchedulerThroughputTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.manager.writer;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Random;
+
+import org.apache.blur.BlurConfiguration;
+import org.apache.blur.MiniCluster;
+import org.apache.blur.store.BlockCacheDirectoryFactoryV2;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.Version;
+import org.junit.Test;
+
+public class SharedMergeSchedulerThroughputTest {
+
+//  @Test
+  public void test() throws IOException {
+    MiniCluster miniCluster = new MiniCluster();
+    miniCluster.startDfs("./tmp/hdfs");
+    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_43, new StandardAnalyzer(Version.LUCENE_43));
+    SharedMergeScheduler sharedMergeScheduler = new SharedMergeScheduler(10);
+    conf.setMergeScheduler(sharedMergeScheduler.getMergeScheduler());
+    Configuration configuration = new Configuration();
+    URI fileSystemUri = miniCluster.getFileSystemUri();
+    Path path = new Path(fileSystemUri.toString() + "/merge-test");
+    FileSystem fileSystem = path.getFileSystem(configuration);
+    fileSystem.delete(path, true);
+    HdfsDirectory directory = new HdfsDirectory(configuration, path);
+    BlurConfiguration blurConfiguration = new BlurConfiguration();
+    BlockCacheDirectoryFactoryV2 factory = new BlockCacheDirectoryFactoryV2(blurConfiguration, 1000000l);
+
+    Directory cdir = factory.newDirectory("t", "s", directory, null);
+    IndexWriter writer = new IndexWriter(cdir, conf);
+    Random random = new Random(1);
+    StringBuilder stringBuilder = new StringBuilder();
+    long s = System.nanoTime();
+    for (int i = 0; i < 250000; i++) {
+      if (i % 5000 == 0) {
+        System.out.println(i);
+      }
+      stringBuilder.setLength(0);
+      for (int w = 0; w < 2000; w++) {
+        stringBuilder.append(Integer.toString(random.nextInt(100000))).append(' ');
+      }
+      Document document = new Document();
+      document.add(new TextField("body", stringBuilder.toString(), Store.YES));
+      writer.addDocument(document);
+    }
+    writer.close(true);
+    sharedMergeScheduler.close();
+    factory.close();
+    long e = System.nanoTime();
+    System.out.println("Total Time [" + (e - s) / 1000000.0 + " ms]");
+    miniCluster.shutdownDfs();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ffc39d19/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index d9c616e..60fc46e 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -37,6 +37,7 @@ import org.apache.blur.log.LogFactory;
 import org.apache.blur.store.blockcache.LastModified;
 import org.apache.blur.trace.Trace;
 import org.apache.blur.trace.Tracer;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -83,13 +84,37 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
     final long _length;
   }
 
+  static class StreamPair {
+
+    final FSDataInputStream _random;
+    final FSDataInputStream _stream;
+
+    StreamPair(FSDataInputStream random, FSDataInputStream stream) {
+      _random = random;
+      _stream = stream;
+    }
+
+    void close() {
+      IOUtils.closeQuietly(_random);
+      IOUtils.closeQuietly(_stream);
+    }
+
+    FSDataInputStream getInputStream(boolean stream) {
+      if (stream) {
+        return _stream;
+      }
+      return _random;
+    }
+
+  }
+
   protected final Path _path;
   protected final FileSystem _fileSystem;
   protected final MetricsGroup _metricsGroup;
  protected final Map<String, FStat> _fileStatusMap = new ConcurrentHashMap<String, FStat>();
  protected final Map<String, Boolean> _symlinkMap = new ConcurrentHashMap<String, Boolean>();
  protected final Map<String, Path> _symlinkPathMap = new ConcurrentHashMap<String, Path>();
-  protected final Map<Path, FSDataInputStream> _inputMap = new ConcurrentHashMap<Path, FSDataInputStream>();
+  protected final Map<Path, StreamPair> _inputMap = new ConcurrentHashMap<Path, StreamPair>();
   protected final boolean _useCache = true;
 
   public HdfsDirectory(Configuration configuration, Path path) throws IOException {
@@ -188,7 +213,8 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
         super.close();
         _fileStatusMap.put(name, new FStat(System.currentTimeMillis(), outputStream.getPos()));
         outputStream.close();
-        openForInput(name);
+        openForInput(name, true);
+        openForInput(name, false);
       }
 
       @Override
@@ -202,7 +228,7 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
     Path path = getPath(name);
     Tracer trace = Trace.trace("filesystem - create", Trace.param("path", path));
     try {
-      return _fileSystem.create(path);
+      return _fileSystem.create(path, true, 1024 * 1024);
     } finally {
       trace.done();
     }
@@ -214,23 +240,27 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
     if (!fileExists(name)) {
       throw new FileNotFoundException("File [" + name + "] not found.");
     }
-    FSDataInputStream inputStream = openForInput(name);
+    FSDataInputStream inputRandomAccess = openForInput(name, false);
+    FSDataInputStream inputStreamAccess = openForInput(name, true);
     long fileLength = fileLength(name);
-    
-    return new HdfsRandomAccessIndexInput(name, inputStream, fileLength, _metricsGroup, getPath(name));
+    Path path = getPath(name);
+    HdfsStreamIndexInput streamInput = new HdfsStreamIndexInput(inputStreamAccess, fileLength, _metricsGroup, path);
+    return new HdfsRandomAccessIndexInput(inputRandomAccess, fileLength, _metricsGroup, path, streamInput);
   }
 
-  protected synchronized FSDataInputStream openForInput(String name) throws IOException {
+  protected synchronized FSDataInputStream openForInput(String name, boolean stream) throws IOException {
     Path path = getPath(name);
-    FSDataInputStream inputStream = _inputMap.get(path);
-    if (inputStream != null) {
-      return inputStream;
+    StreamPair streamPair = _inputMap.get(path);
+    if (streamPair != null) {
+      return streamPair.getInputStream(stream);
     }
     Tracer trace = Trace.trace("filesystem - open", Trace.param("path", path));
     try {
-      inputStream = _fileSystem.open(path);
-      _inputMap.put(path, inputStream);
-      return inputStream;
+      FSDataInputStream randomInputStream = _fileSystem.open(path);
+      FSDataInputStream streamInputStream = _fileSystem.open(path);
+      streamPair = new StreamPair(randomInputStream, streamInputStream);
+      _inputMap.put(path, streamPair);
+      return streamPair.getInputStream(stream);
     } finally {
       trace.done();
     }
@@ -301,10 +331,10 @@ public class HdfsDirectory extends Directory implements LastModified,
HdfsSymlin
 
   protected void delete(String name) throws IOException {
     Path path = getPathOrSymlink(name);
-    FSDataInputStream inputStream = _inputMap.remove(path);
+    StreamPair streamPair = _inputMap.remove(path);
     Tracer trace = Trace.trace("filesystem - delete", Trace.param("path", path));
-    if (inputStream != null) {
-      inputStream.close();
+    if (streamPair != null) {
+      streamPair.close();
     }
     if (_useCache) {
       _symlinkMap.remove(name);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ffc39d19/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsRandomAccessIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsRandomAccessIndexInput.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsRandomAccessIndexInput.java
index 2bfca7e..2db383f 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsRandomAccessIndexInput.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsRandomAccessIndexInput.java
@@ -23,21 +23,24 @@ import org.apache.blur.trace.Trace;
 import org.apache.blur.trace.Tracer;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.lucene.store.IndexInput;
 
 public class HdfsRandomAccessIndexInput extends ReusedBufferedIndexInput {
 
   private final long _length;
-  private FSDataInputStream _inputStream;
+  private final FSDataInputStream _inputStream;
   private final MetricsGroup _metricsGroup;
   private final Path _path;
+  private final HdfsStreamIndexInput _streamInput;
 
-  public HdfsRandomAccessIndexInput(String name, FSDataInputStream inputStream, long length, MetricsGroup metricsGroup,
-      Path path) throws IOException {
+  public HdfsRandomAccessIndexInput(FSDataInputStream inputStream, long length, MetricsGroup metricsGroup, Path path,
+      HdfsStreamIndexInput streamInput) throws IOException {
     super("HdfsRandomAccessIndexInput(" + path.toString() + ")");
     _inputStream = inputStream;
     _length = length;
     _metricsGroup = metricsGroup;
     _path = path;
+    _streamInput = streamInput;
   }
 
   @Override
@@ -74,7 +77,16 @@ public class HdfsRandomAccessIndexInput extends ReusedBufferedIndexInput
{
   }
 
   @Override
-  public ReusedBufferedIndexInput clone() {
+  public IndexInput clone() {
+    if (IndexInputMergeUtil.isMergeThread()) {
+      IndexInput clone = _streamInput.clone();
+      try {
+        clone.seek(getFilePointer());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return clone;
+    }
     return (HdfsRandomAccessIndexInput) super.clone();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/ffc39d19/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsStreamIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsStreamIndexInput.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsStreamIndexInput.java
index a23fc8d..df83c32 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsStreamIndexInput.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsStreamIndexInput.java
@@ -18,7 +18,6 @@ package org.apache.blur.store.hdfs;
 
 import java.io.IOException;
 
-import org.apache.blur.utils.BlurConstants;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.lucene.store.IndexInput;
@@ -37,8 +36,8 @@ public class HdfsStreamIndexInput extends IndexInput {
 
   private long _postion;
 
-  public HdfsStreamIndexInput(String name, FSDataInputStream inputStream, long length, MetricsGroup metricsGroup,
-      Path path) throws IOException {
+  public HdfsStreamIndexInput(FSDataInputStream inputStream, long length, MetricsGroup metricsGroup, Path path)
+      throws IOException {
     super("HdfsStreamIndexInput(" + path.toString() + ")");
     _inputStream = inputStream;
     _length = length;
@@ -52,22 +51,17 @@ public class HdfsStreamIndexInput extends IndexInput {
   private void checkPosition() throws IOException {
     long pos = _inputStream.getPos();
     if (pos != _postion) {
-      _inputStream.seek(pos);
+      _inputStream.seek(_postion);
       _readStreamSeek.mark();
     }
   }
 
-  public static boolean isMergeThread() {
-    String name = Thread.currentThread().getName();
-    if (name.startsWith(BlurConstants.SHARED_MERGE_SCHEDULER_PREFIX)) {
-      return true;
-    }
-    return false;
-  }
-
   @Override
   public IndexInput clone() {
-    return super.clone();
+    if (IndexInputMergeUtil.isMergeThread()) {
+      return super.clone();
+    }
+    throw new RuntimeException("who is doing this?");
   }
 
   @Override
@@ -77,11 +71,7 @@ public class HdfsStreamIndexInput extends IndexInput {
 
   @Override
   public long getFilePointer() {
-    try {
-      return _inputStream.getPos();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
+    return _postion;
   }
 
   @Override


Mime
View raw message