incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Fixing BLUR-436.
Date Wed, 10 Jun 2015 18:29:05 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master bd03573dd -> b724bdf4a


Fixing BLUR-436.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/b724bdf4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/b724bdf4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/b724bdf4

Branch: refs/heads/master
Commit: b724bdf4af0f998160c6678ca25772d8c1e3eb0f
Parents: bd03573
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Wed Jun 10 14:29:07 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Wed Jun 10 14:29:07 2015 -0400

----------------------------------------------------------------------
 .../blur/mapreduce/lib/BlurInputFormat.java     | 178 +++++++++++++++----
 .../lib/BlurInputFormatSplitCommand.java        |  96 ++++++++++
 .../services/org.apache.blur.command.Commands   |   3 +-
 .../blur/mapreduce/lib/BlurInputFormatTest.java |  17 +-
 4 files changed, 245 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b724bdf4/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
index a0cc7de..ea6d5d4 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
@@ -21,6 +21,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.Callable;
@@ -29,11 +30,15 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import org.apache.blur.command.BlurArray;
+import org.apache.blur.command.BlurObject;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.manager.writer.SnapshotIndexDeletionPolicy;
 import org.apache.blur.store.hdfs.DirectoryUtil;
 import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.utils.BlurConstants;
 import org.apache.hadoop.conf.Configuration;
@@ -68,10 +73,101 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
   @Override
   public List<InputSplit> getSplits(JobContext context) throws IOException {
     Path[] dirs = getInputPaths(context);
-    List<BlurInputSplit> splits = getSplits(context.getConfiguration(), dirs);
+    List<BlurInputSplit> splits;
+    Configuration configuration = context.getConfiguration();
+    if (isSplitCommandSupported(configuration)) {
+      splits = getSplitsFromCommand(configuration, dirs);
+    } else {
+      splits = getSplits(configuration, dirs);
+    }
     return toList(splits);
   }
 
+  private static List<BlurInputSplit> getSplitsFromCommand(Configuration configuration, Path[] dirs) throws IOException {
+    String zkConnection = configuration.get(BlurConstants.BLUR_ZOOKEEPER_CONNECTION);
+    Iface client = BlurClient.getClientFromZooKeeperConnectionStr(zkConnection);
+    List<BlurInputSplit> splits = new ArrayList<BlurInputSplit>();
+    for (Path dir : dirs) {
+      Text table = BlurInputFormat.getTableFromPath(configuration, dir);
+      String snapshot = getSnapshotForTable(configuration, table.toString());
+      BlurInputFormatSplitCommand splitCommand = new BlurInputFormatSplitCommand();
+      splitCommand.setSnapshot(snapshot);
+      splitCommand.setTable(table.toString());
+      List<BlurInputSplit> splitsList = toSplitList(splitCommand.run(client));
+      splits.addAll(splitsList);
+    }
+    return splits;
+  }
+
+  private static List<BlurInputSplit> toSplitList(BlurObject bo) {
+    Iterator<String> keys = bo.keys();
+    List<BlurInputSplit> splits = new ArrayList<BlurInputSplit>();
+    while (keys.hasNext()) {
+      String shard = keys.next();
+      BlurArray blurArray = bo.getBlurArray(shard);
+      splits.addAll(toSplits(blurArray));
+    }
+    return splits;
+  }
+
+  public static List<BlurInputSplit> toSplits(BlurArray blurArray) {
+    List<BlurInputSplit> splits = new ArrayList<BlurInputSplit>();
+    for (int i = 0; i < blurArray.length(); i++) {
+      BlurObject blurObject = blurArray.getBlurObject(i);
+      splits.add(toSplit(blurObject));
+    }
+    return splits;
+  }
+
+  public static BlurArray toBlurArray(List<BlurInputSplit> splits) throws IOException {
+    BlurArray blurArray = new BlurArray();
+    for (BlurInputSplit inputSplit : splits) {
+      blurArray.put(toBlurObject(inputSplit));
+    }
+    return blurArray;
+  }
+
+  private static BlurInputSplit toSplit(BlurObject blurObject) {
+    Path dir = new Path(blurObject.getString("dir"));
+    String segmentsName = blurObject.getString("segmentsName");
+    String segmentInfoName = blurObject.getString("segmentInfoName");
+    long fileLength = blurObject.getLong("fileLength");
+    Text table = new Text(blurObject.getString("table"));
+    List<String> directoryFiles = toStringListFromBlurArray(blurObject.getBlurArray("directoryFiles"));
+    return new BlurInputSplit(dir, segmentsName, segmentInfoName, fileLength, table, directoryFiles);
+  }
+
+  private static List<String> toStringListFromBlurArray(BlurArray blurArray) {
+    List<String> list = new ArrayList<String>();
+    for (int i = 0; i < blurArray.length(); i++) {
+      list.add(blurArray.getString(i));
+    }
+    return list;
+  }
+
+  private static BlurArray toBlurArrayFromStringList(List<String> list) {
+    BlurArray array = new BlurArray();
+    for (String s : list) {
+      array.put(s);
+    }
+    return array;
+  }
+
+  private static BlurObject toBlurObject(BlurInputSplit inputSplit) throws IOException {
+    BlurObject blurObject = new BlurObject();
+    blurObject.put("dir", inputSplit.getDir().toString());
+    blurObject.put("segmentsName", inputSplit.getSegmentsName());
+    blurObject.put("segmentInfoName", inputSplit.getSegmentInfoName());
+    blurObject.put("fileLength", inputSplit.getLength());
+    blurObject.put("table", inputSplit.getTable().toString());
+    blurObject.put("directoryFiles", toBlurArrayFromStringList(inputSplit.getDirectoryFiles()));
+    return blurObject;
+  }
+
+  private boolean isSplitCommandSupported(Configuration configuration) {
+    return configuration.get(BlurConstants.BLUR_ZOOKEEPER_CONNECTION) != null;
+  }
+
   private List<InputSplit> toList(List<BlurInputSplit> splits) {
     List<InputSplit> inputSplits = new ArrayList<InputSplit>();
     for (BlurInputSplit inputSplit : splits) {
@@ -174,45 +270,55 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
   private static List<BlurInputSplit> getSegmentSplits(Path shardDir, Configuration configuration, Text table,
       Text snapshot) throws IOException {
     final long start = System.nanoTime();
-    List<BlurInputSplit> splits = new ArrayList<BlurInputSplit>();
     Directory directory = getDirectory(configuration, table.toString(), shardDir, null);
     try {
-      SnapshotIndexDeletionPolicy policy = new SnapshotIndexDeletionPolicy(configuration,
-          SnapshotIndexDeletionPolicy.getGenerationsPath(shardDir));
+      return getSplitForDirectory(shardDir, configuration, table, snapshot, directory);
+    } finally {
+      directory.close();
+      final long end = System.nanoTime();
+      LOG.info("Found split in shard [{0}] in [{1} ms].", shardDir, (end - start) / 1000000000.0);
+    }
+  }
 
-      Long generation = policy.getGeneration(snapshot.toString());
-      if (generation == null) {
-        throw new IOException("Snapshot [" + snapshot + "] not found in shard [" + shardDir + "]");
-      }
+  public static List<BlurInputSplit> getSplitForDirectory(Path shardDir, Configuration configuration, String table,
+      String snapshot, Directory directory) throws IOException {
+    return getSplitForDirectory(shardDir, configuration, new Text(table), new Text(snapshot), directory);
+  }
 
-      List<IndexCommit> listCommits = DirectoryReader.listCommits(directory);
-      IndexCommit indexCommit = findIndexCommit(listCommits, generation, shardDir);
+  public static List<BlurInputSplit> getSplitForDirectory(Path shardDir, Configuration configuration, Text table,
+      Text snapshot, Directory directory) throws IOException {
+    List<BlurInputSplit> splits = new ArrayList<BlurInputSplit>();
+    SnapshotIndexDeletionPolicy policy = new SnapshotIndexDeletionPolicy(configuration,
+        SnapshotIndexDeletionPolicy.getGenerationsPath(shardDir));
 
-      String segmentsFileName = indexCommit.getSegmentsFileName();
-      SegmentInfos segmentInfos = new SegmentInfos();
-      segmentInfos.read(directory, segmentsFileName);
-      for (SegmentInfoPerCommit commit : segmentInfos) {
-        SegmentInfo segmentInfo = commit.info;
-        if (commit.getDelCount() == segmentInfo.getDocCount()) {
-          LOG.info("Segment [{0}] in dir [{1}] has all records deleted.", segmentInfo.name, shardDir);
-        } else {
-          String name = segmentInfo.name;
-          Collection<String> files = commit.files();
-          long fileLength = 0;
-          for (String file : files) {
-            fileLength += directory.fileLength(file);
-          }
-          List<String> dirFiles = new ArrayList<String>(files);
-          dirFiles.add(segmentsFileName);
-          splits.add(new BlurInputSplit(shardDir, segmentsFileName, name, fileLength, table, dirFiles));
+    Long generation = policy.getGeneration(snapshot.toString());
+    if (generation == null) {
+      throw new IOException("Snapshot [" + snapshot + "] not found in shard [" + shardDir + "]");
+    }
+
+    List<IndexCommit> listCommits = DirectoryReader.listCommits(directory);
+    IndexCommit indexCommit = findIndexCommit(listCommits, generation, shardDir);
+
+    String segmentsFileName = indexCommit.getSegmentsFileName();
+    SegmentInfos segmentInfos = new SegmentInfos();
+    segmentInfos.read(directory, segmentsFileName);
+    for (SegmentInfoPerCommit commit : segmentInfos) {
+      SegmentInfo segmentInfo = commit.info;
+      if (commit.getDelCount() == segmentInfo.getDocCount()) {
+        LOG.info("Segment [{0}] in dir [{1}] has all records deleted.", segmentInfo.name, shardDir);
+      } else {
+        String name = segmentInfo.name;
+        Collection<String> files = commit.files();
+        long fileLength = 0;
+        for (String file : files) {
+          fileLength += directory.fileLength(file);
         }
+        List<String> dirFiles = new ArrayList<String>(files);
+        dirFiles.add(segmentsFileName);
+        splits.add(new BlurInputSplit(shardDir, segmentsFileName, name, fileLength, table, dirFiles));
       }
-      return splits;
-    } finally {
-      directory.close();
-      final long end = System.nanoTime();
-      LOG.info("Found split in shard [{0}] in [{1} ms].", shardDir, (end - start) / 1000000000.0);
     }
+    return splits;
   }
 
   private static IndexCommit findIndexCommit(List<IndexCommit> listCommits, long generation, Path shardDir)
@@ -396,4 +502,12 @@ public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
     HdfsDirectory directory = new HdfsDirectory(configuration, shardDir, null, files);
     return DirectoryUtil.getDirectory(configuration, directory, disableFast, null, table, shardDir.getName(), true);
   }
+
+  public static void setZooKeeperConnectionStr(Configuration configuration, String zk) {
+    configuration.set(BlurConstants.BLUR_ZOOKEEPER_CONNECTION, zk);
+  }
+
+  public static void setZooKeeperConnectionStr(Job job, String zk) {
+    setZooKeeperConnectionStr(job.getConfiguration(), zk);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b724bdf4/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormatSplitCommand.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormatSplitCommand.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormatSplitCommand.java
new file mode 100644
index 0000000..27cc719
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormatSplitCommand.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.blur.command.BlurArray;
+import org.apache.blur.command.BlurObject;
+import org.apache.blur.command.CombiningContext;
+import org.apache.blur.command.IndexContext;
+import org.apache.blur.command.Location;
+import org.apache.blur.command.annotation.RequiredArgument;
+import org.apache.blur.command.commandtype.ClusterServerReadCommandSingleTable;
+import org.apache.blur.mapreduce.lib.BlurInputFormat.BlurInputSplit;
+import org.apache.blur.server.ShardContext;
+import org.apache.blur.server.TableContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.store.Directory;
+
+public class BlurInputFormatSplitCommand extends ClusterServerReadCommandSingleTable<BlurObject> {
+
+  @RequiredArgument
+  private String snapshot;
+
+  @Override
+  public BlurObject execute(IndexContext context) throws IOException, InterruptedException {
+    String shard = context.getShard().getShard();
+    TableContext tableContext = context.getTableContext();
+    ShardContext shardContext = ShardContext.create(tableContext, shard);
+
+    Path shardDir = shardContext.getHdfsDirPath();
+    Configuration configuration = tableContext.getConfiguration();
+    String table = tableContext.getTable();
+    Directory directory = getDirectory(context.getIndexReader());
+
+    List<BlurInputSplit> list = BlurInputFormat.getSplitForDirectory(shardDir, configuration, table, snapshot,
+        directory);
+    BlurArray splits = BlurInputFormat.toBlurArray(list);
+    BlurObject blurObject = new BlurObject();
+    blurObject.put(shard, splits);
+    return blurObject;
+  }
+
+  private Directory getDirectory(IndexReader indexReader) {
+    DirectoryReader directoryReader = (DirectoryReader) indexReader;
+    return directoryReader.directory();
+  }
+
+  @Override
+  public BlurObject combine(CombiningContext context, Map<? extends Location<?>, BlurObject> results)
+      throws IOException, InterruptedException {
+    BlurObject blurObject = new BlurObject();
+    for (BlurObject shardSplits : results.values()) {
+      Iterator<String> keys = shardSplits.keys();
+      while (keys.hasNext()) {
+        String key = keys.next();
+        blurObject.put(key, shardSplits.getBlurArray(key));
+      }
+    }
+    return blurObject;
+  }
+
+  @Override
+  public String getName() {
+    return "input-format-split";
+  }
+
+  public String getSnapshot() {
+    return snapshot;
+  }
+
+  public void setSnapshot(String snapshot) {
+    this.snapshot = snapshot;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b724bdf4/blur-mapred/src/main/resources/META-INF/services/org.apache.blur.command.Commands
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/resources/META-INF/services/org.apache.blur.command.Commands b/blur-mapred/src/main/resources/META-INF/services/org.apache.blur.command.Commands
index f6fc606..6c0330d 100644
--- a/blur-mapred/src/main/resources/META-INF/services/org.apache.blur.command.Commands
+++ b/blur-mapred/src/main/resources/META-INF/services/org.apache.blur.command.Commands
@@ -13,4 +13,5 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-org.apache.blur.mapreduce.lib.update.BulkTableUpdateCommand
\ No newline at end of file
+org.apache.blur.mapreduce.lib.update.BulkTableUpdateCommand
+org.apache.blur.mapreduce.lib.BlurInputFormatSplitCommand
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b724bdf4/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
index f7d5d1e..209a1fa 100644
--- a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
@@ -129,14 +129,6 @@ public class BlurInputFormatTest {
     String tableName = "testBlurInputFormatFastDisabledFileCache";
     Path fileCache = new Path(miniCluster.getFileSystemUri() + "/filecache");
     runTest(tableName, true, fileCache);
-    FileSystem fileSystem = miniCluster.getFileSystem();
-    // @TODO write some assertions.
-    // RemoteIterator<LocatedFileStatus> listFiles =
-    // fileSystem.listFiles(fileCache, true);
-    // while (listFiles.hasNext()) {
-    // LocatedFileStatus locatedFileStatus = listFiles.next();
-    // System.out.println(locatedFileStatus.getPath());
-    // }
   }
 
   @Test
@@ -145,14 +137,6 @@ public class BlurInputFormatTest {
     String tableName = "testBlurInputFormatFastEnabledFileCache";
     Path fileCache = new Path(miniCluster.getFileSystemUri() + "/filecache");
     runTest(tableName, false, fileCache);
-    FileSystem fileSystem = miniCluster.getFileSystem();
-    // @TODO write some assertions.
-    // RemoteIterator<LocatedFileStatus> listFiles =
-    // fileSystem.listFiles(fileCache, true);
-    // while (listFiles.hasNext()) {
-    // LocatedFileStatus locatedFileStatus = listFiles.next();
-    // System.out.println(locatedFileStatus.getPath());
-    // }
   }
 
   private void runTest(String tableName, boolean disableFast, Path fileCache) throws IOException, BlurException,
@@ -185,6 +169,7 @@ public class BlurInputFormatTest {
       BlurInputFormat.setLocalCachePath(job, fileCache);
     }
 
+    BlurInputFormat.setZooKeeperConnectionStr(job, miniCluster.getZkConnectionString());
     BlurInputFormat.addTable(job, tableDescriptor, snapshot);
     FileOutputFormat.setOutputPath(job, output);
 


Mime
View raw message