incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject git commit: Initial commit to add an InputFormat to Blur. This input format reads directly from HDFS for bulk reads.
Date Fri, 17 Apr 2015 01:36:26 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master 4aa976fd0 -> bca39998f


Initial commit to add an InputFormat to Blur. This input format reads directly from HDFS for bulk reads.
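
For context, a minimal sketch of how a MapReduce job could consume this input format, mirroring
the addTable(...) helper and the BlurInputFormatTest usage below. The ZooKeeper connection string,
table name, snapshot name, output path, and MyMapper class are placeholders, and a running Blur
cluster is assumed; imports are omitted for brevity:

    Iface client = BlurClient.getClientFromZooKeeperConnectionStr("zkhost/blur");
    TableDescriptor tableDescriptor = client.describe("my-table");

    Job job = Job.getInstance(new Configuration(), "Read Blur table");
    job.setInputFormatClass(BlurInputFormat.class);
    job.setMapperClass(MyMapper.class); // hypothetical identity mapper
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(TableBlurRecord.class);

    // addTable(...) records the table path and snapshot name in the job
    // configuration; getSplits(...) later resolves them per shard directory.
    String snapshot = "my-snapshot"; // must already exist on the table
    BlurInputFormat.addTable(job, tableDescriptor, snapshot);
    FileOutputFormat.setOutputPath(job, new Path("/tmp/blur-output"));
    job.waitForCompletion(true);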


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/bca39998
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/bca39998
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/bca39998

Branch: refs/heads/master
Commit: bca39998fa00a9bd3c0f6e529cee1e4172086181
Parents: 4aa976f
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Apr 16 21:36:16 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Apr 16 21:36:16 2015 -0400

----------------------------------------------------------------------
 .../manager/writer/BlurIndexSimpleWriter.java   |   4 +-
 .../writer/SnapshotIndexDeletionPolicy.java     |  10 +-
 .../test/java/org/apache/blur/MiniCluster.java  |   2 +-
 .../blur/mapreduce/lib/BlurInputFormat.java     | 419 +++++++++++++++++++
 .../apache/blur/mapreduce/lib/BlurRecord.java   |  11 +
 .../blur/mapreduce/lib/TableBlurRecord.java     | 113 +++++
 .../blur/mapreduce/lib/BlurInputFormatTest.java | 251 +++++++++++
 7 files changed, 806 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bca39998/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
index fbec370..02a55ae 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
@@ -188,8 +188,8 @@ public class BlurIndexSimpleWriter extends BlurIndex {
     TieredMergePolicy mergePolicy = (TieredMergePolicy) _conf.getMergePolicy();
     mergePolicy.setUseCompoundFile(false);
     _conf.setMergeScheduler(mergeScheduler.getMergeScheduler());
-    _snapshotIndexDeletionPolicy = new SnapshotIndexDeletionPolicy(_tableContext.getConfiguration(), new Path(
-        shardContext.getHdfsDirPath(), "generations"));
+    _snapshotIndexDeletionPolicy = new SnapshotIndexDeletionPolicy(_tableContext.getConfiguration(),
+        SnapshotIndexDeletionPolicy.getGenerationsPath(_shardContext.getHdfsDirPath()));
     _policy = new IndexDeletionPolicyReader(_snapshotIndexDeletionPolicy);
     _conf.setIndexDeletionPolicy(_policy);
     BlurConfiguration blurConfiguration = _tableContext.getBlurConfiguration();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bca39998/blur-core/src/main/java/org/apache/blur/manager/writer/SnapshotIndexDeletionPolicy.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/SnapshotIndexDeletionPolicy.java b/blur-core/src/main/java/org/apache/blur/manager/writer/SnapshotIndexDeletionPolicy.java
index b3034c2..4dd143c 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/SnapshotIndexDeletionPolicy.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/SnapshotIndexDeletionPolicy.java
@@ -105,7 +105,7 @@ public class SnapshotIndexDeletionPolicy extends IndexDeletionPolicy {
   }
 
   private void cleanupOldFiles() {
-    
+
   }
 
   private String buffer(long number) {
@@ -183,5 +183,13 @@ public class SnapshotIndexDeletionPolicy extends IndexDeletionPolicy {
   public Path getSnapshotsDirectoryPath() {
     return _path;
   }
+  
+  public Long getGeneration(String name) {
+    return _namesToGenerations.get(name);
+  }
+
+  public static Path getGenerationsPath(Path shardDir) {
+    return new Path(shardDir, "generations");
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bca39998/blur-core/src/test/java/org/apache/blur/MiniCluster.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/MiniCluster.java b/blur-core/src/test/java/org/apache/blur/MiniCluster.java
index 0fd70ae..8db632f 100644
--- a/blur-core/src/test/java/org/apache/blur/MiniCluster.java
+++ b/blur-core/src/test/java/org/apache/blur/MiniCluster.java
@@ -760,7 +760,7 @@ public class MiniCluster {
   }
 
   public URI getFileSystemUri() throws IOException {
-    return cluster.getFileSystem().getUri();
+    return getFileSystem().getUri();
   }
 
   public void startDfs(String path) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bca39998/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
new file mode 100644
index 0000000..36fe2ee
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurInputFormat.java
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.blur.lucene.codec.Blur024Codec;
+import org.apache.blur.manager.writer.SnapshotIndexDeletionPolicy;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.RowDocumentUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.lucene.codecs.StoredFieldsReader;
+import org.apache.lucene.document.DocumentStoredFieldVisitor;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.FieldInfos;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.SegmentInfo;
+import org.apache.lucene.index.SegmentInfoPerCommit;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.util.Bits;
+
+public class BlurInputFormat extends FileInputFormat<Text, TableBlurRecord> {
+  private static final Log LOG = LogFactory.getLog(BlurInputFormat.class);
+
+  private static final String BLUR_TABLE_PATH_MAPPING = "blur.table.path.mapping.";
+  private static final String BLUR_INPUT_FORMAT_DISCOVERY_THREADS = "blur.input.format.discovery.threads";
+  private static final String BLUR_TABLE_SNAPSHOT_MAPPING = "blur.table.snapshot.mapping.";
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    Path[] dirs = getInputPaths(context);
+    Configuration configuration = context.getConfiguration();
+    int threads = configuration.getInt(BLUR_INPUT_FORMAT_DISCOVERY_THREADS, 10);
+    ExecutorService service = Executors.newFixedThreadPool(threads);
+    try {
+      List<InputSplit> splits = new ArrayList<InputSplit>();
+      for (Path dir : dirs) {
+        Text table = BlurInputFormat.getTableFromPath(configuration, dir);
+        String snapshot = getSnapshotForTable(configuration, table.toString());
+        splits.addAll(getSegmentSplits(dir, service, configuration, table, new Text(snapshot)));
+      }
+      return splits;
+    } finally {
+      service.shutdownNow();
+    }
+  }
+
+  public static void putPathToTable(Configuration configuration, String tableName, Path path) {
+    configuration.set(BLUR_TABLE_PATH_MAPPING + tableName, path.toString());
+  }
+
+  public static Text getTableFromPath(Configuration configuration, Path path) throws IOException {
+    for (Entry<String, String> e : configuration) {
+      if (e.getKey().startsWith(BLUR_TABLE_PATH_MAPPING)) {
+        String k = e.getKey();
+        String table = k.substring(BLUR_TABLE_PATH_MAPPING.length());
+        String pathStr = e.getValue();
+        Path tablePath = new Path(pathStr);
+        if (tablePath.equals(path)) {
+          return new Text(table);
+        }
+      }
+    }
+    throw new IOException("Table name not found for path [" + path + "]");
+  }
+
+  public static void putSnapshotForTable(Configuration configuration, String tableName, String snapshot) {
+    configuration.set(BLUR_TABLE_SNAPSHOT_MAPPING + tableName, snapshot);
+  }
+
+  public static String getSnapshotForTable(Configuration configuration, String tableName) throws IOException {
+    for (Entry<String, String> e : configuration) {
+      if (e.getKey().startsWith(BLUR_TABLE_SNAPSHOT_MAPPING)) {
+        String k = e.getKey();
+        String table = k.substring(BLUR_TABLE_SNAPSHOT_MAPPING.length());
+        if (table.equals(tableName)) {
+          return e.getValue();
+        }
+      }
+    }
+    throw new IOException("Snaphost not found for table [" + tableName + "]");
+  }
+
+  private List<InputSplit> getSegmentSplits(final Path dir, ExecutorService service, final Configuration configuration,
+      final Text table, final Text snapshot) throws IOException {
+
+    FileSystem fileSystem = dir.getFileSystem(configuration);
+    FileStatus[] shardDirs = fileSystem.listStatus(dir, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().startsWith(BlurConstants.SHARD_PREFIX);
+      }
+    });
+
+    List<Future<List<InputSplit>>> futures = new ArrayList<Future<List<InputSplit>>>();
+    for (final FileStatus shardFileStatus : shardDirs) {
+      futures.add(service.submit(new Callable<List<InputSplit>>() {
+        @Override
+        public List<InputSplit> call() throws Exception {
+          return getSegmentSplits(shardFileStatus.getPath(), configuration, table, snapshot);
+        }
+      }));
+    }
+
+    List<InputSplit> results = new ArrayList<InputSplit>();
+    for (Future<List<InputSplit>> future : futures) {
+      try {
+        results.addAll(future.get());
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof IOException) {
+          throw (IOException) cause;
+        } else {
+          throw new IOException(cause);
+        }
+      }
+    }
+    return results;
+  }
+
+  private List<InputSplit> getSegmentSplits(Path shardDir, Configuration configuration, Text table, Text snapshot)
+      throws IOException {
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    HdfsDirectory hdfsDirectory = new HdfsDirectory(configuration, shardDir);
+    try {
+      SnapshotIndexDeletionPolicy policy = new SnapshotIndexDeletionPolicy(configuration,
+          SnapshotIndexDeletionPolicy.getGenerationsPath(shardDir));
+
+      Long generation = policy.getGeneration(snapshot.toString());
+      if (generation == null) {
+        throw new IOException("Snapshot [" + snapshot + "] not found in shard [" + shardDir
+ "]");
+      }
+
+      List<IndexCommit> listCommits = DirectoryReader.listCommits(hdfsDirectory);
+      IndexCommit indexCommit = findIndexCommit(listCommits, generation, shardDir);
+
+      String segmentsFileName = indexCommit.getSegmentsFileName();
+      SegmentInfos segmentInfos = new SegmentInfos();
+      segmentInfos.read(hdfsDirectory, segmentsFileName);
+      for (SegmentInfoPerCommit commit : segmentInfos) {
+        SegmentInfo segmentInfo = commit.info;
+        if (commit.getDelCount() == segmentInfo.getDocCount()) {
+          LOG.info("Segment [" + segmentInfo.name + "] in dir [" + shardDir + "] has all records deleted.");
+        } else {
+          String name = segmentInfo.name;
+          Set<String> files = segmentInfo.files();
+          long fileLength = 0;
+          for (String file : files) {
+            fileLength += hdfsDirectory.fileLength(file);
+          }
+          splits.add(new BlurInputSplit(shardDir, segmentsFileName, name, fileLength, table));
+        }
+      }
+      return splits;
+    } finally {
+      hdfsDirectory.close();
+    }
+  }
+
+  private IndexCommit findIndexCommit(List<IndexCommit> listCommits, long generation, Path shardDir) throws IOException {
+    for (IndexCommit commit : listCommits) {
+      if (commit.getGeneration() == generation) {
+        return commit;
+      }
+    }
+    throw new IOException("Generation [" + generation + "] not found in shard [" + shardDir
+ "]");
+  }
+
+  @Override
+  public RecordReader<Text, TableBlurRecord> createRecordReader(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    BlurRecordReader blurRecordReader = new BlurRecordReader();
+    blurRecordReader.initialize(split, context);
+    return blurRecordReader;
+  }
+
+  public static class BlurRecordReader extends RecordReader<Text, TableBlurRecord> {
+
+    private boolean _setup;
+    private Text _rowId;
+    private TableBlurRecord _tableBlurRecord;
+    private Bits _liveDocs;
+    private StoredFieldsReader _fieldsReader;
+    private HdfsDirectory _directory;
+
+    private int _docId = -1;
+    private int _maxDoc;
+    private Text _table;
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+      if (_setup) {
+        return;
+      }
+      _setup = true;
+
+      Configuration configuration = context.getConfiguration();
+      BlurInputSplit blurInputSplit = (BlurInputSplit) split;
+
+      _table = blurInputSplit.getTable();
+
+      _directory = new HdfsDirectory(configuration, blurInputSplit.getDir());
+      SegmentInfos segmentInfos = new SegmentInfos();
+      segmentInfos.read(_directory, blurInputSplit.getSegmentsName());
+      SegmentInfoPerCommit commit = findSegmentInfoPerCommit(segmentInfos, blurInputSplit);
+
+      Blur024Codec blur024Codec = new Blur024Codec();
+      IOContext iocontext = IOContext.READ;
+      SegmentInfo segmentInfo = commit.info;
+      String segmentName = segmentInfo.name;
+      FieldInfos fieldInfos = blur024Codec.fieldInfosFormat().getFieldInfosReader()
+          .read(_directory, segmentName, iocontext);
+      if (commit.getDelCount() > 0) {
+        _liveDocs = blur024Codec.liveDocsFormat().readLiveDocs(_directory, commit, iocontext);
+      }
+      _fieldsReader = blur024Codec.storedFieldsFormat().fieldsReader(_directory, segmentInfo, fieldInfos, iocontext);
+
+      _maxDoc = commit.info.getDocCount();
+    }
+
+    private SegmentInfoPerCommit findSegmentInfoPerCommit(SegmentInfos segmentInfos, BlurInputSplit blurInputSplit)
+        throws IOException {
+      String segmentInfoName = blurInputSplit.getSegmentInfoName();
+      for (SegmentInfoPerCommit commit : segmentInfos) {
+        if (commit.info.name.equals(segmentInfoName)) {
+          return commit;
+        }
+      }
+      throw new IOException("SegmentInfoPerCommit of [" + segmentInfoName + "] not found.");
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (_docId >= _maxDoc) {
+        return false;
+      }
+      while (true) {
+        _docId++;
+        if (_docId >= _maxDoc) {
+          return false;
+        }
+        if (_liveDocs == null) {
+          fetchBlurRecord();
+          return true;
+        } else if (_liveDocs.get(_docId)) {
+          fetchBlurRecord();
+          return true;
+        }
+      }
+    }
+
+    private void fetchBlurRecord() throws IOException {
+      DocumentStoredFieldVisitor visitor = new DocumentStoredFieldVisitor();
+      _fieldsReader.visitDocument(_docId, visitor);
+      BlurRecord blurRecord = new BlurRecord();
+      String rowId = RowDocumentUtil.readRecord(visitor.getDocument(), blurRecord);
+      blurRecord.setRowId(rowId);
+      _rowId = new Text(rowId);
+      _tableBlurRecord = new TableBlurRecord(_table, blurRecord);
+    }
+
+    @Override
+    public Text getCurrentKey() throws IOException, InterruptedException {
+      return _rowId;
+    }
+
+    @Override
+    public TableBlurRecord getCurrentValue() throws IOException, InterruptedException {
+      return _tableBlurRecord;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return (float) _docId / (float) _maxDoc;
+    }
+
+    @Override
+    public void close() throws IOException {
+      _fieldsReader.close();
+      _directory.close();
+    }
+
+  }
+
+  public static class BlurInputSplit extends InputSplit implements Writable {
+
+    private static final String UTF_8 = "UTF-8";
+    private long _fileLength;
+    private String _segmentsName;
+    private Path _dir;
+    private String _segmentInfoName;
+    private Text _table = new Text();
+
+    public BlurInputSplit() {
+
+    }
+
+    public BlurInputSplit(Path dir, String segmentsName, String segmentInfoName, long fileLength, Text table) {
+      _fileLength = fileLength;
+      _segmentsName = segmentsName;
+      _segmentInfoName = segmentInfoName;
+      _table = table;
+      _dir = dir;
+    }
+
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+      return _fileLength;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+      // @TODO create locations for fdt file
+      return new String[] {};
+    }
+
+    public String getSegmentInfoName() {
+      return _segmentInfoName;
+    }
+
+    public String getSegmentsName() {
+      return _segmentsName;
+    }
+
+    public Path getDir() {
+      return _dir;
+    }
+
+    public Text getTable() {
+      return _table;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      writeString(out, _dir.toString());
+      writeString(out, _segmentsName);
+      writeString(out, _segmentInfoName);
+      _table.write(out);
+      out.writeLong(_fileLength);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      _dir = new Path(readString(in));
+      _segmentsName = readString(in);
+      _segmentInfoName = readString(in);
+      _table.readFields(in);
+      _fileLength = in.readLong();
+    }
+
+    private void writeString(DataOutput out, String s) throws IOException {
+      byte[] bs = s.getBytes(UTF_8);
+      out.writeInt(bs.length);
+      out.write(bs);
+    }
+
+    private String readString(DataInput in) throws IOException {
+      int length = in.readInt();
+      byte[] buf = new byte[length];
+      in.readFully(buf);
+      return new String(buf, UTF_8);
+    }
+
+  }
+
+  public static void addTable(Job job, TableDescriptor tableDescriptor, String snapshot)
+      throws IllegalArgumentException, IOException {
+    String tableName = tableDescriptor.getName();
+    Path path = new Path(tableDescriptor.getTableUri());
+    FileInputFormat.addInputPath(job, path);
+    putPathToTable(job.getConfiguration(), tableName, path);
+    putSnapshotForTable(job.getConfiguration(), tableName, snapshot);
+  }
+
+}
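
A quick illustration (not part of this commit) of the Writable round trip BlurInputSplit implements
above, which is how Hadoop ships splits to map tasks; the directory, segment names, table name, and
length here are made up:

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BlurInputSplit split = new BlurInputSplit(new Path("hdfs://namenode/blur/tables/t1/shard-00000000"),
        "segments_1", "_0", 1024L, new Text("t1"));
    split.write(new DataOutputStream(out));

    BlurInputSplit copy = new BlurInputSplit();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(out.toByteArray())));
    // copy now holds the same dir, segments file name, segment info name, table, and length.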

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bca39998/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecord.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecord.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecord.java
index 7c12a76..1236f7f 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecord.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/BlurRecord.java
@@ -33,6 +33,17 @@ public class BlurRecord implements Writable, ReaderBlurRecord {
 
   private List<BlurColumn> _columns = new ArrayList<BlurColumn>();
 
+  public BlurRecord() {
+
+  }
+
+  public BlurRecord(BlurRecord blurRecord) {
+    _rowId = blurRecord._rowId;
+    _recordId = blurRecord._recordId;
+    _family = blurRecord._family;
+    _columns = new ArrayList<BlurColumn>(blurRecord._columns);
+  }
+
   @Override
   public void readFields(DataInput in) throws IOException {
     _rowId = IOUtil.readString(in);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bca39998/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/TableBlurRecord.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/TableBlurRecord.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/TableBlurRecord.java
new file mode 100644
index 0000000..01ff34b
--- /dev/null
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/TableBlurRecord.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+public class TableBlurRecord implements Writable {
+
+  private Text _table = new Text();
+  private BlurRecord _blurRecord = new BlurRecord();
+
+  public TableBlurRecord() {
+
+  }
+
+  public TableBlurRecord(String table, BlurRecord blurRecord) {
+    this(new Text(table), blurRecord);
+  }
+
+  public TableBlurRecord(Text table, BlurRecord blurRecord) {
+    _table = table;
+    _blurRecord = blurRecord;
+  }
+
+  public TableBlurRecord(TableBlurRecord tableBlurRecord) {
+    _table = new Text(tableBlurRecord.getTable());
+    _blurRecord = new BlurRecord(tableBlurRecord.getBlurRecord());
+  }
+
+  public Text getTable() {
+    return _table;
+  }
+
+  public void setTable(Text table) {
+    _table = table;
+  }
+
+  public BlurRecord getBlurRecord() {
+    return _blurRecord;
+  }
+
+  public void setBlurRecord(BlurRecord blurRecord) {
+    _blurRecord = blurRecord;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    _table.write(out);
+    _blurRecord.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    _table.readFields(in);
+    _blurRecord.readFields(in);
+  }
+
+  @Override
+  public String toString() {
+    return "TableBlurRecord [_table=" + _table + ", _blurRecord=" + _blurRecord + "]";
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((_blurRecord == null) ? 0 : _blurRecord.hashCode());
+    result = prime * result + ((_table == null) ? 0 : _table.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    TableBlurRecord other = (TableBlurRecord) obj;
+    if (_blurRecord == null) {
+      if (other._blurRecord != null)
+        return false;
+    } else if (!_blurRecord.equals(other._blurRecord))
+      return false;
+    if (_table == null) {
+      if (other._table != null)
+        return false;
+    } else if (!_table.equals(other._table))
+      return false;
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/bca39998/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
new file mode 100644
index 0000000..71cc571
--- /dev/null
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/BlurInputFormatTest.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.mapreduce.lib;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.blur.MiniCluster;
+import org.apache.blur.store.buffer.BufferStore;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
+import org.apache.blur.thrift.BlurClient;
+import org.apache.blur.thrift.generated.Blur.Iface;
+import org.apache.blur.thrift.generated.BlurException;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.ColumnDefinition;
+import org.apache.blur.thrift.generated.Record;
+import org.apache.blur.thrift.generated.RecordMutation;
+import org.apache.blur.thrift.generated.RecordMutationType;
+import org.apache.blur.thrift.generated.RowMutation;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class BlurInputFormatTest {
+
+  private static Configuration conf = new Configuration();
+  private static MiniCluster miniCluster;
+
+  @BeforeClass
+  public static void setupTest() throws Exception {
+    setupJavaHome();
+    File file = new File("./target/tmp/BlurInputFormatTest_tmp");
+    String pathStr = file.getAbsoluteFile().toURI().toString();
+    System.setProperty("test.build.data", pathStr + "/data");
+    System.setProperty("hadoop.log.dir", pathStr + "/hadoop_log");
+    miniCluster = new MiniCluster();
+    miniCluster.startBlurCluster(pathStr + "/blur", 2, 2);
+    miniCluster.startMrMiniCluster();
+    conf = miniCluster.getMRConfiguration();
+
+    BufferStore.initNewBuffer(128, 128 * 128);
+  }
+
+  public static void setupJavaHome() {
+    String str = System.getenv("JAVA_HOME");
+    if (str == null) {
+      String property = System.getProperty("java.home");
+      if (property != null) {
+        throw new RuntimeException("JAVA_HOME not set, should probably be [" + property + "].");
+      }
+      throw new RuntimeException("JAVA_HOME not set.");
+    }
+  }
+
+  @AfterClass
+  public static void teardown() throws IOException {
+    if (miniCluster != null) {
+      miniCluster.stopMrMiniCluster();
+    }
+    rm(new File("build"));
+  }
+
+  private static void rm(File file) {
+    if (!file.exists()) {
+      return;
+    }
+    if (file.isDirectory()) {
+      for (File f : file.listFiles()) {
+        rm(f);
+      }
+    }
+    file.delete();
+  }
+
+  @Test
+  public void testBlurInputFormat() throws IOException, BlurException, TException, ClassNotFoundException,
+      InterruptedException {
+    String tableName = "testBlurInputFormat";
+
+    FileSystem fileSystem = miniCluster.getFileSystem();
+    Path root = new Path(fileSystem.getUri() + "/");
+
+    createTable(tableName, new Path(root, "tables"), true);
+    loadTable(tableName, 100, 100);
+
+    Iface client = getClient();
+
+    TableDescriptor tableDescriptor = client.describe(tableName);
+
+    Job job = Job.getInstance(conf, "Read Data");
+    job.setJarByClass(BlurInputFormatTest.class);
+    job.setMapperClass(TestMapper.class);
+    job.setInputFormatClass(BlurInputFormat.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setNumReduceTasks(0);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(TableBlurRecord.class);
+
+    Path output = new Path(root, "output");
+
+    String snapshot = UUID.randomUUID().toString();
+    client.createSnapshot(tableName, snapshot);
+
+    BlurInputFormat.addTable(job, tableDescriptor, snapshot);
+    FileOutputFormat.setOutputPath(job, output);
+
+    try {
+      assertTrue(job.waitForCompletion(true));
+    } finally {
+      client.removeSnapshot(tableName, snapshot);
+    }
+
+    final Map<Text, TableBlurRecord> results = new TreeMap<Text, TableBlurRecord>();
+    walkOutput(output, conf, new ResultReader() {
+      @Override
+      public void read(Text rowId, TableBlurRecord tableBlurRecord) {
+        results.put(new Text(rowId), new TableBlurRecord(tableBlurRecord));
+      }
+    });
+    int rowId = 100;
+    for (Entry<Text, TableBlurRecord> e : results.entrySet()) {
+      Text r = e.getKey();
+      assertEquals(new Text("row-" + rowId), r);
+      BlurRecord blurRecord = new BlurRecord();
+      blurRecord.setRowId("row-" + rowId);
+      blurRecord.setRecordId("record-" + rowId);
+      blurRecord.setFamily("fam0");
+      blurRecord.addColumn("col0", "value-" + rowId);
+      TableBlurRecord tableBlurRecord = new TableBlurRecord(new Text(tableName), blurRecord);
+      assertEquals(tableBlurRecord, e.getValue());
+
+      rowId++;
+    }
+    assertEquals(200, rowId);
+
+  }
+
+  public interface ResultReader {
+
+    void read(Text rowId, TableBlurRecord tableBlurRecord);
+
+  }
+
+  @SuppressWarnings("deprecation")
+  private void walkOutput(Path output, Configuration conf, ResultReader resultReader) throws IOException {
+    FileSystem fileSystem = output.getFileSystem(conf);
+    FileStatus fileStatus = fileSystem.getFileStatus(output);
+    if (fileStatus.isDir()) {
+      FileStatus[] listStatus = fileSystem.listStatus(output, new PathFilter() {
+        @Override
+        public boolean accept(Path path) {
+          return !path.getName().startsWith("_");
+        }
+      });
+      for (FileStatus fs : listStatus) {
+        walkOutput(fs.getPath(), conf, resultReader);
+      }
+    } else {
+      Reader reader = new SequenceFile.Reader(fileSystem, output, conf);
+      Text rowId = new Text();
+      TableBlurRecord tableBlurRecord = new TableBlurRecord();
+      while (reader.next(rowId, tableBlurRecord)) {
+        resultReader.read(rowId, tableBlurRecord);
+      }
+      reader.close();
+    }
+  }
+
+  private Iface getClient() {
+    return BlurClient.getClientFromZooKeeperConnectionStr(miniCluster.getZkConnectionString());
+  }
+
+  private void loadTable(String tableName, int startId, int numb) throws BlurException, TException {
+    Iface client = getClient();
+    List<RowMutation> batch = new ArrayList<RowMutation>();
+    for (int i = 0; i < numb; i++) {
+      int id = startId + i;
+      RowMutation rowMutation = new RowMutation();
+      rowMutation.setTable(tableName);
+      rowMutation.setRowId("row-" + Integer.toString(id));
+      Record record = new Record();
+      record.setFamily("fam0");
+      record.setRecordId("record-" + id);
+      record.addToColumns(new Column("col0", "value-" + id));
+      rowMutation.addToRecordMutations(new RecordMutation(RecordMutationType.REPLACE_ENTIRE_RECORD, record));
+      batch.add(rowMutation);
+    }
+    client.mutateBatch(batch);
+  }
+
+  private void createTable(String tableName, Path tables, boolean fastDisable) throws BlurException, TException {
+    Path tablePath = new Path(tables, tableName);
+    Iface client = getClient();
+    TableDescriptor tableDescriptor = new TableDescriptor();
+    tableDescriptor.setTableUri(tablePath.toString());
+    tableDescriptor.setName(tableName);
+    tableDescriptor.setShardCount(2);
+    tableDescriptor.putToTableProperties(BlurConstants.BLUR_TABLE_DISABLE_FAST_DIR, Boolean.toString(fastDisable));
+    client.createTable(tableDescriptor);
+
+    ColumnDefinition colDef = new ColumnDefinition();
+    colDef.setFamily("fam0");
+    colDef.setColumnName("col0");
+    colDef.setFieldType("string");
+    client.addColumnDefinition(tableName, colDef);
+  }
+
+  public static class TestMapper extends Mapper<Text, TableBlurRecord, Text, TableBlurRecord> {
+    @Override
+    protected void map(Text key, TableBlurRecord value, Context context) throws IOException, InterruptedException {
+      context.write(key, value);
+    }
+  }
+
+}

