incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/2] git commit: Adding a new shard collapse utility.
Date Wed, 08 May 2013 13:30:30 GMT
Adding a new shard collapse utility.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/27912aef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/27912aef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/27912aef

Branch: refs/heads/0.1.5
Commit: 27912aef75c21f105825e3b5118173bbab9567b1
Parents: 4c99b46
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Wed May 8 09:30:12 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Wed May 8 09:30:12 2013 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/blur/utils/BlurUtil.java  |    4 +
 .../blur/utils/TableShardCountCollapser.java       |  164 +++++++++++++++
 .../blur/utils/TableShardCountCollapserTest.java   |  117 ++++++++++
 .../org/apache/blur/store/hdfs/HdfsDirectory.java  |    4 +-
 4 files changed, 287 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27912aef/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java b/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
index 25c5d12..d75ee50 100644
--- a/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
@@ -442,6 +442,10 @@ public class BlurUtil {
     }
   }
 
+  public static String getShardName(int id) {
+    return getShardName(BlurConstants.SHARD_PREFIX, id);
+  }
+
   public static String getShardName(String prefix, int id) {
     return prefix + buffer(id, 8);
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27912aef/src/blur-core/src/main/java/org/apache/blur/utils/TableShardCountCollapser.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/utils/TableShardCountCollapser.java b/src/blur-core/src/main/java/org/apache/blur/utils/TableShardCountCollapser.java
new file mode 100644
index 0000000..f45be2b
--- /dev/null
+++ b/src/blur-core/src/main/java/org/apache/blur/utils/TableShardCountCollapser.java
@@ -0,0 +1,164 @@
+package org.apache.blur.utils;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.lucene.analysis.core.KeywordAnalyzer;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.Version;
+
+/**
+ * This class is used to reduce the total number of shards of a table. The main
+ * use would be if during an indexing job the number of reducers were increased
+ * to make indexing faster, but the total number of shards in the table needed
+ * to be smaller. This utility safely collapses indexes together thus reducing
+ * the total number of shards in the table.
+ * 
+ * For example if you wanted to run 1024 reducers but only wanted to run 128
+ * shards in a table. After the bulk map reducer job finishes, this utility
+ * could be executed:
+ * 
+ * TableShardCountCollapser <hdfs path> 128
+ * 
+ * The result would be 128 shards in the table path.
+ * 
+ */
+public class TableShardCountCollapser extends Configured implements Tool {
+
+  public static void main(String[] args) throws Exception {
+    // Let ToolRunner handle generic command-line options
+    int res = ToolRunner.run(new Configuration(), new TableShardCountCollapser(), args);
+    System.exit(res);
+  }
+
+  private Path path;
+
+  @Override
+  public int run(String[] args) throws Exception {
+    // prompt to make sure the table is not enabled
+
+    Path path = new Path(args[0]);
+    int count = Integer.parseInt(args[1]);
+    setTablePath(path);
+    collapseShardsTo(count);
+    return 0;
+  }
+
+  public boolean validateCount(int count) throws IOException {
+    if (getCollapsePossibilities().contains(count)) {
+      return true;
+    }
+    return false;
+  }
+
+  public void setTablePath(Path path) {
+    this.path = path;
+  }
+
+  public List<Integer> getCollapsePossibilities() throws IOException {
+    FileSystem fileSystem = path.getFileSystem(getConf());
+    FileStatus[] listStatus = fileSystem.listStatus(path);
+    SortedSet<String> shards = new TreeSet<String>();
+    for (FileStatus status : listStatus) {
+      Path shardPath = status.getPath();
+      if (shardPath.getName().startsWith(BlurConstants.SHARD_PREFIX)) {
+        shards.add(shardPath.getName());
+      }
+    }
+    validateShards(shards);
+    List<Integer> result = getFactors(shards.size());
+    return result;
+  }
+
+  private List<Integer> getFactors(int size) {
+    List<Integer> result = new ArrayList<Integer>();
+    for (int i = 1; i < size; i++) {
+      if (size % i == 0) {
+        result.add(i);
+      }
+    }
+    return result;
+  }
+
+  private void validateShards(SortedSet<String> shards) {
+    int count = shards.size();
+    for (int i = 0; i < count; i++) {
+      if (!shards.contains(BlurUtil.getShardName(i))) {
+        throw new RuntimeException("Invalid table");
+      }
+    }
+  }
+
+  public void collapseShardsTo(int newShardCount) throws IOException {
+    if (!validateCount(newShardCount)) {
+      throw new RuntimeException("Count [" + newShardCount + "] is not valid, valid values are ["
+          + getCollapsePossibilities() + "]");
+    }
+
+    Path[] paths = getPaths();
+    int numberOfShardsToMergePerPass = paths.length / newShardCount;
+    for (int i = 0; i < newShardCount; i++) {
+      System.out.println("Base Index [" + paths[i] + "]");
+      IndexWriterConfig lconf = new IndexWriterConfig(Version.LUCENE_42, new KeywordAnalyzer());
+      HdfsDirectory dir = new HdfsDirectory(getConf(), paths[i]);
+      IndexWriter indexWriter = new IndexWriter(dir, lconf);
+      Directory[] dirs = new Directory[numberOfShardsToMergePerPass - 1];
+      Path[] pathsToDelete = new Path[numberOfShardsToMergePerPass - 1];
+      for (int p = 1; p < numberOfShardsToMergePerPass; p++) {
+        Path pathToMerge = paths[i + p * newShardCount];
+        System.out.println("Merge [" + pathToMerge + "]");
+        dirs[p - 1] = new HdfsDirectory(getConf(), pathToMerge);
+        pathsToDelete[p - 1] = pathToMerge;
+      }
+      indexWriter.addIndexes(dirs);
+      indexWriter.close();
+      FileSystem fileSystem = path.getFileSystem(getConf());
+      for (Path p : pathsToDelete) {
+        fileSystem.delete(p, true);
+      }
+    }
+  }
+
+  private Path[] getPaths() throws IOException {
+    FileSystem fileSystem = path.getFileSystem(getConf());
+    FileStatus[] listStatus = fileSystem.listStatus(path);
+    SortedSet<Path> shards = new TreeSet<Path>();
+    for (FileStatus status : listStatus) {
+      Path shardPath = status.getPath();
+      if (shardPath.getName().startsWith(BlurConstants.SHARD_PREFIX)) {
+        shards.add(shardPath);
+      }
+    }
+    return shards.toArray(new Path[shards.size()]);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27912aef/src/blur-core/src/test/java/org/apache/blur/utils/TableShardCountCollapserTest.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/test/java/org/apache/blur/utils/TableShardCountCollapserTest.java b/src/blur-core/src/test/java/org/apache/blur/utils/TableShardCountCollapserTest.java
new file mode 100644
index 0000000..79361d8
--- /dev/null
+++ b/src/blur-core/src/test/java/org/apache/blur/utils/TableShardCountCollapserTest.java
@@ -0,0 +1,117 @@
+package org.apache.blur.utils;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.blur.store.buffer.BufferStore;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
+import org.apache.lucene.analysis.core.KeywordAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.IntField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.index.TieredMergePolicy;
+import org.apache.lucene.util.Version;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TableShardCountCollapserTest {
+
+  private static final int NUMBER_OF_BASE_SHARDS = 128;
+  private Configuration configuration;
+  private Path path;
+
+  @Before
+  public void setup() throws IOException {
+    BufferStore.init(128, 128);
+    configuration = new Configuration();
+    path = new Path("./target/tmp-shards-for-testing");
+    FileSystem fileSystem = path.getFileSystem(configuration);
+    fileSystem.delete(path, true);
+    createShards(NUMBER_OF_BASE_SHARDS);
+  }
+
+  private void createShards(int shardCount) throws IOException {
+    for (int i = 0; i < shardCount; i++) {
+      String shardName = BlurUtil.getShardName(i);
+      createShard(configuration, i, new Path(path, shardName), shardCount);
+    }
+  }
+
+  @Test
+  public void testShardCountReducer() throws IOException {
+    assertData(NUMBER_OF_BASE_SHARDS);
+    TableShardCountCollapser t = new TableShardCountCollapser();
+    t.setConf(configuration);
+    t.setTablePath(path);
+    int totalShardCount = 4;
+    t.collapseShardsTo(totalShardCount);
+    assertData(totalShardCount);
+  }
+
+  private void assertData(int totalShardCount) throws IOException {
+    Partitioner<IntWritable, IntWritable> partitioner = new HashPartitioner<IntWritable, IntWritable>();
+    for (int i = 0; i < totalShardCount; i++) {
+      HdfsDirectory directory = new HdfsDirectory(configuration, new Path(path, BlurUtil.getShardName(i)));
+      DirectoryReader reader = DirectoryReader.open(directory);
+      int numDocs = reader.numDocs();
+      for (int d = 0; d < numDocs; d++) {
+        Document document = reader.document(d);
+        IndexableField field = document.getField("id");
+        Integer id = (Integer) field.numericValue();
+        int partition = partitioner.getPartition(new IntWritable(id), null, totalShardCount);
+        assertEquals(i, partition);
+      }
+      reader.close();
+    }
+  }
+
+  private static void createShard(Configuration configuration, int i, Path path, int totalShardCount)
+      throws IOException {
+    HdfsDirectory hdfsDirectory = new HdfsDirectory(configuration, path);
+    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_42, new KeywordAnalyzer());
+    TieredMergePolicy mergePolicy = (TieredMergePolicy) conf.getMergePolicy();
+    mergePolicy.setUseCompoundFile(false);
+    IndexWriter indexWriter = new IndexWriter(hdfsDirectory, conf);
+
+    Partitioner<IntWritable, IntWritable> partitioner = new HashPartitioner<IntWritable, IntWritable>();
+    int partition = partitioner.getPartition(new IntWritable(i), null, totalShardCount);
+    assertEquals(i, partition);
+
+    Document doc = getDoc(i);
+    indexWriter.addDocument(doc);
+    indexWriter.close();
+  }
+
+  private static Document getDoc(int i) {
+    Document document = new Document();
+    document.add(new IntField("id", i, Store.YES));
+    return document;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/27912aef/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index a3e11f8..f9cbefa 100644
--- a/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/src/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -328,7 +328,7 @@ public class HdfsDirectory extends Directory {
 
   private boolean quickMove(Directory to, String src, String dest, IOContext context) throws IOException {
     HdfsDirectory simpleTo = (HdfsDirectory) to;
-    if (ifSameCluster(to, this)) {
+    if (ifSameCluster(simpleTo, this)) {
       Path newDest = simpleTo.getPath(dest);
       Path oldSrc = getPath(src);
       renameCounter.incrementAndGet();
@@ -337,7 +337,7 @@ public class HdfsDirectory extends Directory {
     return false;
   }
 
-  private boolean ifSameCluster(Directory to, HdfsDirectory simpleHDFSDirectory) {
+  private boolean ifSameCluster(HdfsDirectory dest, HdfsDirectory src) {
     // @TODO
     return true;
   }


Mime
View raw message