incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Updating the blur input format to allow for fast directory use.
Date Sat, 18 Apr 2015 14:27:54 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master 77def152b -> 1b7eaf7fd


Updating the blur input format to allow for fast directory use.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/1b7eaf7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/1b7eaf7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/1b7eaf7f

Branch: refs/heads/master
Commit: 1b7eaf7fde14fb9e2934ce742b81ae591f7351ca
Parents: 77def15
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sat Apr 18 10:27:44 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sat Apr 18 10:27:44 2015 -0400

----------------------------------------------------------------------
 .../indexserver/DistributedIndexServer.java     | 21 ++------
 .../blur/mapreduce/lib/BlurInputFormat.java     | 27 +++++++---
 .../blur/mapreduce/lib/BlurInputFormatTest.java | 21 +++++---
 .../blur/store/hdfs_v2/HdfsKeyValueStore.java   | 28 ++++++++---
 .../apache/blur/store/hdfs/DirectoryUtil.java   | 53 ++++++++++++++++++++
 5 files changed, 111 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1b7eaf7f/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
index e9d7150..6972abb 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
@@ -19,7 +19,6 @@ package org.apache.blur.manager.indexserver;
 import static org.apache.blur.utils.BlurConstants.BLUR_TABLE_DISABLE_FAST_DIR;
 
 import java.io.IOException;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -54,10 +53,9 @@ import org.apache.blur.server.TableContext;
 import org.apache.blur.server.cache.ThriftCache;
 import org.apache.blur.store.BlockCacheDirectoryFactory;
 import org.apache.blur.store.hdfs.BlurLockFactory;
+import org.apache.blur.store.hdfs.DirectoryUtil;
 import org.apache.blur.store.hdfs.HdfsDirectory;
 import org.apache.blur.store.hdfs.SequentialReadControl;
-import org.apache.blur.store.hdfs_v2.FastHdfsKeyValueDirectory;
-import org.apache.blur.store.hdfs_v2.JoinDirectory;
 import org.apache.blur.thrift.generated.ShardState;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.utils.BlurUtil;
@@ -511,22 +509,9 @@ public class DistributedIndexServer extends AbstractDistributedIndexServer {
     HdfsDirectory longTermStorage = new HdfsDirectory(_configuration, hdfsDirPath, _sequentialReadControl);
     longTermStorage.setLockFactory(lockFactory);
 
-    Directory directory;
-    URI uri = hdfsDirPath.toUri();
-    String scheme = uri.getScheme();
-
     boolean disableFast = tableContext.getBlurConfiguration().getBoolean(BLUR_TABLE_DISABLE_FAST_DIR, false);
-
-    if (scheme != null && scheme.equals("hdfs") && !disableFast) {
-      LOG.info("Using Fast HDFS directory implementation on shard [{0}] for table [{1}]", shard, table);
-      FastHdfsKeyValueDirectory shortTermStorage = new FastHdfsKeyValueDirectory(false, _hdfsKeyValueTimer,
-          _configuration, new Path(hdfsDirPath, "fast"));
-      directory = new JoinDirectory(longTermStorage, shortTermStorage);
-    } else {
-      LOG.info("Using regular HDFS directory.");
-      directory = longTermStorage;
-    }
-
+    Directory directory = DirectoryUtil.getDirectory(_configuration, longTermStorage, disableFast, _hdfsKeyValueTimer,
+        table, shard, false);
     ShardContext shardContext = ShardContext.create(tableContext, shard);
 
     TableDescriptor descriptor = tableContext.getDescriptor();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1b7eaf7f/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
index 36fe2ee..b479774 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Future;
 
 import org.apache.blur.lucene.codec.Blur024Codec;
 import org.apache.blur.manager.writer.SnapshotIndexDeletionPolicy;
+import org.apache.blur.store.hdfs.DirectoryUtil;
 import org.apache.blur.store.hdfs.HdfsDirectory;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.utils.BlurConstants;
@@ -58,6 +59,7 @@ import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.SegmentInfo;
 import org.apache.lucene.index.SegmentInfoPerCommit;
 import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.util.Bits;
 
@@ -164,8 +166,9 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
 
   private List<InputSplit> getSegmentSplits(Path shardDir, Configuration configuration, Text table, Text snapshot)
       throws IOException {
+    final long start = System.nanoTime();
     List<InputSplit> splits = new ArrayList<InputSplit>();
-    HdfsDirectory hdfsDirectory = new HdfsDirectory(configuration, shardDir);
+    Directory directory = getDirectory(configuration, table.toString(), shardDir);
     try {
       SnapshotIndexDeletionPolicy policy = new SnapshotIndexDeletionPolicy(configuration,
           SnapshotIndexDeletionPolicy.getGenerationsPath(shardDir));
@@ -175,12 +178,12 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
         throw new IOException("Snapshot [" + snapshot + "] not found in shard [" + shardDir + "]");
       }
 
-      List<IndexCommit> listCommits = DirectoryReader.listCommits(hdfsDirectory);
+      List<IndexCommit> listCommits = DirectoryReader.listCommits(directory);
       IndexCommit indexCommit = findIndexCommit(listCommits, generation, shardDir);
 
       String segmentsFileName = indexCommit.getSegmentsFileName();
       SegmentInfos segmentInfos = new SegmentInfos();
-      segmentInfos.read(hdfsDirectory, segmentsFileName);
+      segmentInfos.read(directory, segmentsFileName);
       for (SegmentInfoPerCommit commit : segmentInfos) {
         SegmentInfo segmentInfo = commit.info;
         if (commit.getDelCount() == segmentInfo.getDocCount()) {
@@ -190,14 +193,16 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
           Set<String> files = segmentInfo.files();
           long fileLength = 0;
           for (String file : files) {
-            fileLength += hdfsDirectory.fileLength(file);
+            fileLength += directory.fileLength(file);
           }
           splits.add(new BlurInputSplit(shardDir, segmentsFileName, name, fileLength, table));
         }
       }
       return splits;
     } finally {
-      hdfsDirectory.close();
+      directory.close();
+      final long end = System.nanoTime();
+      LOG.info("Found split in shard [" + shardDir + "] in [" + (end - start) / 1000000000.0 + " ms].");
     }
   }
 
@@ -225,7 +230,7 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
     private TableBlurRecord _tableBlurRecord;
     private Bits _liveDocs;
     private StoredFieldsReader _fieldsReader;
-    private HdfsDirectory _directory;
+    private Directory _directory;
 
     private int _docId = -1;
     private int _maxDoc;
@@ -243,7 +248,8 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
 
       _table = blurInputSplit.getTable();
 
-      _directory = new HdfsDirectory(configuration, blurInputSplit.getDir());
+      _directory = getDirectory(configuration, _table.toString(), blurInputSplit.getDir());
+
       SegmentInfos segmentInfos = new SegmentInfos();
       segmentInfos.read(_directory, blurInputSplit.getSegmentsName());
       SegmentInfoPerCommit commit = findSegmentInfoPerCommit(segmentInfos, blurInputSplit);
@@ -416,4 +422,11 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
     putSnapshotForTable(job.getConfiguration(), tableName, snapshot);
   }
 
+  public static Directory getDirectory(Configuration configuration, String table, Path shardDir) throws IOException {
+    Path fastPath = DirectoryUtil.getFastDirectoryPath(shardDir);
+    FileSystem fileSystem = shardDir.getFileSystem(configuration);
+    boolean disableFast = !fileSystem.exists(fastPath);
+    return DirectoryUtil.getDirectory(configuration, new HdfsDirectory(configuration, shardDir), disableFast, null,
+        table, shardDir.getName(), true);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1b7eaf7f/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
index 71cc571..45d482c 100644
--- a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
@@ -109,14 +109,25 @@ public class BlurInputFormatTest {
   }
 
   @Test
-  public void testBlurInputFormat() throws IOException, BlurException, TException, ClassNotFoundException,
+  public void testBlurInputFormatFastDisabled() throws IOException, BlurException, TException, ClassNotFoundException,
       InterruptedException {
-    String tableName = "testBlurInputFormat";
+    String tableName = "testBlurInputFormatFastDisabled";
+    runTest(tableName, true);
+  }
+
+  @Test
+  public void testBlurInputFormatFastEnabled() throws IOException, BlurException, TException, ClassNotFoundException,
+      InterruptedException {
+    String tableName = "testBlurInputFormatFastEnabled";
+    runTest(tableName, false);
+  }
 
+  private void runTest(String tableName, boolean disableFast) throws IOException, BlurException, TException,
+      InterruptedException, ClassNotFoundException {
     FileSystem fileSystem = miniCluster.getFileSystem();
     Path root = new Path(fileSystem.getUri() + "/");
 
-    creatTable(tableName, new Path(root, "tables"), true);
+    creatTable(tableName, new Path(root, "tables"), disableFast);
     loadTable(tableName, 100, 100);
 
     Iface client = getClient();
@@ -132,7 +143,7 @@ public class BlurInputFormatTest {
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(TableBlurRecord.class);
 
-    Path output = new Path(root, "output");
+    Path output = new Path(new Path(root, "output"), tableName);
 
     String snapshot = UUID.randomUUID().toString();
     client.createSnapshot(tableName, snapshot);
@@ -168,7 +179,6 @@ public class BlurInputFormatTest {
       rowId++;
     }
     assertEquals(200, rowId);
-
   }
 
   public interface ResultReader {
@@ -177,7 +187,6 @@ public class BlurInputFormatTest {
 
   }
 
-  @SuppressWarnings("deprecation")
   private void walkOutput(Path output, Configuration conf, ResultReader resultReader) throws IOException {
     FileSystem fileSystem = output.getFileSystem(conf);
     FileStatus fileStatus = fileSystem.getFileStatus(output);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1b7eaf7f/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
index 06fa078..f763ab1 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
@@ -186,11 +186,17 @@ public class HdfsKeyValueStore implements Store {
     removeAnyTruncatedFiles();
     loadIndexes();
     cleanupOldFiles();
-    _idleLogTimerTask = getIdleLogTimer();
-    _oldFileCleanerTimerTask = getOldFileCleanerTimer();
-    _hdfsKeyValueTimer = hdfsKeyValueTimer;
-    _hdfsKeyValueTimer.schedule(_idleLogTimerTask, DAEMON_POLL_TIME, DAEMON_POLL_TIME);
-    _hdfsKeyValueTimer.schedule(_oldFileCleanerTimerTask, DAEMON_POLL_TIME, DAEMON_POLL_TIME);
+    if (!_readOnly) {
+      _idleLogTimerTask = getIdleLogTimer();
+      _oldFileCleanerTimerTask = getOldFileCleanerTimer();
+      _hdfsKeyValueTimer = hdfsKeyValueTimer;
+      _hdfsKeyValueTimer.schedule(_idleLogTimerTask, DAEMON_POLL_TIME, DAEMON_POLL_TIME);
+      _hdfsKeyValueTimer.schedule(_oldFileCleanerTimerTask, DAEMON_POLL_TIME, DAEMON_POLL_TIME);
+    } else {
+      _idleLogTimerTask = null;
+      _oldFileCleanerTimerTask = null;
+      _hdfsKeyValueTimer = null;
+    }
     // Metrics.newGauge(new MetricName(ORG_APACHE_BLUR, HDFS_KV, SIZE,
     // path.getParent().toString()), new Gauge<Long>() {
     // @Override
@@ -467,9 +473,15 @@ public class HdfsKeyValueStore implements Store {
   public void close() throws IOException {
     if (!_isClosed) {
       _isClosed = true;
-      _idleLogTimerTask.cancel();
-      _oldFileCleanerTimerTask.cancel();
-      _hdfsKeyValueTimer.purge();
+      if (_idleLogTimerTask != null) {
+        _idleLogTimerTask.cancel();
+      }
+      if (_oldFileCleanerTimerTask != null) {
+        _oldFileCleanerTimerTask.cancel();
+      }
+      if (_hdfsKeyValueTimer != null) {
+        _hdfsKeyValueTimer.purge();
+      }
       _writeLock.lock();
       try {
         if (isOpenForWriting()) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/1b7eaf7f/blur-store/src/test/java/org/apache/blur/store/hdfs/DirectoryUtil.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/hdfs/DirectoryUtil.java b/blur-store/src/test/java/org/apache/blur/store/hdfs/DirectoryUtil.java
new file mode 100644
index 0000000..394c3d9
--- /dev/null
+++ b/blur-store/src/test/java/org/apache/blur/store/hdfs/DirectoryUtil.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.hdfs;
+
+import java.io.IOException;
+import java.util.Timer;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.store.hdfs_v2.FastHdfsKeyValueDirectory;
+import org.apache.blur.store.hdfs_v2.JoinDirectory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.store.Directory;
+
+public class DirectoryUtil {
+
+  private static final Log LOG = LogFactory.getLog(DirectoryUtil.class);
+
+  public static Directory getDirectory(Configuration configuration, HdfsDirectory dir, boolean disableFast,
+      Timer hdfsKeyValueTimer, String table, String shard, boolean readOnly) throws IOException {
+    Path hdfsDirPath = dir.getPath();
+    String scheme = hdfsDirPath.toUri().getScheme();
+    if (scheme != null && scheme.equals("hdfs") && !disableFast) {
+      LOG.info("Using Fast HDFS directory implementation on shard [{0}] for table [{1}]", shard, table);
+      FastHdfsKeyValueDirectory shortTermStorage = new FastHdfsKeyValueDirectory(readOnly, hdfsKeyValueTimer,
+          configuration, getFastDirectoryPath(hdfsDirPath));
+      return new JoinDirectory(dir, shortTermStorage);
+    } else {
+      LOG.info("Using regular HDFS directory.");
+      return dir;
+    }
+  }
+
+  public static Path getFastDirectoryPath(Path hdfsDirPath) {
+    return new Path(hdfsDirPath, "fast");
+  }
+
+}


Mime
View raw message