incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject [24/51] [partial] Initial repackage to org.apache.
Date Mon, 03 Sep 2012 03:17:12 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/example/BlurExampleIndexerRebuild.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/example/BlurExampleIndexerRebuild.java b/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/example/BlurExampleIndexerRebuild.java
deleted file mode 100644
index 177683f..0000000
--- a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/example/BlurExampleIndexerRebuild.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package com.nearinfinity.blur.mapreduce.example;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-
-import com.nearinfinity.blur.mapreduce.BlurTask;
-import com.nearinfinity.blur.mapreduce.BlurTask.INDEXING_TYPE;
-import com.nearinfinity.blur.thrift.generated.AnalyzerDefinition;
-import com.nearinfinity.blur.thrift.generated.ColumnDefinition;
-import com.nearinfinity.blur.thrift.generated.TableDescriptor;
-
-public class BlurExampleIndexerRebuild {
-
-  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
-    Configuration configuration = new Configuration();
-    String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
-    if (otherArgs.length != 2) {
-      System.err.println("Usage: blurindexer <in> <out>");
-      System.exit(2);
-    }
-
-    AnalyzerDefinition ad = new AnalyzerDefinition();
-    ad.defaultDefinition = new ColumnDefinition(StandardAnalyzer.class.getName(), true, null);
-
-    TableDescriptor descriptor = new TableDescriptor();
-    descriptor.analyzerDefinition = ad;
-    descriptor.compressionBlockSize = 32768;
-    descriptor.compressionClass = DefaultCodec.class.getName();
-    descriptor.isEnabled = true;
-    descriptor.name = "test-table";
-    descriptor.shardCount = 1;
-    descriptor.cluster = "default";
-    descriptor.tableUri = "./blur-testing";
-
-    BlurTask blurTask = new BlurTask();
-    blurTask.setTableDescriptor(descriptor);
-    blurTask.setIndexingType(INDEXING_TYPE.REBUILD);
-    blurTask.setOptimize(false);
-    Job job = blurTask.configureJob(configuration);
-    job.setJarByClass(BlurExampleIndexerRebuild.class);
-    job.setMapperClass(BlurExampleMapper.class);
-    job.setInputFormatClass(TextInputFormat.class);
-    job.setOutputFormatClass(TextOutputFormat.class);
-
-    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
-    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1], "job-" + System.currentTimeMillis()));
-    long s = System.currentTimeMillis();
-    boolean waitForCompletion = job.waitForCompletion(true);
-    long e = System.currentTimeMillis();
-    System.out.println("Completed in [" + (e - s) + " ms]");
-    System.exit(waitForCompletion ? 0 : 1);
-  }
-}
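
A job like the one above is normally launched through the Hadoop jar runner; GenericOptionsParser strips any -D overrides before the two positional arguments are checked. A minimal invocation sketch (the jar name and paths are hypothetical):

    hadoop jar blur-mapred-examples.jar \
        com.nearinfinity.blur.mapreduce.example.BlurExampleIndexerRebuild \
        -D mapred.reduce.tasks=4 \
        /user/blur/input /user/blur/output

Note that the driver appends "job-" plus the current timestamp to the output path, so repeated runs write to distinct directories under <out>.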

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/example/BlurExampleIndexerUpdate.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/example/BlurExampleIndexerUpdate.java b/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/example/BlurExampleIndexerUpdate.java
deleted file mode 100644
index feb89db..0000000
--- a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/example/BlurExampleIndexerUpdate.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package com.nearinfinity.blur.mapreduce.example;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-
-import com.nearinfinity.blur.manager.clusterstatus.ZookeeperClusterStatus;
-import com.nearinfinity.blur.mapreduce.BlurTask;
-import com.nearinfinity.blur.mapreduce.BlurTask.INDEXING_TYPE;
-import com.nearinfinity.blur.thrift.generated.AnalyzerDefinition;
-import com.nearinfinity.blur.thrift.generated.ColumnDefinition;
-import com.nearinfinity.blur.thrift.generated.TableDescriptor;
-
-public class BlurExampleIndexerUpdate {
-
-  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
-    Configuration configuration = new Configuration();
-    String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
-    if (otherArgs.length != 2) {
-      System.err.println("Usage: blurindexer <in> <out>");
-      System.exit(2);
-    }
-
-    AnalyzerDefinition ad = new AnalyzerDefinition();
-    ad.defaultDefinition = new ColumnDefinition(StandardAnalyzer.class.getName(), true, null);
-
-    ZookeeperClusterStatus status = new ZookeeperClusterStatus("localhost");
-    TableDescriptor descriptor = status.getTableDescriptor(false, "default", "test-table");
-
-    BlurTask blurTask = new BlurTask();
-    blurTask.setTableDescriptor(descriptor);
-    blurTask.setIndexingType(INDEXING_TYPE.UPDATE);
-    Job job = blurTask.configureJob(configuration);
-    job.setJarByClass(BlurExampleIndexerUpdate.class);
-    job.setMapperClass(BlurExampleMapper.class);
-    job.setInputFormatClass(TextInputFormat.class);
-    job.setOutputFormatClass(TextOutputFormat.class);
-
-    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
-    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1], "job-" + System.currentTimeMillis()));
-    boolean waitForCompletion = job.waitForCompletion(true);
-    System.exit(waitForCompletion ? 0 : 1);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/example/BlurExampleMapper.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/example/BlurExampleMapper.java b/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/example/BlurExampleMapper.java
deleted file mode 100644
index aef55cb..0000000
--- a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/example/BlurExampleMapper.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package com.nearinfinity.blur.mapreduce.example;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.UUID;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-
-import com.nearinfinity.blur.mapreduce.BlurMapper;
-import com.nearinfinity.blur.mapreduce.BlurRecord;
-import com.nearinfinity.blur.mapreduce.BlurMutate.MUTATE_TYPE;
-
-public class BlurExampleMapper extends BlurMapper<LongWritable, Text> {
-
-  @Override
-  protected void map(LongWritable k, Text value, Context context) throws IOException, InterruptedException {
-    BlurRecord record = _mutate.getRecord();
-    record.clearColumns();
-    String str = value.toString();
-    String[] split = str.split("\\t");
-    record.setRowId(UUID.randomUUID().toString());
-    record.setRecordId(UUID.randomUUID().toString());
-    record.setFamily("cf1");
-    for (int i = 0; i < split.length; i++) {
-      record.addColumn("c" + i, split[i]);
-      _fieldCounter.increment(1);
-    }
-    byte[] bs = record.getRowId().getBytes();
-    _key.set(bs, 0, bs.length);
-    _mutate.setMutateType(MUTATE_TYPE.ADD);
-    context.write(_key, _mutate);
-    _recordCounter.increment(1);
-    context.progress();
-  }
-}
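
The deleted example mapper turns each tab-separated input line into a single-record row, with random UUIDs for the row and record ids. A small illustration of the column mapping it produces (the input value is hypothetical):

    String value = "alice\t42\tnyc";
    String[] split = value.split("\\t");
    // stored under family "cf1" as: c0 = "alice", c1 = "42", c2 = "nyc"

Because the row id is a fresh UUID per line, no two input lines ever land in the same row.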

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/BlurInputFormat.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/BlurInputFormat.java b/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/BlurInputFormat.java
deleted file mode 100644
index f8042f3..0000000
--- a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/BlurInputFormat.java
+++ /dev/null
@@ -1,87 +0,0 @@
-package com.nearinfinity.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.IndexReader;
-
-import com.nearinfinity.blur.mapreduce.BlurRecord;
-import com.nearinfinity.blur.store.hdfs.HdfsDirectory;
-
-public class BlurInputFormat extends InputFormat<Text, BlurRecord> {
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
-    List<?> splits = new ArrayList<Object>();
-    Path[] paths = FileInputFormat.getInputPaths(context);
-    for (Path path : paths) {
-      findAllSegments(context.getConfiguration(), path, splits);
-    }
-    return (List<InputSplit>) splits;
-  }
-
-  @Override
-  public RecordReader<Text, BlurRecord> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
-    BlurRecordReader blurRecordReader = new BlurRecordReader();
-    blurRecordReader.initialize(split, context);
-    return blurRecordReader;
-  }
-
-  public static void findAllSegments(Configuration configuration, Path path, List<?> splits) throws IOException {
-    FileSystem fileSystem = path.getFileSystem(configuration);
-    if (fileSystem.isFile(path)) {
-      return;
-    } else {
-      FileStatus[] listStatus = fileSystem.listStatus(path);
-      for (FileStatus status : listStatus) {
-        Path p = status.getPath();
-        HdfsDirectory directory = new HdfsDirectory(p);
-        if (IndexReader.indexExists(directory)) {
-          addSplits(directory, splits);
-        } else {
-          findAllSegments(configuration, p, splits);
-        }
-      }
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  public static void addSplits(HdfsDirectory directory, @SuppressWarnings("rawtypes") List splits) throws IOException {
-    IndexCommit commit = Utils.findLatest(directory);
-    List<String> segments = Utils.getSegments(directory, commit);
-    for (String segment : segments) {
-      BlurInputSplit split = new BlurInputSplit(directory.getHdfsDirPath(), segment, 0, Integer.MAX_VALUE);
-      splits.add(split);
-    }
-  }
-}
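
This InputFormat (re-added under org.apache.blur.mapreduce.lib elsewhere in this repackage) turns each Lucene segment found beneath the input paths into one split. A sketch of wiring it into a new-API read job, assuming the table directory as input; MyReader and all paths are hypothetical:

    Configuration conf = new Configuration();
    Job job = new Job(conf, "read-blur-table");
    job.setJarByClass(MyReader.class);
    job.setMapperClass(MyReader.class); // extends Mapper<Text, BlurRecord, ?, ?>
    job.setInputFormatClass(BlurInputFormat.class);
    job.setNumReduceTasks(0);
    FileInputFormat.addInputPath(job, new Path("/blur/tables/test-table"));
    FileOutputFormat.setOutputPath(job, new Path("/tmp/read-blur-out"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);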

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/BlurInputSplit.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/BlurInputSplit.java b/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/BlurInputSplit.java
deleted file mode 100644
index 30303e3..0000000
--- a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/BlurInputSplit.java
+++ /dev/null
@@ -1,157 +0,0 @@
-package com.nearinfinity.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-
-public class BlurInputSplit extends InputSplit implements Writable, org.apache.hadoop.mapred.InputSplit {
-
-  private int _endingDocId;
-  private int _startingDocId;
-  private String _segmentName;
-  private Path _path;
-
-  public BlurInputSplit() {
-
-  }
-
-  public BlurInputSplit(Path path, String segmentName, int startingDocId, int endingDocId) {
-    _endingDocId = endingDocId;
-    _startingDocId = startingDocId;
-    _segmentName = segmentName;
-    _path = path;
-  }
-
-  @Override
-  public long getLength() {
-    return _endingDocId - _startingDocId;
-  }
-
-  @Override
-  public String[] getLocations() {
-    return new String[] {};
-  }
-
-  public Path getIndexPath() {
-    return _path;
-  }
-
-  public String getSegmentName() {
-    return _segmentName;
-  }
-
-  public int getStartingDocId() {
-    return _startingDocId;
-  }
-
-  public int getEndingDocId() {
-    return _endingDocId;
-  }
-
-  public void setEndingDocId(int endingDocId) {
-    _endingDocId = endingDocId;
-  }
-
-  public void setStartingDocId(int startingDocId) {
-    _startingDocId = startingDocId;
-  }
-
-  public void setSegmentName(String segmentName) {
-    _segmentName = segmentName;
-  }
-
-  public void setPath(Path path) {
-    _path = path;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(_startingDocId);
-    out.writeInt(_endingDocId);
-    writeString(out, _segmentName);
-    writeString(out, _path.toUri().toString());
-  }
-
-  private void writeString(DataOutput out, String s) throws IOException {
-    byte[] bs = s.getBytes();
-    out.writeInt(bs.length);
-    out.write(bs);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    _startingDocId = in.readInt();
-    _endingDocId = in.readInt();
-    _segmentName = readString(in);
-    _path = new Path(readString(in));
-  }
-
-  private String readString(DataInput in) throws IOException {
-    int length = in.readInt();
-    byte[] buf = new byte[length];
-    in.readFully(buf);
-    return new String(buf);
-  }
-
-  @Override
-  public String toString() {
-    return "path=" + _path + ", segmentName=" + _segmentName + ", startingDocId=" + _startingDocId + ", endingDocId=" + _endingDocId;
-  }
-
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + _endingDocId;
-    result = prime * result + ((_path == null) ? 0 : _path.hashCode());
-    result = prime * result + ((_segmentName == null) ? 0 : _segmentName.hashCode());
-    result = prime * result + _startingDocId;
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    BlurInputSplit other = (BlurInputSplit) obj;
-    if (_endingDocId != other._endingDocId)
-      return false;
-    if (_path == null) {
-      if (other._path != null)
-        return false;
-    } else if (!_path.equals(other._path))
-      return false;
-    if (_segmentName == null) {
-      if (other._segmentName != null)
-        return false;
-    } else if (!_segmentName.equals(other._segmentName))
-      return false;
-    if (_startingDocId != other._startingDocId)
-      return false;
-    return true;
-  }
-}
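
One caveat in the split serialization above: writeString and readString rely on String.getBytes() and new String(byte[]), which use the JVM's default charset, so a split could in principle round-trip incorrectly between nodes with different default encodings. A charset-safe variant is a two-line change (a sketch, not part of this commit):

    private void writeString(DataOutput out, String s) throws IOException {
      byte[] bs = s.getBytes("UTF-8");
      out.writeInt(bs.length);
      out.write(bs);
    }

    private String readString(DataInput in) throws IOException {
      int length = in.readInt();
      byte[] buf = new byte[length];
      in.readFully(buf);
      return new String(buf, "UTF-8");
    }

Hadoop's org.apache.hadoop.io.Text.writeString/readString would do the same job.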

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/BlurOutputCommitter.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/BlurOutputCommitter.java b/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/BlurOutputCommitter.java
deleted file mode 100644
index b9b2ed2..0000000
--- a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/BlurOutputCommitter.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package com.nearinfinity.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-public class BlurOutputCommitter extends OutputCommitter {
-
-  public BlurOutputCommitter(TaskAttemptContext context) {
-
-  }
-
-  @Override
-  public void setupJob(JobContext jobContext) throws IOException {
-
-  }
-
-  @Override
-  public void setupTask(TaskAttemptContext taskContext) throws IOException {
-
-  }
-
-  @Override
-  public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
-    return false;
-  }
-
-  @Override
-  public void commitTask(TaskAttemptContext taskContext) throws IOException {
-
-  }
-
-  @Override
-  public void abortTask(TaskAttemptContext taskContext) throws IOException {
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/BlurOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/BlurOutputFormat.java b/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/BlurOutputFormat.java
deleted file mode 100644
index c6855fc..0000000
--- a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/BlurOutputFormat.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package com.nearinfinity.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import com.nearinfinity.blur.mapreduce.BlurRecord;
-
-public class BlurOutputFormat extends OutputFormat<Text, BlurRecord> {
-
-  @Override
-  public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
-
-  }
-
-  @Override
-  public RecordWriter<Text, BlurRecord> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
-    return new BlurRecordWriter(context);
-  }
-
-  @Override
-  public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
-    return new BlurOutputCommitter(context);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/BlurRecordReader.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/BlurRecordReader.java b/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/BlurRecordReader.java
deleted file mode 100644
index da4cba3..0000000
--- a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/BlurRecordReader.java
+++ /dev/null
@@ -1,103 +0,0 @@
-package com.nearinfinity.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.store.Directory;
-
-import com.nearinfinity.blur.mapreduce.BlurRecord;
-import com.nearinfinity.blur.store.hdfs.HdfsDirectory;
-import com.nearinfinity.blur.utils.RowDocumentUtil;
-
-public class BlurRecordReader extends RecordReader<Text, BlurRecord> {
-
-  private IndexReader reader;
-  private Directory directory;
-  private int startingDocId;
-  private int endingDocId;
-  private int position;
-  private Text rowid = new Text();
-  private BlurRecord record = new BlurRecord();
-
-  @Override
-  public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
-    BlurInputSplit blurSplit = (BlurInputSplit) split;
-    Path path = blurSplit.getIndexPath();
-    String segmentName = blurSplit.getSegmentName();
-    startingDocId = blurSplit.getStartingDocId();
-    endingDocId = blurSplit.getEndingDocId();
-    directory = new HdfsDirectory(path);
-
-    IndexCommit commit = Utils.findLatest(directory);
-    reader = Utils.openSegmentReader(directory, commit, segmentName, Utils.getTermInfosIndexDivisor(context.getConfiguration()));
-    int maxDoc = reader.maxDoc();
-    if (endingDocId >= maxDoc) {
-      endingDocId = maxDoc - 1;
-    }
-    position = startingDocId - 1;
-  }
-
-  @Override
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    do {
-      position++;
-      if (position > endingDocId) {
-        return false;
-      }
-    } while (reader.isDeleted(position));
-    readDocument();
-    return true;
-  }
-
-  private void readDocument() throws CorruptIndexException, IOException {
-    Document document = reader.document(position);
-    record.reset();
-    rowid.set(RowDocumentUtil.readRecord(document, record));
-  }
-
-  @Override
-  public Text getCurrentKey() throws IOException, InterruptedException {
-    return rowid;
-  }
-
-  @Override
-  public BlurRecord getCurrentValue() throws IOException, InterruptedException {
-    return record;
-  }
-
-  @Override
-  public float getProgress() throws IOException, InterruptedException {
-    int total = endingDocId - startingDocId;
-    return (float) position / (float) total;
-  }
-
-  @Override
-  public void close() throws IOException {
-    reader.close();
-    directory.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/BlurRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/BlurRecordWriter.java b/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/BlurRecordWriter.java
deleted file mode 100644
index a7997c6..0000000
--- a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/BlurRecordWriter.java
+++ /dev/null
@@ -1,116 +0,0 @@
-package com.nearinfinity.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.KeywordAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.Field.Index;
-import org.apache.lucene.document.Field.Store;
-import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.NoLockFactory;
-import org.apache.lucene.util.Version;
-
-import com.nearinfinity.blur.log.Log;
-import com.nearinfinity.blur.log.LogFactory;
-import com.nearinfinity.blur.mapreduce.BlurColumn;
-import com.nearinfinity.blur.mapreduce.BlurRecord;
-import com.nearinfinity.blur.store.hdfs.HdfsDirectory;
-import com.nearinfinity.blur.utils.BlurConstants;
-import com.nearinfinity.blur.utils.BlurUtil;
-
-public class BlurRecordWriter extends RecordWriter<Text, BlurRecord> {
-
-  private static Log LOG = LogFactory.getLog(BlurRecordWriter.class);
-
-  private Text prevKey = new Text();
-  private List<Document> documents = new ArrayList<Document>();
-  private IndexWriter writer;
-
-  public BlurRecordWriter(TaskAttemptContext context) throws IOException {
-    Configuration configuration = context.getConfiguration();
-    String outputPath = configuration.get("mapred.output.dir");
-    int id = context.getTaskAttemptID().getTaskID().getId();
-    String shardName = BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, id);
-    Path basePath = new Path(outputPath);
-    Path indexPath = new Path(basePath, shardName);
-
-    // @TODO
-    Analyzer analyzer = new KeywordAnalyzer();
-
-    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_35, analyzer);
-
-    // @TODO setup compressed directory, read compression codec from config,
-    // setup progressable dir, setup lock factory
-    Directory dir = new HdfsDirectory(indexPath);
-    dir.setLockFactory(NoLockFactory.getNoLockFactory());
-    writer = new IndexWriter(dir, conf);
-  }
-
-  @Override
-  public void write(Text key, BlurRecord value) throws IOException, InterruptedException {
-    if (!prevKey.equals(key)) {
-      flush();
-      prevKey.set(key);
-    }
-    add(value);
-  }
-
-  private void add(BlurRecord value) {
-    List<BlurColumn> columns = value.getColumns();
-    String family = value.getFamily();
-    Document document = new Document();
-    document.add(new Field(BlurConstants.ROW_ID, value.getRowId(), Store.YES, Index.NOT_ANALYZED_NO_NORMS));
-    document.add(new Field(BlurConstants.RECORD_ID, value.getRecordId(), Store.YES, Index.NOT_ANALYZED_NO_NORMS));
-    for (BlurColumn column : columns) {
-      document.add(convert(family, column));
-    }
-    documents.add(document);
-    LOG.error("Needs to use blur analyzer and field converter");
-  }
-
-  private Field convert(String family, BlurColumn column) {
-    return new Field(family + "." + column.getName(), column.getValue(), Store.YES, Index.ANALYZED_NO_NORMS);
-  }
-
-  private void flush() throws CorruptIndexException, IOException {
-    if (documents.isEmpty()) {
-      return;
-    }
-    writer.addDocuments(documents);
-    documents.clear();
-  }
-
-  @Override
-  public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-    flush();
-    writer.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/Utils.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/Utils.java b/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/Utils.java
deleted file mode 100644
index 492a126..0000000
--- a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/lib/Utils.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package com.nearinfinity.blur.mapreduce.lib;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.SegmentInfo;
-import org.apache.lucene.index.SegmentInfos;
-import org.apache.lucene.index.SegmentReader;
-import org.apache.lucene.store.Directory;
-
-public class Utils {
-
-  public static int getTermInfosIndexDivisor(Configuration conf) {
-    return 128;
-  }
-
-  public static IndexCommit findLatest(Directory dir) throws IOException {
-    Collection<IndexCommit> listCommits = IndexReader.listCommits(dir);
-    if (listCommits.size() == 1) {
-      return listCommits.iterator().next();
-    }
-    throw new RuntimeException("Multiple commit points not supported yet.");
-  }
-
-  public static List<String> getSegments(Directory dir, IndexCommit commit) throws CorruptIndexException, IOException {
-    SegmentInfos infos = new SegmentInfos();
-    infos.read(dir, commit.getSegmentsFileName());
-    List<String> result = new ArrayList<String>();
-    for (SegmentInfo info : infos) {
-      result.add(info.name);
-    }
-    return result;
-  }
-
-  public static IndexReader openSegmentReader(Directory directory, IndexCommit commit, String segmentName, int termInfosIndexDivisor) throws CorruptIndexException, IOException {
-    SegmentInfos infos = new SegmentInfos();
-    infos.read(directory, commit.getSegmentsFileName());
-    SegmentInfo segmentInfo = null;
-    for (SegmentInfo info : infos) {
-      if (segmentName.equals(info.name)) {
-        segmentInfo = info;
-        break;
-      }
-    }
-    if (segmentInfo == null) {
-      throw new RuntimeException("SegmentInfo for [" + segmentName + "] not found in directory [" + directory + "] for commit [" + commit + "]");
-    }
-    return SegmentReader.get(true, segmentInfo, termInfosIndexDivisor);
-  }
-}
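
These helpers are usable outside MapReduce as well, for instance to list the segments that would become splits. A standalone sketch using Lucene's FSDirectory (the local path is hypothetical):

    Directory dir = FSDirectory.open(new File("/tmp/test-index"));
    IndexCommit commit = Utils.findLatest(dir);
    for (String segment : Utils.getSegments(dir, commit)) {
      System.out.println(segment);
    }
    dir.close();

findLatest deliberately refuses to choose among multiple commit points, so this only succeeds on an index with a single commit.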

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/org/apache/blur/mapred/BlurInputFormat.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapred/BlurInputFormat.java b/src/blur-mapred/src/main/java/org/apache/blur/mapred/BlurInputFormat.java
new file mode 100644
index 0000000..18506cc
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapred/BlurInputFormat.java
@@ -0,0 +1,53 @@
+package org.apache.blur.mapred;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.blur.mapreduce.BlurRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+
+public class BlurInputFormat implements InputFormat<Text, BlurRecord> {
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    List<?> splits = new ArrayList<Object>();
+    Path[] paths = FileInputFormat.getInputPaths(job);
+    for (Path path : paths) {
+      org.apache.blur.mapreduce.lib.BlurInputFormat.findAllSegments((Configuration) job, path, splits);
+    }
+    return splits.toArray(new InputSplit[] {});
+  }
+
+  @Override
+  public RecordReader<Text, BlurRecord> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    reporter.setStatus(split.toString());
+    return new BlurRecordReader(split, job);
+  }
+
+}
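
Wiring the old-API format into a job goes through JobConf (the mapred flavors of FileInputFormat and JobClient); MyReadJob and the path below are hypothetical:

    JobConf job = new JobConf(new Configuration(), MyReadJob.class);
    job.setInputFormat(BlurInputFormat.class);
    FileInputFormat.addInputPath(job, new Path("/blur/tables/test-table"));
    JobClient.runJob(job);

Note that getSplits ignores the numSplits hint entirely; the split count is always the number of segments discovered under the input paths.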

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/org/apache/blur/mapred/BlurRecordReader.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapred/BlurRecordReader.java b/src/blur-mapred/src/main/java/org/apache/blur/mapred/BlurRecordReader.java
new file mode 100644
index 0000000..54ab166
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapred/BlurRecordReader.java
@@ -0,0 +1,108 @@
+package org.apache.blur.mapred;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.blur.mapreduce.BlurRecord;
+import org.apache.blur.mapreduce.lib.BlurInputSplit;
+import org.apache.blur.mapreduce.lib.Utils;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.utils.RowDocumentUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.store.Directory;
+
+
+public class BlurRecordReader implements RecordReader<Text, BlurRecord> {
+
+  private IndexReader reader;
+  private Directory directory;
+  private int startingDocId;
+  private int endingDocId;
+  private int position;
+
+  public BlurRecordReader(InputSplit split, JobConf job) throws IOException {
+    BlurInputSplit blurSplit = (BlurInputSplit) split;
+    Path path = blurSplit.getIndexPath();
+    String segmentName = blurSplit.getSegmentName();
+    startingDocId = blurSplit.getStartingDocId();
+    endingDocId = blurSplit.getEndingDocId();
+    directory = new HdfsDirectory(path);
+
+    IndexCommit commit = Utils.findLatest(directory);
+    reader = Utils.openSegmentReader(directory, commit, segmentName, Utils.getTermInfosIndexDivisor(job));
+    int maxDoc = reader.maxDoc();
+    if (endingDocId >= maxDoc) {
+      endingDocId = maxDoc - 1;
+    }
+    position = startingDocId - 1;
+  }
+
+  @Override
+  public boolean next(Text key, BlurRecord value) throws IOException {
+    do {
+      position++;
+      if (position > endingDocId) {
+        return false;
+      }
+    } while (reader.isDeleted(position));
+    readDocument(key, value);
+    return true;
+  }
+
+  private void readDocument(Text rowid, BlurRecord record) throws CorruptIndexException, IOException {
+    Document document = reader.document(position);
+    record.reset();
+    rowid.set(RowDocumentUtil.readRecord(document, record));
+  }
+
+  @Override
+  public Text createKey() {
+    return new Text();
+  }
+
+  @Override
+  public BlurRecord createValue() {
+    return new BlurRecord();
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return position;
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+    directory.close();
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    int total = endingDocId - startingDocId;
+    return (float) position / (float) total;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurColumn.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurColumn.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurColumn.java
new file mode 100644
index 0000000..2ddc11f
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurColumn.java
@@ -0,0 +1,77 @@
+package org.apache.blur.mapreduce;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public class BlurColumn implements Writable {
+
+  private String name;
+  private String value;
+
+  public BlurColumn() {
+  }
+
+  public BlurColumn(String name, String value) {
+    this.name = name;
+    this.value = value;
+  }
+
+  public boolean hasNull() {
+    if (name == null || value == null) {
+      return true;
+    }
+    return false;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    name = IOUtil.readString(in);
+    value = IOUtil.readString(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    IOUtil.writeString(out, name);
+    IOUtil.writeString(out, value);
+  }
+
+  public String getValue() {
+    return value;
+  }
+
+  public void setValue(String value) {
+    this.value = value;
+  }
+
+  @Override
+  public String toString() {
+    return "{name=" + name + ", value=" + value + "}";
+  }
+}
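
BlurColumn delegates its wire format to IOUtil.readString/writeString (a helper added elsewhere in this repackage, not shown in this partial message), so the quickest smoke test is a standard Writable round trip with Hadoop's in-memory buffers:

    BlurColumn col = new BlurColumn("c0", "value");
    DataOutputBuffer out = new DataOutputBuffer();
    col.write(out);

    BlurColumn copy = new BlurColumn();
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    copy.readFields(in);
    // copy.getName() and copy.getValue() now equal "c0" and "value"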

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurMapper.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurMapper.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurMapper.java
new file mode 100644
index 0000000..4e301eb
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurMapper.java
@@ -0,0 +1,58 @@
+package org.apache.blur.mapreduce;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Mapper;
+
+public abstract class BlurMapper<KEY, VALUE> extends Mapper<KEY, VALUE, BytesWritable, BlurMutate> {
+
+  protected BlurMutate _mutate;
+  protected BytesWritable _key;
+  protected BlurTask _blurTask;
+  protected Counter _recordCounter;
+  protected Counter _fieldCounter;
+
+  @Override
+  public void run(Context context) throws IOException, InterruptedException {
+    setup(context);
+    long maxRecordCount = _blurTask.getMaxRecordCount();
+    if (maxRecordCount == -1) {
+      maxRecordCount = Long.MAX_VALUE;
+    }
+    for (long l = 0; l < maxRecordCount && context.nextKeyValue(); l++) {
+      map(context.getCurrentKey(), context.getCurrentValue(), context);
+    }
+    cleanup(context);
+  }
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    _blurTask = BlurTask.read(context.getConfiguration());
+    _mutate = new BlurMutate();
+    _key = new BytesWritable();
+    _recordCounter = context.getCounter(BlurTask.getCounterGroupName(), BlurTask.getRecordCounterName());
+    _fieldCounter = context.getCounter(BlurTask.getCounterGroupName(), BlurTask.getFieldCounterName());
+  }
+
+  @Override
+  protected abstract void map(KEY key, VALUE value, Context context) throws IOException, InterruptedException;
+
+}
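
A concrete subclass only has to populate the inherited _mutate and _key fields and emit them, mirroring the deleted BlurExampleMapper above. A minimal sketch (the class name and column layout are hypothetical):

    public class MyMapper extends BlurMapper<LongWritable, Text> {
      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        BlurRecord record = _mutate.getRecord();
        record.reset();
        record.setRowId("row-" + key.get());
        record.setRecordId("rec-" + key.get());
        record.setFamily("cf1");
        record.addColumn("line", value.toString());
        _fieldCounter.increment(1);
        byte[] bs = record.getRowId().getBytes();
        _key.set(bs, 0, bs.length);
        _mutate.setMutateType(BlurMutate.MUTATE_TYPE.ADD);
        context.write(_key, _mutate);
        _recordCounter.increment(1);
      }
    }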

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurMutate.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurMutate.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurMutate.java
new file mode 100644
index 0000000..573f9c3
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurMutate.java
@@ -0,0 +1,84 @@
+package org.apache.blur.mapreduce;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public class BlurMutate implements Writable {
+
+  public enum MUTATE_TYPE {
+    ADD(0), UPDATE(1), DELETE(2);
+    private int _value;
+
+    private MUTATE_TYPE(int value) {
+      _value = value;
+    }
+
+    public int getValue() {
+      return _value;
+    }
+
+    public MUTATE_TYPE find(int value) {
+      switch (value) {
+      case 0:
+        return ADD;
+      case 1:
+        return UPDATE;
+      case 2:
+        return DELETE;
+      default:
+        throw new RuntimeException("Value [" + value + "] not found.");
+      }
+    }
+  }
+
+  private MUTATE_TYPE _mutateType = MUTATE_TYPE.UPDATE;
+  private BlurRecord _record = new BlurRecord();
+
+  public BlurRecord getRecord() {
+    return _record;
+  }
+
+  public void setRecord(BlurRecord record) {
+    _record = record;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    IOUtil.writeVInt(out, _mutateType.getValue());
+    _record.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    _mutateType = _mutateType.find(IOUtil.readVInt(in));
+    _record.readFields(in);
+  }
+
+  public MUTATE_TYPE getMutateType() {
+    return _mutateType;
+  }
+
+  public void setMutateType(MUTATE_TYPE mutateType) {
+    _mutateType = mutateType;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurRecord.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurRecord.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurRecord.java
new file mode 100644
index 0000000..b45bcb3
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurRecord.java
@@ -0,0 +1,136 @@
+package org.apache.blur.mapreduce;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.blur.utils.ReaderBlurRecord;
+import org.apache.hadoop.io.Writable;
+
+
+public class BlurRecord implements Writable, ReaderBlurRecord {
+
+  private String _rowId;
+  private String _recordId;
+  private String _family;
+
+  private List<BlurColumn> _columns = new ArrayList<BlurColumn>();
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    _rowId = IOUtil.readString(in);
+    _recordId = IOUtil.readString(in);
+    _family = IOUtil.readString(in);
+    int size = IOUtil.readVInt(in);
+    _columns.clear();
+    for (int i = 0; i < size; i++) {
+      BlurColumn column = new BlurColumn();
+      column.readFields(in);
+      _columns.add(column);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    IOUtil.writeString(out, _rowId);
+    IOUtil.writeString(out, _recordId);
+    IOUtil.writeString(out, _family);
+    IOUtil.writeVInt(out, _columns.size());
+    for (BlurColumn column : _columns) {
+      column.write(out);
+    }
+  }
+
+  public String getRowId() {
+    return _rowId;
+  }
+
+  public void setRowId(String rowId) {
+    this._rowId = rowId;
+  }
+
+  public String getRecordId() {
+    return _recordId;
+  }
+
+  public void setRecordId(String recordId) {
+    this._recordId = recordId;
+  }
+
+  public String getFamily() {
+    return _family;
+  }
+
+  public void setFamily(String family) {
+    this._family = family;
+  }
+
+  public List<BlurColumn> getColumns() {
+    return _columns;
+  }
+
+  public void setColumns(List<BlurColumn> columns) {
+    this._columns = columns;
+  }
+
+  public void clearColumns() {
+    _columns.clear();
+  }
+
+  public void addColumn(BlurColumn column) {
+    _columns.add(column);
+  }
+
+  public void addColumn(String name, String value) {
+    BlurColumn blurColumn = new BlurColumn();
+    blurColumn.setName(name);
+    blurColumn.setValue(value);
+    addColumn(blurColumn);
+  }
+
+  @Override
+  public void setRecordIdStr(String value) {
+    setRecordId(value);
+  }
+
+  @Override
+  public void setFamilyStr(String family) {
+    setFamily(family);
+  }
+
+  public void reset() {
+    clearColumns();
+    _rowId = null;
+    _recordId = null;
+    _family = null;
+  }
+
+  @Override
+  public void setRowIdStr(String rowId) {
+    setRowId(rowId);
+  }
+
+  @Override
+  public String toString() {
+    return "{rowId=" + _rowId + ", recordId=" + _recordId + ", family=" + _family + ", columns=" + _columns + "}";
+  }
+
+}
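
Because readFields clears the column list before repopulating it, a single BlurRecord instance can be deserialized into repeatedly, which is the usual Hadoop Writable reuse pattern. A round-trip sketch (values are hypothetical):

    BlurRecord record = new BlurRecord();
    record.setRowId("row-1");
    record.setRecordId("rec-1");
    record.setFamily("cf1");
    record.addColumn("c0", "alice");

    DataOutputBuffer out = new DataOutputBuffer();
    record.write(out);

    BlurRecord reused = new BlurRecord();
    reused.addColumn("stale", "data"); // wiped by readFields
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    reused.readFields(in);
    // reused now holds exactly rowId=row-1, recordId=rec-1, family=cf1, c0=alice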

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java
new file mode 100644
index 0000000..3019798
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java
@@ -0,0 +1,520 @@
+package org.apache.blur.mapreduce;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.apache.blur.lucene.LuceneConstant.LUCENE_VERSION;
+import static org.apache.blur.utils.BlurConstants.RECORD_ID;
+import static org.apache.blur.utils.BlurConstants.ROW_ID;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.analysis.BlurAnalyzer;
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.lucene.search.FairSimilarity;
+import org.apache.blur.mapreduce.BlurMutate.MUTATE_TYPE;
+import org.apache.blur.mapreduce.BlurTask.INDEXING_TYPE;
+import org.apache.blur.store.compressed.CompressedFieldDataDirectory;
+import org.apache.blur.store.hdfs.HdfsDirectory;
+import org.apache.blur.thrift.generated.Column;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.blur.utils.Converter;
+import org.apache.blur.utils.IterableConverter;
+import org.apache.blur.utils.RowIndexWriter;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Index;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.TermDocs;
+import org.apache.lucene.index.TieredMergePolicy;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.NoLockFactory;
+import org.apache.lucene.util.IOUtils;
+
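+/**
+ * Reduces all {@link BlurMutate}s for a single row into a Lucene index. In
+ * REBUILD mode the index is built in a local directory and then copied
+ * (optionally optimized) into the table's directory in HDFS; in UPDATE mode
+ * the existing shard is opened and each row's old records are merged with the
+ * incoming mutations.
+ */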
+public class BlurReducer extends Reducer<BytesWritable, BlurMutate, BytesWritable, BlurMutate> {
+
+  static class LuceneFileComparator implements Comparator<String> {
+
+    private Directory _directory;
+
+    public LuceneFileComparator(Directory directory) {
+      _directory = directory;
+    }
+
+    @Override
+    public int compare(String o1, String o2) {
+      try {
+        long fileLength1 = _directory.fileLength(o1);
+        long fileLength2 = _directory.fileLength(o2);
+        if (fileLength1 == fileLength2) {
+          return o1.compareTo(o2);
+        }
+        // Largest file first; comparing explicitly avoids the int overflow
+        // that casting the difference of two longs could produce.
+        return fileLength2 < fileLength1 ? -1 : 1;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  protected static final Log LOG = LogFactory.getLog(BlurReducer.class);
+  protected static final long REPORT_PERIOD = TimeUnit.SECONDS.toMillis(10);
+  protected static final double MB = 1024 * 1024;
+  protected IndexWriter _writer;
+  protected Directory _directory;
+  protected BlurAnalyzer _analyzer;
+  protected BlurTask _blurTask;
+
+  protected Counter _recordCounter;
+  protected Counter _rowCounter;
+  protected Counter _fieldCounter;
+  protected Counter _rowBreak;
+  protected Counter _rowFailures;
+  protected StringBuilder _builder = new StringBuilder();
+  protected byte[] _copyBuf;
+  protected Configuration _configuration;
+  protected long _start;
+  protected long _previousRow;
+  protected long _previousRecord;
+  protected long _prev;
+  protected IndexReader _reader;
+  protected Map<String, Document> _newDocs = new HashMap<String, Document>();
+  protected Set<String> _recordIdsToDelete = new HashSet<String>();
+  protected Term _rowIdTerm = new Term(BlurConstants.ROW_ID);
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    _blurTask = BlurTask.read(context.getConfiguration());
+    _configuration = context.getConfiguration();
+    setupCounters(context);
+    setupAnalyzer(context);
+    setupDirectory(context);
+    setupWriter(context);
+    if (_blurTask.getIndexingType() == INDEXING_TYPE.UPDATE) {
+      _reader = IndexReader.open(_directory);
+    }
+  }
+
+  protected void setupCounters(Context context) {
+    _rowCounter = context.getCounter(BlurTask.getCounterGroupName(), BlurTask.getRowCounterName());
+    _recordCounter = context.getCounter(BlurTask.getCounterGroupName(), BlurTask.getRecordCounterName());
+    _fieldCounter = context.getCounter(BlurTask.getCounterGroupName(), BlurTask.getFieldCounterName());
+    _rowBreak = context.getCounter(BlurTask.getCounterGroupName(), BlurTask.getRowBreakCounterName());
+    _rowFailures = context.getCounter(BlurTask.getCounterGroupName(), BlurTask.getRowFailureCounterName());
+    _start = System.currentTimeMillis();
+    _prev = System.currentTimeMillis();
+  }
+
+  @Override
+  protected void reduce(BytesWritable key, Iterable<BlurMutate> values, Context context) throws IOException, InterruptedException {
+    if (!index(key, values, context)) {
+      _rowFailures.increment(1);
+    }
+  }
+
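+  /**
+   * Indexes every mutation for a single row. Returns false when the row holds
+   * more than the configured max records per row, which is counted as a row
+   * failure rather than failing the whole job.
+   */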
+  protected boolean index(BytesWritable key, Iterable<BlurMutate> values, Context context) throws IOException {
+    int recordCount = 0;
+    _newDocs.clear();
+    _recordIdsToDelete.clear();
+    boolean rowIdSet = false;
+
+    for (BlurMutate mutate : values) {
+      BlurRecord record = mutate.getRecord();
+      if (!rowIdSet) {
+        String rowId = record.getRowId();
+        _rowIdTerm = _rowIdTerm.createTerm(rowId);
+        rowIdSet = true;
+      }
+      if (mutate.getMutateType() == MUTATE_TYPE.DELETE) {
+        _recordIdsToDelete.add(record.getRecordId());
+        continue;
+      }
+      Document document = toDocument(record, _builder);
+      _newDocs.put(record.getRecordId(), document);
+
+      context.progress();
+      recordCount++;
+      if (recordCount >= _blurTask.getMaxRecordsPerRow()) {
+        return false;
+      }
+    }
+
+    if (rowIdSet && _blurTask.getIndexingType() == INDEXING_TYPE.UPDATE) {
+      // Merge the row's existing records once, after all mutations have been
+      // read; fetching inside the loop re-read the old row for every mutation
+      // and skipped deletes issued by the final mutation.
+      fetchOldRecords();
+    }
+
+    List<Document> docs = documentsToIndex(new ArrayList<Document>(_newDocs.values()));
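+    // Mark the first document as the prime doc (BlurConstants.PRIME_DOC_FIELD),
+    // which Blur uses to find the start of each row in the index.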
+    if (docs.size() > 0) {
+      docs.get(0).add(BlurConstants.PRIME_DOC_FIELD);
+    }
+
+    switch (_blurTask.getIndexingType()) {
+    case REBUILD:
+      _writer.addDocuments(docs);
+      break;
+    case UPDATE:
+      _writer.updateDocuments(_rowIdTerm, docs);
+      break;
+    default:
+      break;
+    }
+
+    _recordCounter.increment(recordCount);
+    _rowCounter.increment(1);
+    if (_prev + REPORT_PERIOD < System.currentTimeMillis()) {
+      long records = _recordCounter.getValue();
+      long rows = _rowCounter.getValue();
+
+      long now = System.currentTimeMillis();
+
+      double overAllSeconds = (now - _start) / 1000.0;
+      double overAllRecordRate = records / overAllSeconds;
+      double overAllRowsRate = rows / overAllSeconds;
+
+      double seconds = (now - _prev) / 1000.0;
+      double recordRate = (records - _previousRecord) / seconds;
+      double rowsRate = (rows - _previousRow) / seconds;
+
+      String status = String.format("Totals [%d Row, %d Records], Avg Rates [%.1f Row/s, %.1f Records/s] Rates [%.1f Row/s, %.1f Records/s]", rows, records, overAllRowsRate,
+          overAllRecordRate, rowsRate, recordRate);
+
+      LOG.info(status);
+      context.setStatus(status);
+
+      _previousRecord = records;
+      _previousRow = rows;
+      _prev = now;
+    }
+    return true;
+  }
+
+  protected List<Document> documentsToIndex(List<Document> list) {
+    return list;
+  }
+
+  protected void fetchOldRecords() throws IOException {
+    TermDocs termDocs = _reader.termDocs(_rowIdTerm);
+    try {
+      // find all records for the row that are not deleted.
+      while (termDocs.next()) {
+        int doc = termDocs.doc();
+        if (!_reader.isDeleted(doc)) {
+          Document document = _reader.document(doc);
+          String recordId = document.get(RECORD_ID);
+          // add them to the new records if the new records do not contain them.
+          if (!_newDocs.containsKey(recordId)) {
+            _newDocs.put(recordId, document);
+          }
+        }
+      }
+    } finally {
+      termDocs.close();
+    }
+
+    // delete all records that should be removed.
+    for (String recordId : _recordIdsToDelete) {
+      _newDocs.remove(recordId);
+    }
+  }
+
+  @Override
+  protected void cleanup(Context context) throws IOException, InterruptedException {
+    switch (_blurTask.getIndexingType()) {
+    case UPDATE:
+      cleanupFromUpdate(context);
+      return;
+    case REBUILD:
+      cleanupFromRebuild(context);
+      return;
+    default:
+      break;
+    }
+  }
+
+  protected void cleanupFromUpdate(Context context) throws IOException {
+    _writer.commit();
+    _writer.close();
+    // Release the reader opened in setup() for merging old records.
+    if (_reader != null) {
+      _reader.close();
+    }
+  }
+
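+  /**
+   * Publishes the rebuilt index to the table's directory: with optimize
+   * enabled, the local index is merged into the destination through a fresh
+   * IndexWriter; otherwise the raw files are copied across, largest first.
+   */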
+  protected void cleanupFromRebuild(Context context) throws IOException, InterruptedException {
+    _writer.commit();
+    _writer.close();
+
+    TableDescriptor descriptor = _blurTask.getTableDescriptor();
+
+    Path directoryPath = _blurTask.getDirectoryPath(context);
+    remove(directoryPath);
+
+    NoLockFactory lockFactory = NoLockFactory.getNoLockFactory();
+
+    Directory destDirectory = getDestDirectory(descriptor, directoryPath);
+    destDirectory.setLockFactory(lockFactory);
+
+    boolean optimize = _blurTask.getOptimize();
+
+    if (optimize) {
+      context.setStatus("Starting Copy-Optimize Phase");
+      IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, _analyzer);
+      TieredMergePolicy policy = (TieredMergePolicy) conf.getMergePolicy();
+      policy.setUseCompoundFile(false);
+      long s = System.currentTimeMillis();
+      IndexReader reader = IndexReader.open(_directory);
+      IndexWriter writer = new IndexWriter(getBiggerBuffers(destDirectory), conf);
+      writer.addIndexes(reader);
+      writer.close();
+      reader.close();
+      long e = System.currentTimeMillis();
+      context.setStatus("Copying phase took [" + (e - s) + " ms]");
+      LOG.info("Copying phase took [" + (e - s) + " ms]");
+    } else {
+      context.setStatus("Starting Copy-Optimize Phase");
+      long s = System.currentTimeMillis();
+      List<String> files = getFilesOrderedBySize(_directory);
+      long totalBytesToCopy = getTotalBytes(_directory);
+      long totalBytesCopied = 0;
+      long startTime = System.currentTimeMillis();
+      for (String file : files) {
+        totalBytesCopied += copy(_directory, destDirectory, file, file, context, totalBytesCopied, totalBytesToCopy, startTime);
+      }
+      long e = System.currentTimeMillis();
+      context.setStatus("Copying phase took [" + (e - s) + " ms]");
+      LOG.info("Copying phase took [" + (e - s) + " ms]");
+    }
+  }
+
+  protected Directory getBiggerBuffers(Directory destDirectory) {
+    return new BufferedDirectory(destDirectory, 32768);
+  }
+
+  protected Directory getDestDirectory(TableDescriptor descriptor, Path directoryPath) throws IOException {
+    String compressionClass = descriptor.compressionClass;
+    int compressionBlockSize = descriptor.getCompressionBlockSize();
+    if (compressionClass == null) {
+      compressionClass = "org.apache.hadoop.io.compress.DefaultCodec";
+    }
+    if (compressionBlockSize == 0) {
+      // Default to a 32K block size when the descriptor does not set one.
+      compressionBlockSize = 32768;
+    }
+    HdfsDirectory dir = new HdfsDirectory(directoryPath);
+    return new CompressedFieldDataDirectory(dir, getInstance(compressionClass), compressionBlockSize);
+  }
+
+  protected CompressionCodec getInstance(String compressionClass) throws IOException {
+    try {
+      CompressionCodec codec = (CompressionCodec) Class.forName(compressionClass).newInstance();
+      if (codec instanceof Configurable) {
+        Configurable configurable = (Configurable) codec;
+        configurable.setConf(_configuration);
+      }
+      return codec;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  protected void remove(Path directoryPath) throws IOException {
+    FileSystem fileSystem = FileSystem.get(directoryPath.toUri(), _configuration);
+    fileSystem.delete(directoryPath, true);
+  }
+
+  protected long getTotalBytes(Directory directory) throws IOException {
+    long total = 0;
+    for (String file : directory.listAll()) {
+      total += directory.fileLength(file);
+    }
+    return total;
+  }
+
+  protected long copy(Directory from, Directory to, String src, String dest, Context context, long totalBytesCopied, long totalBytesToCopy, long startTime) throws IOException {
+    IndexOutput os = to.createOutput(dest);
+    IndexInput is = from.openInput(src);
+    IOException priorException = null;
+    try {
+      return copyBytes(is, os, is.length(), context, totalBytesCopied, totalBytesToCopy, startTime, src);
+    } catch (IOException ioe) {
+      priorException = ioe;
+    } finally {
+      IOUtils.closeWhileHandlingException(priorException, os, is);
+    }
+    return 0; // unreachable: the finally block rethrows priorException
+  }
+
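+  /**
+   * Streams numBytes from in to out in BUFFER_SIZE chunks, periodically
+   * reporting status and progress so long copies do not time the task out.
+   */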
+  protected long copyBytes(IndexInput in, IndexOutput out, long numBytes, Context context, long totalBytesCopied, long totalBytesToCopy, long startTime, String src)
+      throws IOException {
+    if (_copyBuf == null) {
+      _copyBuf = new byte[BufferedIndexInput.BUFFER_SIZE];
+    }
+    long start = System.currentTimeMillis();
+    long copied = 0;
+    while (numBytes > 0) {
+      if (start + REPORT_PERIOD < System.currentTimeMillis()) {
+        report(context, totalBytesCopied + copied, totalBytesToCopy, startTime, src);
+        start = System.currentTimeMillis();
+      }
+      final int toCopy = (int) (numBytes > _copyBuf.length ? _copyBuf.length : numBytes);
+      in.readBytes(_copyBuf, 0, toCopy);
+      out.writeBytes(_copyBuf, 0, toCopy);
+      numBytes -= toCopy;
+      copied += toCopy;
+      context.progress();
+    }
+    return copied;
+  }
+
+  protected List<String> getFilesOrderedBySize(final Directory directory) throws IOException {
+    List<String> files = new ArrayList<String>(Arrays.asList(directory.listAll()));
+    // Sort the given directory's files largest-first so the big segment files
+    // begin copying as early as possible.
+    Collections.sort(files, new LuceneFileComparator(directory));
+    return files;
+  }
+
+  protected void setupDirectory(Context context) throws IOException {
+    TableDescriptor descriptor = _blurTask.getTableDescriptor();
+    switch (_blurTask.getIndexingType()) {
+    case UPDATE:
+      Path directoryPath = _blurTask.getDirectoryPath(context);
+      _directory = getDestDirectory(descriptor, directoryPath);
+
+      NoLockFactory lockFactory = NoLockFactory.getNoLockFactory();
+      _directory.setLockFactory(lockFactory);
+      return;
+    case REBUILD:
+      File dir = new File(System.getProperty("java.io.tmpdir"));
+      File path = new File(dir, "index");
+      rm(path);
+      LOG.info("Using local path [" + path + "] for indexing.");
+      String compressionClass = descriptor.compressionClass;
+      int compressionBlockSize = descriptor.getCompressionBlockSize();
+      if (compressionClass == null) {
+        compressionClass = "org.apache.hadoop.io.compress.DefaultCodec";
+      }
+
+      Directory localDirectory = FSDirectory.open(path);
+      if (compressionBlockSize == 0) {
+        // Default to a 32K block size when the descriptor does not set one.
+        compressionBlockSize = 32768;
+      }
+      CompressedFieldDataDirectory compressedFieldDataDirectory = new CompressedFieldDataDirectory(localDirectory, getInstance(compressionClass), compressionBlockSize);
+      _directory = new ProgressableDirectory(compressedFieldDataDirectory, context);
+      return;
+    default:
+      break;
+    }
+  }
+
+  protected String getNodeName(Context context) {
+    return context.getTaskAttemptID().toString();
+  }
+
+  protected void rm(File path) {
+    if (!path.exists()) {
+      return;
+    }
+    if (path.isDirectory()) {
+      for (File f : path.listFiles()) {
+        rm(f);
+      }
+    }
+    path.delete();
+  }
+
+  protected <T> T nullCheck(T o) {
+    if (o == null) {
+      throw new NullPointerException();
+    }
+    return o;
+  }
+
+  protected void setupWriter(Context context) throws IOException {
+    nullCheck(_directory);
+    nullCheck(_analyzer);
+    IndexWriterConfig config = new IndexWriterConfig(LUCENE_VERSION, _analyzer);
+    config.setSimilarity(new FairSimilarity());
+    config.setRAMBufferSizeMB(_blurTask.getRamBufferSizeMB());
+    TieredMergePolicy mergePolicy = (TieredMergePolicy) config.getMergePolicy();
+    mergePolicy.setUseCompoundFile(false);
+    _writer = new IndexWriter(_directory, config);
+  }
+
+  protected void setupAnalyzer(Context context) {
+    _analyzer = new BlurAnalyzer(_blurTask.getTableDescriptor().getAnalyzerDefinition());
+  }
+
+  protected Document toDocument(BlurRecord record, StringBuilder builder) {
+    Document document = new Document();
+    document.add(new Field(ROW_ID, record.getRowId(), Store.YES, Index.NOT_ANALYZED_NO_NORMS));
+    document.add(new Field(RECORD_ID, record.getRecordId(), Store.YES, Index.NOT_ANALYZED_NO_NORMS));
+    String columnFamily = record.getFamily();
+    RowIndexWriter.addColumns(document, _analyzer, builder, columnFamily, new IterableConverter<BlurColumn, Column>(record.getColumns(), new Converter<BlurColumn, Column>() {
+      @Override
+      public Column convert(BlurColumn from) throws Exception {
+        _fieldCounter.increment(1);
+        return new Column(from.getName(), from.getValue());
+      }
+    }));
+    return document;
+  }
+
+  protected static void report(Context context, long totalBytesCopied, long totalBytesToCopy, long startTime, String src) {
+    long now = System.currentTimeMillis();
+    double seconds = (now - startTime) / 1000.0;
+    double rate = totalBytesCopied / seconds;
+    String time = estimateTimeToComplete(rate, totalBytesCopied, totalBytesToCopy);
+
+    String status = String.format("%.1f Complete - Time Remaining [%s s], Copy rate [%.1f MB/s], Total Copied [%.1f MB], Total To Copy [%.1f MB]",
+        getPerComplete(totalBytesCopied, totalBytesToCopy), time, getMb(rate), getMb(totalBytesCopied), getMb(totalBytesToCopy));
+    LOG.info(status);
+    context.setStatus(status);
+  }
+
+  protected static double getPerComplete(long totalBytesCopied, long totalBytesToCopy) {
+    return ((double) totalBytesCopied / (double) totalBytesToCopy) * 100.0;
+  }
+
+  protected static double getMb(double b) {
+    return b / MB;
+  }
+
+  protected static String estimateTimeToComplete(double rate, long totalBytesCopied, long totalBytesToCopy) {
+    long whatsLeft = totalBytesToCopy - totalBytesCopied;
+    long secondsLeft = (long) (whatsLeft / rate);
+    return BlurUtil.humanizeTime(secondsLeft, TimeUnit.SECONDS);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurTask.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurTask.java b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurTask.java
new file mode 100644
index 0000000..7e57070
--- /dev/null
+++ b/src/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurTask.java
@@ -0,0 +1,282 @@
+package org.apache.blur.mapreduce;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.manager.clusterstatus.ZookeeperClusterStatus;
+import org.apache.blur.manager.clusterstatus.ZookeeperPathConstants;
+import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+
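+/**
+ * Describes a Blur indexing job: the table descriptor, the indexing type
+ * (REBUILD or UPDATE), and tuning parameters. configureJob(Configuration)
+ * serializes the task into the job configuration as Base64, and
+ * read(Configuration) restores it inside each reducer.
+ */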
+public class BlurTask implements Writable {
+
+  public enum INDEXING_TYPE {
+    REBUILD, UPDATE
+  }
+
+  private static final String BLUR_BLURTASK = "blur.blurtask";
+  private static final Log LOG = LogFactory.getLog(BlurTask.class);
+
+  public static String getCounterGroupName() {
+    return "Blur";
+  }
+
+  public static String getRowCounterName() {
+    return "Rows";
+  }
+
+  public static String getFieldCounterName() {
+    return "Fields";
+  }
+
+  public static String getRecordCounterName() {
+    return "Records";
+  }
+
+  public static String getRowBreakCounterName() {
+    return "Row Retries";
+  }
+
+  public static String getRowFailureCounterName() {
+    return "Row Failures";
+  }
+
+  private int _ramBufferSizeMB = 256;
+  private long _maxRecordCount = Long.MAX_VALUE;
+  private TableDescriptor _tableDescriptor;
+  private int _maxRecordsPerRow = 16384;
+  private boolean _optimize = true;
+  private INDEXING_TYPE _indexingType = INDEXING_TYPE.REBUILD;
+  private transient ZooKeeper _zooKeeper;
+
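+  // Assumed addition: nothing else in this class assigns _zooKeeper, so
+  // checkTable() would throw a NullPointerException for UPDATE jobs. Callers
+  // are expected to hand in an open session before calling configureJob().
+  public void setZooKeeper(ZooKeeper zooKeeper) {
+    _zooKeeper = zooKeeper;
+  }
+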
+  public String getShardName(TaskAttemptContext context) {
+    TaskAttemptID taskAttemptID = context.getTaskAttemptID();
+    int id = taskAttemptID.getTaskID().getId();
+    return BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, id);
+  }
+
+  public Path getDirectoryPath(TaskAttemptContext context) {
+    String shardName = getShardName(context);
+    return new Path(new Path(_tableDescriptor.tableUri), shardName);
+  }
+
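+  /**
+   * Uses one reducer per shard: the descriptor's shard count for a new table,
+   * or the number of existing shard directories if the table already exists
+   * (with a warning when the two disagree).
+   */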
+  public int getNumReducers(Configuration configuration) {
+    Path tablePath = new Path(_tableDescriptor.tableUri);
+    try {
+      int num = _tableDescriptor.shardCount;
+      FileSystem fileSystem = FileSystem.get(tablePath.toUri(), configuration);
+      if (!fileSystem.exists(tablePath)) {
+        return num;
+      }
+      FileStatus[] files = fileSystem.listStatus(tablePath);
+      int shardCount = 0;
+      for (FileStatus fileStatus : files) {
+        if (fileStatus.isDir()) {
+          String name = fileStatus.getPath().getName();
+          if (name.startsWith(BlurConstants.SHARD_PREFIX)) {
+            shardCount++;
+          }
+        }
+      }
+
+      if (shardCount == 0) {
+        return num;
+      }
+      if (shardCount != num) {
+        LOG.warn("Asked for " + num + " reducers, but existing table " + _tableDescriptor.name + " has " + shardCount + " shards. Using " + shardCount + " reducers");
+      }
+      return shardCount;
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to connect to filesystem", e);
+    }
+  }
+
+  public int getRamBufferSizeMB() {
+    return _ramBufferSizeMB;
+  }
+
+  public void setRamBufferSizeMB(int ramBufferSizeMB) {
+    _ramBufferSizeMB = ramBufferSizeMB;
+  }
+
+  public long getMaxRecordCount() {
+    return _maxRecordCount;
+  }
+
+  public void setMaxRecordCount(long maxRecordCount) {
+    _maxRecordCount = maxRecordCount;
+  }
+
+  public void setTableDescriptor(TableDescriptor tableDescriptor) {
+    _tableDescriptor = tableDescriptor;
+  }
+
+  public TableDescriptor getTableDescriptor() {
+    return _tableDescriptor;
+  }
+
+  public Job configureJob(Configuration configuration) throws IOException {
+    if (getIndexingType() == INDEXING_TYPE.UPDATE) {
+      checkTable();
+    }
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    DataOutputStream output = new DataOutputStream(os);
+    write(output);
+    output.close();
+    String blurTask = new String(Base64.encodeBase64(os.toByteArray()));
+    configuration.set(BLUR_BLURTASK, blurTask);
+
+    Job job = new Job(configuration, "Blur Indexer");
+    job.setReducerClass(BlurReducer.class);
+    job.setOutputKeyClass(BytesWritable.class);
+    job.setOutputValueClass(BlurMutate.class);
+    job.setNumReduceTasks(getNumReducers(configuration));
+    return job;
+  }
+
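+  // Usage sketch (hypothetical driver code; MyMapper and descriptor are the
+  // caller's own): the task configures the reduce side, and the caller wires
+  // up the input format and mapper.
+  //
+  //   BlurTask task = new BlurTask();
+  //   task.setTableDescriptor(descriptor);
+  //   task.setIndexingType(INDEXING_TYPE.REBUILD);
+  //   Job job = task.configureJob(new Configuration());
+  //   job.setMapperClass(MyMapper.class);
+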
+  private void checkTable() {
+    ZookeeperClusterStatus status = new ZookeeperClusterStatus(_zooKeeper);
+    // check if table exists
+    String cluster = _tableDescriptor.cluster;
+    String table = _tableDescriptor.name;
+    if (!status.exists(false, cluster, table)) {
+      throw new RuntimeException("Table [" + table + "] in cluster [" + cluster + "] does not exist.");
+    }
+    // check if table is locked
+    try {
+      List<String> children = _zooKeeper.getChildren(ZookeeperPathConstants.getLockPath(cluster, table), false);
+      if (!children.isEmpty()) {
+        throw new RuntimeException("Table [" + table + "] in cluster [" + cluster + "] has write locks enabled, cannot perform update.");
+      }
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag before surfacing the failure.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static BlurTask read(Configuration configuration) throws IOException {
+    byte[] blurTaskBs = Base64.decodeBase64(configuration.get(BLUR_BLURTASK));
+    BlurTask blurTask = new BlurTask();
+    blurTask.readFields(new DataInputStream(new ByteArrayInputStream(blurTaskBs)));
+    return blurTask;
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    _maxRecordCount = input.readLong();
+    _ramBufferSizeMB = input.readInt();
+    _optimize = input.readBoolean();
+    _indexingType = INDEXING_TYPE.valueOf(readString(input));
+    byte[] data = new byte[input.readInt()];
+    input.readFully(data);
+    ByteArrayInputStream is = new ByteArrayInputStream(data);
+    TIOStreamTransport trans = new TIOStreamTransport(is);
+    TBinaryProtocol protocol = new TBinaryProtocol(trans);
+    _tableDescriptor = new TableDescriptor();
+    try {
+      _tableDescriptor.read(protocol);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private String readString(DataInput input) throws IOException {
+    int length = input.readInt();
+    byte[] buf = new byte[length];
+    input.readFully(buf);
+    // Decode explicitly as UTF-8 rather than the platform default charset.
+    return new String(buf, "UTF-8");
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeLong(_maxRecordCount);
+    output.writeInt(_ramBufferSizeMB);
+    output.writeBoolean(_optimize);
+    writeString(output, _indexingType.name());
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    TIOStreamTransport trans = new TIOStreamTransport(os);
+    TBinaryProtocol protocol = new TBinaryProtocol(trans);
+    try {
+      _tableDescriptor.write(protocol);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+    os.close();
+    byte[] bs = os.toByteArray();
+    output.writeInt(bs.length);
+    output.write(bs);
+  }
+
+  private void writeString(DataOutput output, String s) throws IOException {
+    // Encode explicitly as UTF-8 rather than the platform default charset.
+    byte[] bs = s.getBytes("UTF-8");
+    output.writeInt(bs.length);
+    output.write(bs);
+  }
+
+  public int getMaxRecordsPerRow() {
+    return _maxRecordsPerRow;
+  }
+
+  public void setMaxRecordsPerRow(int maxRecordsPerRow) {
+    _maxRecordsPerRow = maxRecordsPerRow;
+  }
+
+  public boolean getOptimize() {
+    return _optimize;
+  }
+
+  public void setOptimize(boolean optimize) {
+    _optimize = optimize;
+  }
+
+  public INDEXING_TYPE getIndexingType() {
+    return _indexingType;
+  }
+
+  public void setIndexingType(INDEXING_TYPE indexingType) {
+    _indexingType = indexingType;
+  }
+}

