incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [23/51] [partial] Initial repackage to org.apache.
Date Mon, 03 Sep 2012 03:17:12 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-jdbc/src/main/java/org/apache/blur/jdbc/parser/Parser.java
----------------------------------------------------------------------
diff --git a/src/blur-jdbc/src/main/java/org/apache/blur/jdbc/parser/Parser.java b/src/blur-jdbc/src/main/java/org/apache/blur/jdbc/parser/Parser.java
new file mode 100644
index 0000000..c8b7287
--- /dev/null
+++ b/src/blur-jdbc/src/main/java/org/apache/blur/jdbc/parser/Parser.java
@@ -0,0 +1,205 @@
+package org.apache.blur.jdbc.parser;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.ArrayList;
+import java.util.List;
+import java.util.StringTokenizer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
/**
 * Minimal translator from the small SQL dialect accepted by the Blur JDBC driver
 * into the pieces Blur needs: the selected column names, the table name, an
 * optional {@code natural join} table, and a Lucene query string derived from
 * the {@code where} clause.
 * <p>
 * Parsing is based on whitespace-separated tokens and keyword matching is
 * case-insensitive. A {@code where} clause of the form {@code query('...')} is
 * passed through verbatim as a Lucene query; otherwise simple
 * {@code field = 'value'} predicates joined by {@code and}/{@code or} are
 * rewritten into Lucene syntax.
 * <p>
 * {@link #parse(String)} overwrites the instance fields, so one instance should
 * not be shared between threads.
 */
public class Parser {

  private static final String SELECT = "select";
  private static final String WHERE = "where";
  private static final String NATURAL = "natural";
  private static final String FROM = "from";
  private static final String JOIN = "join";
  /** Placeholder for an escaped quote ({@code ''}) while the remaining single quotes are stripped. */
  private static final String SEP = String.valueOf((char) 1);
  /**
   * Matches {@code query('<lucene query>')} case-insensitively; group 2 is the
   * Lucene query (greedy, so it runs up to the last {@code ')}).
   */
  private static final Pattern QUERY_FUNCTION = Pattern.compile("(query\\s*\\(\\s*')(.*)('\\s*\\).*)",
      Pattern.CASE_INSENSITIVE);

  /** Ad-hoc demonstration: prints the parse result of two sample queries. */
  public static void main(String[] args) {
    System.out.println(new Parser().parse("SELECT * FROM TEST_TABLE T WHERE 1 = 0"));
    System.out.println(new Parser().parse("select * from table t where 1 = 0"));
  }

  private String where;
  private String tableName;
  private List<String> columnNames;
  private String joinTable;

  /**
   * Parses {@code query} and stores its parts in this instance.
   *
   * @param query the SQL text
   * @return this parser, for chaining
   * @throws IllegalArgumentException if no table follows {@code from}, or a
   *     {@code natural} keyword is not followed by {@code join <table>}
   */
  public Parser parse(String query) {
    columnNames = getColumnNames(query);
    tableName = getTableName(query);
    where = getWhere(query);
    joinTable = getJoin(query);
    return this;
  }

  /** @return the table named after {@code natural join}, or null if there is none */
  public String getJoinTable() {
    return joinTable;
  }

  /**
   * Finds the table following {@code natural join}. If the keywords appear more
   * than once, the last occurrence wins.
   */
  private String getJoin(String query) {
    String table = null;
    StringTokenizer tokenizer = new StringTokenizer(query);
    while (tokenizer.hasMoreTokens()) {
      if (NATURAL.equals(lower(tokenizer.nextToken()))) {
        if (!tokenizer.hasMoreTokens() || !JOIN.equals(lower(tokenizer.nextToken()))) {
          throw new IllegalArgumentException("Expected 'join' after 'natural' in [" + query + "]");
        }
        if (!tokenizer.hasMoreTokens()) {
          throw new IllegalArgumentException("Missing table name after 'natural join' in [" + query + "]");
        }
        table = tokenizer.nextToken();
      }
    }
    return table;
  }

  /**
   * Joins every token after the first {@code where} keyword with single spaces
   * and converts the result into a Lucene query. Returns an empty string when
   * there is no {@code where} clause.
   */
  private String getWhere(String query) {
    StringBuilder result = new StringBuilder();
    StringTokenizer tokenizer = new StringTokenizer(query);
    while (tokenizer.hasMoreTokens()) {
      if (WHERE.equals(lower(tokenizer.nextToken()))) {
        while (tokenizer.hasMoreTokens()) {
          result.append(tokenizer.nextToken()).append(' ');
        }
      }
    }
    return getQuery(result.toString().trim());
  }

  /**
   * Returns the embedded Lucene query if {@code query} uses the
   * {@code query('...')} form; otherwise rewrites the SQL predicates into Lucene syntax.
   */
  private String getQuery(String query) {
    Matcher matcher = QUERY_FUNCTION.matcher(query);
    if (matcher.find()) {
      return matcher.group(2);
    }
    return changeQueryToLucene(query);
  }

  /**
   * Rewrites simple SQL predicates into Lucene syntax:
   * <ul>
   * <li>standalone unquoted {@code and}/{@code or} become {@code AND}/{@code OR};</li>
   * <li>{@code =} and its surrounding whitespace become {@code :};</li>
   * <li>single quotes are removed, except that the escape {@code ''} becomes one literal {@code '}.</li>
   * </ul>
   */
  private String changeQueryToLucene(String query) {
    query = fixAndsOrs(query);
    query = query.replaceAll("\\s*=\\s*", ":");
    query = query.replace("''", SEP);
    query = query.replaceAll("'", "");
    query = query.replace(SEP, "'");
    return query;
  }

  /** Upper-cases the boolean operators so that Lucene recognises them. */
  private String fixAndsOrs(String query) {
    query = fixToUpperToken(query, "AND");
    query = fixToUpperToken(query, "OR");
    return query;
  }

  /**
   * Replaces every standalone, unquoted, case-insensitive occurrence of
   * {@code token} with {@code token} itself (callers pass the upper-case form).
   * <p>
   * The quote state flips on every {@code '}, so the SQL escape {@code ''} leaves
   * it unchanged and text inside quoted values is never touched. Only whole words
   * are rewritten, so identifiers such as {@code brand} or {@code color} stay intact.
   * The scan is a single pass over the original string, so case mappings that
   * change length cannot shift the indices.
   */
  private String fixToUpperToken(String query, String token) {
    StringBuilder result = new StringBuilder(query.length());
    boolean inParameter = false;
    int i = 0;
    while (i < query.length()) {
      char c = query.charAt(i);
      if (c == '\'') {
        inParameter = !inParameter;
      } else if (!inParameter && isStandaloneToken(query, i, token)) {
        result.append(token);
        i += token.length();
        continue;
      }
      result.append(c);
      i++;
    }
    return result.toString();
  }

  /** True if {@code token} occurs case-insensitively at {@code start} and is not part of a longer word. */
  private static boolean isStandaloneToken(String query, int start, String token) {
    if (!query.regionMatches(true, start, token, 0, token.length())) {
      return false;
    }
    int end = start + token.length();
    boolean boundaryBefore = start == 0 || !isWordChar(query.charAt(start - 1));
    boolean boundaryAfter = end == query.length() || !isWordChar(query.charAt(end));
    return boundaryBefore && boundaryAfter;
  }

  /** Characters that can appear inside a field name such as {@code person.pn}. */
  private static boolean isWordChar(char c) {
    return Character.isLetterOrDigit(c) || c == '_' || c == '.';
  }

  /**
   * Returns the first token after {@code from}.
   *
   * @throws IllegalArgumentException if there is no such token
   */
  private String getTableName(String query) {
    StringTokenizer tokenizer = new StringTokenizer(query);
    while (tokenizer.hasMoreTokens()) {
      if (FROM.equals(lower(tokenizer.nextToken()))) {
        if (tokenizer.hasMoreTokens()) {
          return tokenizer.nextToken();
        }
      }
    }
    throw new IllegalArgumentException("Table not found");
  }

  /**
   * Collects the lower-cased, comma-separated column names between
   * {@code select} and {@code from}. Returns null if the query has no
   * {@code select ... from} section.
   */
  private List<String> getColumnNames(String query) {
    StringTokenizer tokenizer = new StringTokenizer(query);
    List<String> columnNames = new ArrayList<String>();
    while (tokenizer.hasMoreTokens()) {
      if (SELECT.equals(lower(tokenizer.nextToken()))) {
        while (tokenizer.hasMoreTokens()) {
          String token = lower(tokenizer.nextToken());
          if (FROM.equals(token)) {
            return columnNames;
          }
          processColumnToken(columnNames, token);
        }
      }
    }
    return null;
  }

  /** Splits a token such as {@code id,score} on commas and adds each non-empty part. */
  private void processColumnToken(List<String> columnNames, String token) {
    StringTokenizer tokenizer = new StringTokenizer(token, ",");
    while (tokenizer.hasMoreTokens()) {
      columnNames.add(tokenizer.nextToken());
    }
  }

  /**
   * Lower-cases with a fixed locale so that keyword matching does not depend on
   * the JVM default locale (e.g. the Turkish dotless i in "join").
   */
  private static String lower(String s) {
    return s.toLowerCase(java.util.Locale.ROOT);
  }

  public String getTableName() {
    return tableName;
  }

  public List<String> getColumnNames() {
    return columnNames;
  }

  /** @return the Lucene query for the where clause, or {@code "*"} when it is absent or blank */
  public String getWhere() {
    if (where == null || where.trim().isEmpty()) {
      return "*";
    }
    return where;
  }

  @Override
  public String toString() {
    return "Parser [columnNames=" + columnNames + ", tableName=" + tableName + ", where=" + where + ", joinTable=" + joinTable + "]";
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-jdbc/src/main/java/org/apache/blur/jdbc/util/EmptyResultSet.java
----------------------------------------------------------------------
diff --git a/src/blur-jdbc/src/main/java/org/apache/blur/jdbc/util/EmptyResultSet.java b/src/blur-jdbc/src/main/java/org/apache/blur/jdbc/util/EmptyResultSet.java
new file mode 100644
index 0000000..1483a6b
--- /dev/null
+++ b/src/blur-jdbc/src/main/java/org/apache/blur/jdbc/util/EmptyResultSet.java
@@ -0,0 +1,42 @@
+package org.apache.blur.jdbc.util;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+import org.apache.blur.jdbc.abstractimpl.AbstractBlurResultSet;
+
+
+public class EmptyResultSet extends AbstractBlurResultSet {
+
+  @Override
+  public void close() throws SQLException {
+
+  }
+
+  @Override
+  public boolean next() throws SQLException {
+    return false;
+  }
+
+  @Override
+  public ResultSetMetaData getMetaData() throws SQLException {
+    return new EmptyResultSetMetaData();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-jdbc/src/main/java/org/apache/blur/jdbc/util/EmptyResultSetMetaData.java
----------------------------------------------------------------------
diff --git a/src/blur-jdbc/src/main/java/org/apache/blur/jdbc/util/EmptyResultSetMetaData.java b/src/blur-jdbc/src/main/java/org/apache/blur/jdbc/util/EmptyResultSetMetaData.java
new file mode 100644
index 0000000..0a0c7ab
--- /dev/null
+++ b/src/blur-jdbc/src/main/java/org/apache/blur/jdbc/util/EmptyResultSetMetaData.java
@@ -0,0 +1,32 @@
+package org.apache.blur.jdbc.util;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+import org.apache.blur.jdbc.abstractimpl.AbstractBlurResultSetMetaData;
+
+
+public class EmptyResultSetMetaData extends AbstractBlurResultSetMetaData implements ResultSetMetaData {
+
+  @Override
+  public int getColumnCount() throws SQLException {
+    return 0;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-jdbc/src/main/java/org/apache/blur/jdbc/util/NotImplemented.java
----------------------------------------------------------------------
diff --git a/src/blur-jdbc/src/main/java/org/apache/blur/jdbc/util/NotImplemented.java b/src/blur-jdbc/src/main/java/org/apache/blur/jdbc/util/NotImplemented.java
new file mode 100644
index 0000000..cec34ab
--- /dev/null
+++ b/src/blur-jdbc/src/main/java/org/apache/blur/jdbc/util/NotImplemented.java
@@ -0,0 +1,37 @@
+package org.apache.blur.jdbc.util;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
/**
 * Unchecked exception thrown for JDBC methods that the Blur driver does not implement.
 * <p>
 * When the system property {@value #BLUR_JDBC_DEBUG} is {@code true} at class-load
 * time, each construction also prints the method name (if given) and a stack trace
 * to standard error, which helps identify which unsupported method a tool calls.
 */
public class NotImplemented extends RuntimeException {

  public static final String BLUR_JDBC_DEBUG = "blur.jdbc.debug";
  private static final long serialVersionUID = 4736975316647139778L;
  public static final boolean debug = Boolean.getBoolean(BLUR_JDBC_DEBUG);

  /** Creates the exception without naming the unimplemented method; the message is null. */
  public NotImplemented() {
    this(null);
  }

  /**
   * @param name name of the unimplemented method, or null; when non-null it is
   *     carried in {@link #getMessage()} so the information survives logging
   */
  public NotImplemented(String name) {
    super(name == null ? null : "Method [" + name + "] not implemented");
    if (debug) {
      if (name != null) {
        System.err.println("Method [" + name + "]");
      }
      new Throwable().printStackTrace();
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-jdbc/src/main/java/org/apache/blur/jdbc/util/SimpleStringResultSet.java
----------------------------------------------------------------------
diff --git a/src/blur-jdbc/src/main/java/org/apache/blur/jdbc/util/SimpleStringResultSet.java b/src/blur-jdbc/src/main/java/org/apache/blur/jdbc/util/SimpleStringResultSet.java
new file mode 100644
index 0000000..eff7797
--- /dev/null
+++ b/src/blur-jdbc/src/main/java/org/apache/blur/jdbc/util/SimpleStringResultSet.java
@@ -0,0 +1,101 @@
+package org.apache.blur.jdbc.util;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.blur.jdbc.abstractimpl.AbstractBlurResultSet;
+import org.apache.blur.jdbc.abstractimpl.AbstractBlurResultSetMetaData;
+
+
+public class SimpleStringResultSet extends AbstractBlurResultSet {
+
+  private List<String> columnNames;
+  private List<Map<String, String>> data;
+  private int position = -1;
+  private String lastValue;
+
+  public SimpleStringResultSet(List<String> columnNames, List<Map<String, String>> data) {
+    this.columnNames = columnNames;
+    this.data = data;
+  }
+
+  @Override
+  public ResultSetMetaData getMetaData() throws SQLException {
+    return new SimpleStringResultSetMetaData(columnNames);
+  }
+
+  @Override
+  public boolean next() throws SQLException {
+    if (position + 1 >= data.size()) {
+      return false;
+    }
+    position++;
+    return true;
+  }
+
+  @Override
+  public String getString(int columnIndex) throws SQLException {
+    String name = columnNames.get(columnIndex - 1);
+    Map<String, String> row = data.get(position);
+    return lastValue = row.get(name);
+  }
+
+  @Override
+  public boolean wasNull() throws SQLException {
+    return lastValue == null ? true : false;
+  }
+
+  @Override
+  public void close() throws SQLException {
+
+  }
+
+  public static class SimpleStringResultSetMetaData extends AbstractBlurResultSetMetaData {
+
+    private List<String> columnNames;
+
+    public SimpleStringResultSetMetaData(List<String> columnNames) {
+      this.columnNames = columnNames;
+    }
+
+    @Override
+    public int getColumnCount() throws SQLException {
+      return columnNames.size();
+    }
+
+    @Override
+    public String getColumnName(int column) throws SQLException {
+      return columnNames.get(column - 1);
+    }
+
+    @Override
+    public int getColumnType(int column) throws SQLException {
+      return Types.VARCHAR;
+    }
+
+    @Override
+    public String getColumnTypeName(int column) throws SQLException {
+      return "string";
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/pom.xml
----------------------------------------------------------------------
diff --git a/src/blur-mapred/pom.xml b/src/blur-mapred/pom.xml
index a4e1dd9..9eb37fa 100644
--- a/src/blur-mapred/pom.xml
+++ b/src/blur-mapred/pom.xml
@@ -20,12 +20,12 @@ under the License.
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<parent>
-		<groupId>com.nearinfinity.blur</groupId>
+		<groupId>org.apache.blur</groupId>
 		<artifactId>blur</artifactId>
 		<version>0.1.3</version>
 	</parent>
 	<modelVersion>4.0.0</modelVersion>
-	<groupId>com.nearinfinity.blur</groupId>
+	<groupId>org.apache.blur</groupId>
 	<artifactId>blur-mapred</artifactId>
 	<packaging>jar</packaging>
 	<name>Blur Map Reduce</name>
@@ -38,17 +38,17 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 		<dependency>
-			<groupId>com.nearinfinity.blur</groupId>
+			<groupId>org.apache.blur</groupId>
 			<artifactId>blur-core</artifactId>
 			<version>0.1.3</version>
 		</dependency>
 		<dependency>
-			<groupId>com.nearinfinity.blur</groupId>
+			<groupId>org.apache.blur</groupId>
 			<artifactId>blur-store</artifactId>
 			<version>0.1.3</version>
 		</dependency>
 		<dependency>
-			<groupId>com.nearinfinity.blur</groupId>
+			<groupId>org.apache.blur</groupId>
 			<artifactId>blur-util</artifactId>
 			<version>0.1.3</version>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapred/BlurInputFormat.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapred/BlurInputFormat.java b/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapred/BlurInputFormat.java
deleted file mode 100644
index 7cee8a5..0000000
--- a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapred/BlurInputFormat.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package com.nearinfinity.blur.mapred;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-
-import com.nearinfinity.blur.mapreduce.BlurRecord;
-
-public class BlurInputFormat implements InputFormat<Text, BlurRecord> {
-
-  @Override
-  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    List<?> splits = new ArrayList<Object>();
-    Path[] paths = FileInputFormat.getInputPaths(job);
-    for (Path path : paths) {
-      com.nearinfinity.blur.mapreduce.lib.BlurInputFormat.findAllSegments((Configuration) job, path, splits);
-    }
-    return splits.toArray(new InputSplit[] {});
-  }
-
-  @Override
-  public RecordReader<Text, BlurRecord> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
-    reporter.setStatus(split.toString());
-    return new BlurRecordReader(split, job);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapred/BlurRecordReader.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapred/BlurRecordReader.java b/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapred/BlurRecordReader.java
deleted file mode 100644
index f283e54..0000000
--- a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapred/BlurRecordReader.java
+++ /dev/null
@@ -1,108 +0,0 @@
-package com.nearinfinity.blur.mapred;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.store.Directory;
-
-import com.nearinfinity.blur.mapreduce.BlurRecord;
-import com.nearinfinity.blur.mapreduce.lib.BlurInputSplit;
-import com.nearinfinity.blur.mapreduce.lib.Utils;
-import com.nearinfinity.blur.store.hdfs.HdfsDirectory;
-import com.nearinfinity.blur.utils.RowDocumentUtil;
-
-public class BlurRecordReader implements RecordReader<Text, BlurRecord> {
-
-  private IndexReader reader;
-  private Directory directory;
-  private int startingDocId;
-  private int endingDocId;
-  private int position;
-
-  public BlurRecordReader(InputSplit split, JobConf job) throws IOException {
-    BlurInputSplit blurSplit = (BlurInputSplit) split;
-    Path path = blurSplit.getIndexPath();
-    String segmentName = blurSplit.getSegmentName();
-    startingDocId = blurSplit.getStartingDocId();
-    endingDocId = blurSplit.getEndingDocId();
-    directory = new HdfsDirectory(path);
-
-    IndexCommit commit = Utils.findLatest(directory);
-    reader = Utils.openSegmentReader(directory, commit, segmentName, Utils.getTermInfosIndexDivisor(job));
-    int maxDoc = reader.maxDoc();
-    if (endingDocId >= maxDoc) {
-      endingDocId = maxDoc - 1;
-    }
-    position = startingDocId - 1;
-  }
-
-  @Override
-  public boolean next(Text key, BlurRecord value) throws IOException {
-    do {
-      position++;
-      if (position > endingDocId) {
-        return false;
-      }
-    } while (reader.isDeleted(position));
-    readDocument(key, value);
-    return true;
-  }
-
-  private void readDocument(Text rowid, BlurRecord record) throws CorruptIndexException, IOException {
-    Document document = reader.document(position);
-    record.reset();
-    rowid.set(RowDocumentUtil.readRecord(document, record));
-  }
-
-  @Override
-  public Text createKey() {
-    return new Text();
-  }
-
-  @Override
-  public BlurRecord createValue() {
-    return new BlurRecord();
-  }
-
-  @Override
-  public long getPos() throws IOException {
-    return position;
-  }
-
-  @Override
-  public void close() throws IOException {
-    reader.close();
-    directory.close();
-  }
-
-  @Override
-  public float getProgress() throws IOException {
-    int total = endingDocId - startingDocId;
-    return (float) position / (float) total;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BlurColumn.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BlurColumn.java b/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BlurColumn.java
deleted file mode 100644
index 0be0108..0000000
--- a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BlurColumn.java
+++ /dev/null
@@ -1,77 +0,0 @@
-package com.nearinfinity.blur.mapreduce;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-
-public class BlurColumn implements Writable {
-
-  private String name;
-  private String value;
-
-  public BlurColumn() {
-  }
-
-  public BlurColumn(String name, String value) {
-    this.name = name;
-    this.value = value;
-  }
-
-  public boolean hasNull() {
-    if (name == null || value == null) {
-      return true;
-    }
-    return false;
-  }
-
-  public String getName() {
-    return name;
-  }
-
-  public void setName(String name) {
-    this.name = name;
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    name = IOUtil.readString(in);
-    value = IOUtil.readString(in);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    IOUtil.writeString(out, name);
-    IOUtil.writeString(out, value);
-  }
-
-  public String getValue() {
-    return value;
-  }
-
-  public void setValue(String value) {
-    this.value = value;
-  }
-
-  @Override
-  public String toString() {
-    return "{name=" + name + ", value=" + value + "}";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BlurMapper.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BlurMapper.java b/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BlurMapper.java
deleted file mode 100644
index 6b321ee..0000000
--- a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BlurMapper.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package com.nearinfinity.blur.mapreduce;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Mapper;
-
-public abstract class BlurMapper<KEY, VALUE> extends Mapper<KEY, VALUE, BytesWritable, BlurMutate> {
-
-  protected BlurMutate _mutate;
-  protected BytesWritable _key;
-  protected BlurTask _blurTask;
-  protected Counter _recordCounter;
-  protected Counter _fieldCounter;
-
-  @Override
-  public void run(Context context) throws IOException, InterruptedException {
-    setup(context);
-    long maxRecordCount = _blurTask.getMaxRecordCount();
-    if (maxRecordCount == -1) {
-      maxRecordCount = Long.MAX_VALUE;
-    }
-    for (long l = 0; l < maxRecordCount && context.nextKeyValue(); l++) {
-      map(context.getCurrentKey(), context.getCurrentValue(), context);
-    }
-    cleanup(context);
-  }
-
-  @Override
-  protected void setup(Context context) throws IOException, InterruptedException {
-    _blurTask = BlurTask.read(context.getConfiguration());
-    _mutate = new BlurMutate();
-    _key = new BytesWritable();
-    _recordCounter = context.getCounter(BlurTask.getCounterGroupName(), BlurTask.getRecordCounterName());
-    _fieldCounter = context.getCounter(BlurTask.getCounterGroupName(), BlurTask.getFieldCounterName());
-  }
-
-  @Override
-  protected abstract void map(KEY key, VALUE value, Context context) throws IOException, InterruptedException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BlurMutate.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BlurMutate.java b/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BlurMutate.java
deleted file mode 100644
index 6c1a924..0000000
--- a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BlurMutate.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package com.nearinfinity.blur.mapreduce;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-
-public class BlurMutate implements Writable {
-
-  public enum MUTATE_TYPE {
-    ADD(0), UPDATE(1), DELETE(2);
-    private int _value;
-
-    private MUTATE_TYPE(int value) {
-      _value = value;
-    }
-
-    public int getValue() {
-      return _value;
-    }
-
-    public MUTATE_TYPE find(int value) {
-      switch (value) {
-      case 0:
-        return ADD;
-      case 1:
-        return UPDATE;
-      case 2:
-        return DELETE;
-      default:
-        throw new RuntimeException("Value [" + value + "] not found.");
-      }
-    }
-  }
-
-  private MUTATE_TYPE _mutateType = MUTATE_TYPE.UPDATE;
-  private BlurRecord _record = new BlurRecord();
-
-  public BlurRecord getRecord() {
-    return _record;
-  }
-
-  public void setRecord(BlurRecord record) {
-    _record = record;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    IOUtil.writeVInt(out, _mutateType.getValue());
-    _record.write(out);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    _mutateType.find(IOUtil.readVInt(in));
-    _record.readFields(in);
-  }
-
-  public MUTATE_TYPE getMutateType() {
-    return _mutateType;
-  }
-
-  public void setMutateType(MUTATE_TYPE mutateType) {
-    _mutateType = mutateType;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BlurRecord.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BlurRecord.java b/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BlurRecord.java
deleted file mode 100644
index dc5508c..0000000
--- a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BlurRecord.java
+++ /dev/null
@@ -1,136 +0,0 @@
-package com.nearinfinity.blur.mapreduce;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.Writable;
-
-import com.nearinfinity.blur.utils.ReaderBlurRecord;
-
-public class BlurRecord implements Writable, ReaderBlurRecord {
-
-  private String _rowId;
-  private String _recordId;
-  private String _family;
-
-  private List<BlurColumn> _columns = new ArrayList<BlurColumn>();
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    _rowId = IOUtil.readString(in);
-    _recordId = IOUtil.readString(in);
-    _family = IOUtil.readString(in);
-    int size = IOUtil.readVInt(in);
-    _columns.clear();
-    for (int i = 0; i < size; i++) {
-      BlurColumn column = new BlurColumn();
-      column.readFields(in);
-      _columns.add(column);
-    }
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    IOUtil.writeString(out, _rowId);
-    IOUtil.writeString(out, _recordId);
-    IOUtil.writeString(out, _family);
-    IOUtil.writeVInt(out, _columns.size());
-    for (BlurColumn column : _columns) {
-      column.write(out);
-    }
-  }
-
-  public String getRowId() {
-    return _rowId;
-  }
-
-  public void setRowId(String rowId) {
-    this._rowId = rowId;
-  }
-
-  public String getRecordId() {
-    return _recordId;
-  }
-
-  public void setRecordId(String recordId) {
-    this._recordId = recordId;
-  }
-
-  public String getFamily() {
-    return _family;
-  }
-
-  public void setFamily(String family) {
-    this._family = family;
-  }
-
-  public List<BlurColumn> getColumns() {
-    return _columns;
-  }
-
-  public void setColumns(List<BlurColumn> columns) {
-    this._columns = columns;
-  }
-
-  public void clearColumns() {
-    _columns.clear();
-  }
-
-  public void addColumn(BlurColumn column) {
-    _columns.add(column);
-  }
-
-  public void addColumn(String name, String value) {
-    BlurColumn blurColumn = new BlurColumn();
-    blurColumn.setName(name);
-    blurColumn.setValue(value);
-    addColumn(blurColumn);
-  }
-
-  @Override
-  public void setRecordIdStr(String value) {
-    setRecordId(value);
-  }
-
-  @Override
-  public void setFamilyStr(String family) {
-    setFamily(family);
-  }
-
-  public void reset() {
-    clearColumns();
-    _rowId = null;
-    _recordId = null;
-    _family = null;
-  }
-
-  @Override
-  public void setRowIdStr(String rowId) {
-    setRowId(rowId);
-  }
-
-  @Override
-  public String toString() {
-    return "{rowId=" + _rowId + ", recordId=" + _recordId + ", family=" + _family + ", columns=" + _columns + "}";
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BlurReducer.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BlurReducer.java b/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BlurReducer.java
deleted file mode 100644
index ad5dfad..0000000
--- a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BlurReducer.java
+++ /dev/null
@@ -1,520 +0,0 @@
-package com.nearinfinity.blur.mapreduce;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static com.nearinfinity.blur.lucene.LuceneConstant.LUCENE_VERSION;
-import static com.nearinfinity.blur.utils.BlurConstants.RECORD_ID;
-import static com.nearinfinity.blur.utils.BlurConstants.ROW_ID;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.Field.Index;
-import org.apache.lucene.document.Field.Store;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.index.TermDocs;
-import org.apache.lucene.index.TieredMergePolicy;
-import org.apache.lucene.store.BufferedIndexInput;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
-import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.store.NoLockFactory;
-import org.apache.lucene.util.IOUtils;
-import org.apache.zookeeper.ZooKeeper;
-
-import com.nearinfinity.blur.analysis.BlurAnalyzer;
-import com.nearinfinity.blur.log.Log;
-import com.nearinfinity.blur.log.LogFactory;
-import com.nearinfinity.blur.lucene.search.FairSimilarity;
-import com.nearinfinity.blur.mapreduce.BlurMutate.MUTATE_TYPE;
-import com.nearinfinity.blur.mapreduce.BlurTask.INDEXING_TYPE;
-import com.nearinfinity.blur.store.compressed.CompressedFieldDataDirectory;
-import com.nearinfinity.blur.store.hdfs.HdfsDirectory;
-import com.nearinfinity.blur.thrift.generated.Column;
-import com.nearinfinity.blur.thrift.generated.TableDescriptor;
-import com.nearinfinity.blur.utils.BlurConstants;
-import com.nearinfinity.blur.utils.BlurUtil;
-import com.nearinfinity.blur.utils.Converter;
-import com.nearinfinity.blur.utils.IterableConverter;
-import com.nearinfinity.blur.utils.RowIndexWriter;
-
-public class BlurReducer extends Reducer<BytesWritable, BlurMutate, BytesWritable, BlurMutate> {
-
-  static class LuceneFileComparator implements Comparator<String> {
-
-    private Directory _directory;
-
-    public LuceneFileComparator(Directory directory) {
-      _directory = directory;
-    }
-
-    @Override
-    public int compare(String o1, String o2) {
-      try {
-        long fileLength1 = _directory.fileLength(o1);
-        long fileLength2 = _directory.fileLength(o2);
-        if (fileLength1 == fileLength2) {
-          return o1.compareTo(o2);
-        }
-        return (int) (fileLength2 - fileLength1);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  };
-
-  protected static final Log LOG = LogFactory.getLog(BlurReducer.class);
-  protected static final long REPORT_PERIOD = TimeUnit.SECONDS.toMillis(10);
-  protected static final double MB = 1024 * 1024;
-  protected IndexWriter _writer;
-  protected Directory _directory;
-  protected BlurAnalyzer _analyzer;
-  protected BlurTask _blurTask;
-
-  protected Counter _recordCounter;
-  protected Counter _rowCounter;
-  protected Counter _fieldCounter;
-  protected Counter _rowBreak;
-  protected Counter _rowFailures;
-  protected StringBuilder _builder = new StringBuilder();
-  protected byte[] _copyBuf;
-  protected Configuration _configuration;
-  protected long _start;
-  protected long _previousRow;
-  protected long _previousRecord;
-  protected long _prev;
-  protected IndexReader _reader;
-  protected Map<String, Document> _newDocs = new HashMap<String, Document>();
-  protected Set<String> _recordIdsToDelete = new HashSet<String>();
-  protected Term _rowIdTerm = new Term(BlurConstants.ROW_ID);
-  protected ZooKeeper _zookeeper;
-
-  @Override
-  protected void setup(Context context) throws IOException, InterruptedException {
-    _blurTask = BlurTask.read(context.getConfiguration());
-    _configuration = context.getConfiguration();
-    setupCounters(context);
-    setupAnalyzer(context);
-    setupDirectory(context);
-    setupWriter(context);
-    if (_blurTask.getIndexingType() == INDEXING_TYPE.UPDATE) {
-      _reader = IndexReader.open(_directory);
-    }
-  }
-
-  protected void setupCounters(Context context) {
-    _rowCounter = context.getCounter(BlurTask.getCounterGroupName(), BlurTask.getRowCounterName());
-    _recordCounter = context.getCounter(BlurTask.getCounterGroupName(), BlurTask.getRecordCounterName());
-    _fieldCounter = context.getCounter(BlurTask.getCounterGroupName(), BlurTask.getFieldCounterName());
-    _rowBreak = context.getCounter(BlurTask.getCounterGroupName(), BlurTask.getRowBreakCounterName());
-    _rowFailures = context.getCounter(BlurTask.getCounterGroupName(), BlurTask.getRowFailureCounterName());
-    _start = System.currentTimeMillis();
-    _prev = System.currentTimeMillis();
-  }
-
-  @Override
-  protected void reduce(BytesWritable key, Iterable<BlurMutate> values, Context context) throws IOException, InterruptedException {
-    if (!index(key, values, context)) {
-      _rowFailures.increment(1);
-    }
-  }
-
-  protected boolean index(BytesWritable key, Iterable<BlurMutate> values, Context context) throws IOException {
-    int recordCount = 0;
-    _newDocs.clear();
-    _recordIdsToDelete.clear();
-    boolean rowIdSet = false;
-
-    for (BlurMutate mutate : values) {
-      BlurRecord record = mutate.getRecord();
-      if (!rowIdSet) {
-        String rowId = record.getRowId();
-        _rowIdTerm = _rowIdTerm.createTerm(rowId);
-        rowIdSet = true;
-      }
-      if (mutate.getMutateType() == MUTATE_TYPE.DELETE) {
-        _recordIdsToDelete.add(record.getRecordId());
-        continue;
-      }
-      Document document = toDocument(record, _builder);
-      _newDocs.put(record.getRecordId(), document);
-
-      context.progress();
-      recordCount++;
-      if (recordCount >= _blurTask.getMaxRecordsPerRow()) {
-        return false;
-      }
-      if (_blurTask.getIndexingType() == INDEXING_TYPE.UPDATE) {
-        fetchOldRecords();
-      }
-    }
-
-    List<Document> docs = documentsToIndex(new ArrayList<Document>(_newDocs.values()));
-    if (docs.size() > 0) {
-      docs.get(0).add(BlurConstants.PRIME_DOC_FIELD);
-    }
-
-    switch (_blurTask.getIndexingType()) {
-    case REBUILD:
-      _writer.addDocuments(docs);
-      break;
-    case UPDATE:
-      _writer.updateDocuments(_rowIdTerm, docs);
-    default:
-      break;
-    }
-
-    _recordCounter.increment(recordCount);
-    _rowCounter.increment(1);
-    if (_prev + REPORT_PERIOD < System.currentTimeMillis()) {
-      long records = _recordCounter.getValue();
-      long rows = _rowCounter.getValue();
-
-      long now = System.currentTimeMillis();
-
-      double overAllSeconds = (now - _start) / 1000.0;
-      double overAllRecordRate = records / overAllSeconds;
-      double overAllRowsRate = rows / overAllSeconds;
-
-      double seconds = (now - _prev) / 1000.0;
-      double recordRate = (records - _previousRecord) / seconds;
-      double rowsRate = (rows - _previousRow) / seconds;
-
-      String status = String.format("Totals [%d Row, %d Records], Avg Rates [%.1f Row/s, %.1f Records/s] Rates [%.1f Row/s, %.1f Records/s]", rows, records, overAllRowsRate,
-          overAllRecordRate, rowsRate, recordRate);
-
-      LOG.info(status);
-      context.setStatus(status);
-
-      _previousRecord = records;
-      _previousRow = rows;
-      _prev = now;
-    }
-    return true;
-  }
-
-  protected List<Document> documentsToIndex(List<Document> list) {
-    return list;
-  }
-
-  protected void fetchOldRecords() throws IOException {
-    TermDocs termDocs = _reader.termDocs(_rowIdTerm);
-    // find all records for row that are not deleted.
-    while (termDocs.next()) {
-      int doc = termDocs.doc();
-      if (!_reader.isDeleted(doc)) {
-        Document document = _reader.document(doc);
-        String recordId = document.get(RECORD_ID);
-        // add them to the new records if the new records do not contain them.
-        if (!_newDocs.containsKey(recordId)) {
-          _newDocs.put(recordId, document);
-        }
-      }
-    }
-
-    // delete all records that should be removed.
-    for (String recordId : _recordIdsToDelete) {
-      _newDocs.remove(recordId);
-    }
-  }
-
-  @Override
-  protected void cleanup(Context context) throws IOException, InterruptedException {
-    switch (_blurTask.getIndexingType()) {
-    case UPDATE:
-      cleanupFromUpdate(context);
-      return;
-    case REBUILD:
-      cleanupFromRebuild(context);
-      return;
-    default:
-      break;
-    }
-  }
-
-  protected void cleanupFromUpdate(Context context) throws IOException {
-    _writer.commit();
-    _writer.close();
-  }
-
-  protected void cleanupFromRebuild(Context context) throws IOException, InterruptedException {
-    _writer.commit();
-    _writer.close();
-
-    IndexReader reader = IndexReader.open(_directory);
-
-    TableDescriptor descriptor = _blurTask.getTableDescriptor();
-
-    Path directoryPath = _blurTask.getDirectoryPath(context);
-    remove(directoryPath);
-
-    NoLockFactory lockFactory = NoLockFactory.getNoLockFactory();
-
-    Directory destDirectory = getDestDirectory(descriptor, directoryPath);
-    destDirectory.setLockFactory(lockFactory);
-
-    boolean optimize = _blurTask.getOptimize();
-
-    if (optimize) {
-      context.setStatus("Starting Copy-Optimize Phase");
-      IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, _analyzer);
-      TieredMergePolicy policy = (TieredMergePolicy) conf.getMergePolicy();
-      policy.setUseCompoundFile(false);
-      long s = System.currentTimeMillis();
-      IndexWriter writer = new IndexWriter(getBiggerBuffers(destDirectory), conf);
-      writer.addIndexes(reader);
-      writer.close();
-      long e = System.currentTimeMillis();
-      context.setStatus("Copying phase took [" + (e - s) + " ms]");
-      LOG.info("Copying phase took [" + (e - s) + " ms]");
-    } else {
-      context.setStatus("Starting Copy-Optimize Phase");
-      long s = System.currentTimeMillis();
-      List<String> files = getFilesOrderedBySize(_directory);
-      long totalBytesToCopy = getTotalBytes(_directory);
-      long totalBytesCopied = 0;
-      long startTime = System.currentTimeMillis();
-      for (String file : files) {
-        totalBytesCopied += copy(_directory, destDirectory, file, file, context, totalBytesCopied, totalBytesToCopy, startTime);
-      }
-      long e = System.currentTimeMillis();
-      context.setStatus("Copying phase took [" + (e - s) + " ms]");
-      LOG.info("Copying phase took [" + (e - s) + " ms]");
-    }
-  }
-
-  protected Directory getBiggerBuffers(Directory destDirectory) {
-    return new BufferedDirectory(destDirectory, 32768);
-  }
-
-  protected Directory getDestDirectory(TableDescriptor descriptor, Path directoryPath) throws IOException {
-    String compressionClass = descriptor.compressionClass;
-    int compressionBlockSize = descriptor.getCompressionBlockSize();
-    if (compressionClass == null) {
-      compressionClass = "org.apache.hadoop.io.compress.DefaultCodec";
-    }
-    // if (compressionBlockSize == 0) {
-    compressionBlockSize = 32768;
-    // }
-    HdfsDirectory dir = new HdfsDirectory(directoryPath);
-    return new CompressedFieldDataDirectory(dir, getInstance(compressionClass), compressionBlockSize);
-  }
-
-  protected CompressionCodec getInstance(String compressionClass) throws IOException {
-    try {
-      CompressionCodec codec = (CompressionCodec) Class.forName(compressionClass).newInstance();
-      if (codec instanceof Configurable) {
-        Configurable configurable = (Configurable) codec;
-        configurable.setConf(_configuration);
-      }
-      return codec;
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
-  protected void remove(Path directoryPath) throws IOException {
-    FileSystem fileSystem = FileSystem.get(directoryPath.toUri(), _configuration);
-    fileSystem.delete(directoryPath, true);
-  }
-
-  protected long getTotalBytes(Directory directory) throws IOException {
-    long total = 0;
-    for (String file : directory.listAll()) {
-      total += directory.fileLength(file);
-    }
-    return total;
-  }
-
-  protected long copy(Directory from, Directory to, String src, String dest, Context context, long totalBytesCopied, long totalBytesToCopy, long startTime) throws IOException {
-    IndexOutput os = to.createOutput(dest);
-    IndexInput is = from.openInput(src);
-    IOException priorException = null;
-    try {
-      return copyBytes(is, os, is.length(), context, totalBytesCopied, totalBytesToCopy, startTime, src);
-    } catch (IOException ioe) {
-      priorException = ioe;
-    } finally {
-      IOUtils.closeWhileHandlingException(priorException, os, is);
-    }
-    return 0;// this should never be called
-  }
-
-  protected long copyBytes(IndexInput in, IndexOutput out, long numBytes, Context context, long totalBytesCopied, long totalBytesToCopy, long startTime, String src)
-      throws IOException {
-    if (_copyBuf == null) {
-      _copyBuf = new byte[BufferedIndexInput.BUFFER_SIZE];
-    }
-    long start = System.currentTimeMillis();
-    long copied = 0;
-    while (numBytes > 0) {
-      if (start + REPORT_PERIOD < System.currentTimeMillis()) {
-        report(context, totalBytesCopied + copied, totalBytesToCopy, startTime, src);
-        start = System.currentTimeMillis();
-      }
-      final int toCopy = (int) (numBytes > _copyBuf.length ? _copyBuf.length : numBytes);
-      in.readBytes(_copyBuf, 0, toCopy);
-      out.writeBytes(_copyBuf, 0, toCopy);
-      numBytes -= toCopy;
-      copied += toCopy;
-      context.progress();
-    }
-    return copied;
-  }
-
-  protected List<String> getFilesOrderedBySize(final Directory directory) throws IOException {
-    List<String> files = new ArrayList<String>(Arrays.asList(directory.listAll()));
-    Collections.sort(files, new LuceneFileComparator(_directory));
-    return files;
-  }
-
-  protected void setupDirectory(Context context) throws IOException {
-    TableDescriptor descriptor = _blurTask.getTableDescriptor();
-    switch (_blurTask.getIndexingType()) {
-    case UPDATE:
-      Path directoryPath = _blurTask.getDirectoryPath(context);
-      _directory = getDestDirectory(descriptor, directoryPath);
-
-      NoLockFactory lockFactory = NoLockFactory.getNoLockFactory();
-      _directory.setLockFactory(lockFactory);
-      return;
-    case REBUILD:
-      File dir = new File(System.getProperty("java.io.tmpdir"));
-      File path = new File(dir, "index");
-      rm(path);
-      LOG.info("Using local path [" + path + "] for indexing.");
-      String compressionClass = descriptor.compressionClass;
-      int compressionBlockSize = descriptor.getCompressionBlockSize();
-      if (compressionClass == null) {
-        compressionClass = "org.apache.hadoop.io.compress.DefaultCodec";
-      }
-
-      Directory localDirectory = FSDirectory.open(path);
-      // if (compressionBlockSize == 0) {
-      compressionBlockSize = 32768;
-      // }
-      CompressedFieldDataDirectory compressedFieldDataDirectory = new CompressedFieldDataDirectory(localDirectory, getInstance(compressionClass), compressionBlockSize);
-      _directory = new ProgressableDirectory(compressedFieldDataDirectory, context);
-      return;
-    default:
-      break;
-    }
-  }
-
-  protected String getNodeName(Context context) {
-    return context.getTaskAttemptID().toString();
-  }
-
-  protected void rm(File path) {
-    if (!path.exists()) {
-      return;
-    }
-    if (path.isDirectory()) {
-      for (File f : path.listFiles()) {
-        rm(f);
-      }
-    }
-    path.delete();
-  }
-
-  protected <T> T nullCheck(T o) {
-    if (o == null) {
-      throw new NullPointerException();
-    }
-    return o;
-  }
-
-  protected void setupWriter(Context context) throws IOException {
-    nullCheck(_directory);
-    nullCheck(_analyzer);
-    IndexWriterConfig config = new IndexWriterConfig(LUCENE_VERSION, _analyzer);
-    config.setSimilarity(new FairSimilarity());
-    config.setRAMBufferSizeMB(_blurTask.getRamBufferSizeMB());
-    TieredMergePolicy mergePolicy = (TieredMergePolicy) config.getMergePolicy();
-    mergePolicy.setUseCompoundFile(false);
-    _writer = new IndexWriter(_directory, config);
-  }
-
-  protected void setupAnalyzer(Context context) {
-    _analyzer = new BlurAnalyzer(_blurTask.getTableDescriptor().getAnalyzerDefinition());
-  }
-
-  protected Document toDocument(BlurRecord record, StringBuilder builder) {
-    Document document = new Document();
-    document.add(new Field(ROW_ID, record.getRowId(), Store.YES, Index.NOT_ANALYZED_NO_NORMS));
-    document.add(new Field(RECORD_ID, record.getRecordId(), Store.YES, Index.NOT_ANALYZED_NO_NORMS));
-    String columnFamily = record.getFamily();
-    RowIndexWriter.addColumns(document, _analyzer, builder, columnFamily, new IterableConverter<BlurColumn, Column>(record.getColumns(), new Converter<BlurColumn, Column>() {
-      @Override
-      public Column convert(BlurColumn from) throws Exception {
-        _fieldCounter.increment(1);
-        return new Column(from.getName(), from.getValue());
-      }
-    }));
-    return document;
-  }
-
-  protected static void report(Context context, long totalBytesCopied, long totalBytesToCopy, long startTime, String src) {
-    long now = System.currentTimeMillis();
-    double seconds = (now - startTime) / 1000.0;
-    double rate = totalBytesCopied / seconds;
-    String time = estimateTimeToComplete(rate, totalBytesCopied, totalBytesToCopy);
-
-    String status = String.format("%.1f Complete - Time Remaining [%s s], Copy rate [%.1f MB/s], Total Copied [%.1f MB], Total To Copy [%.1f MB]",
-        getPerComplete(totalBytesCopied, totalBytesToCopy), time, getMb(rate), getMb(totalBytesCopied), getMb(totalBytesToCopy));
-    LOG.info(status);
-    context.setStatus(status);
-  }
-
-  protected static double getPerComplete(long totalBytesCopied, long totalBytesToCopy) {
-    return ((double) totalBytesCopied / (double) totalBytesToCopy) * 100.0;
-  }
-
-  protected static double getMb(double b) {
-    return b / MB;
-  }
-
-  protected static String estimateTimeToComplete(double rate, long totalBytesCopied, long totalBytesToCopy) {
-    long whatsLeft = totalBytesToCopy - totalBytesCopied;
-    long secondsLeft = (long) (whatsLeft / rate);
-    return BlurUtil.humanizeTime(secondsLeft, TimeUnit.SECONDS);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BlurTask.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BlurTask.java b/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BlurTask.java
deleted file mode 100644
index f7f4df2..0000000
--- a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BlurTask.java
+++ /dev/null
@@ -1,282 +0,0 @@
-package com.nearinfinity.blur.mapreduce;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TIOStreamTransport;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-
-import com.nearinfinity.blur.log.Log;
-import com.nearinfinity.blur.log.LogFactory;
-import com.nearinfinity.blur.manager.clusterstatus.ZookeeperClusterStatus;
-import com.nearinfinity.blur.manager.clusterstatus.ZookeeperPathConstants;
-import com.nearinfinity.blur.thrift.generated.TableDescriptor;
-import com.nearinfinity.blur.utils.BlurConstants;
-import com.nearinfinity.blur.utils.BlurUtil;
-
-public class BlurTask implements Writable {
-
-  public enum INDEXING_TYPE {
-    REBUILD, UPDATE
-  }
-
-  private static final String BLUR_BLURTASK = "blur.blurtask";
-  private static final Log LOG = LogFactory.getLog(BlurTask.class);
-
-  public static String getCounterGroupName() {
-    return "Blur";
-  }
-
-  public static String getRowCounterName() {
-    return "Rows";
-  }
-
-  public static String getFieldCounterName() {
-    return "Fields";
-  }
-
-  public static String getRecordCounterName() {
-    return "Records";
-  }
-
-  public static String getRowBreakCounterName() {
-    return "Row Retries";
-  }
-
-  public static String getRowFailureCounterName() {
-    return "Row Failures";
-  }
-
-  private int _ramBufferSizeMB = 256;
-  private long _maxRecordCount = Long.MAX_VALUE;
-  private TableDescriptor _tableDescriptor;
-  private int _maxRecordsPerRow = 16384;
-  private boolean _optimize = true;
-  private INDEXING_TYPE _indexingType = INDEXING_TYPE.REBUILD;
-  private transient ZooKeeper _zooKeeper;
-
-  public String getShardName(TaskAttemptContext context) {
-    TaskAttemptID taskAttemptID = context.getTaskAttemptID();
-    int id = taskAttemptID.getTaskID().getId();
-    return BlurUtil.getShardName(BlurConstants.SHARD_PREFIX, id);
-  }
-
-  public Path getDirectoryPath(TaskAttemptContext context) {
-    String shardName = getShardName(context);
-    return new Path(new Path(_tableDescriptor.tableUri), shardName);
-  }
-
-  public int getNumReducers(Configuration configuration) {
-    Path tablePath = new Path(_tableDescriptor.tableUri);
-    try {
-      int num = _tableDescriptor.shardCount;
-      FileSystem fileSystem = FileSystem.get(tablePath.toUri(), configuration);
-      if (!fileSystem.exists(tablePath)) {
-        return num;
-      }
-      FileStatus[] files = fileSystem.listStatus(tablePath);
-      int shardCount = 0;
-      for (FileStatus fileStatus : files) {
-        if (fileStatus.isDir()) {
-          String name = fileStatus.getPath().getName();
-          if (name.startsWith(BlurConstants.SHARD_PREFIX)) {
-            shardCount++;
-          }
-        }
-      }
-
-      if (shardCount == 0) {
-        return num;
-      }
-      if (shardCount != num) {
-        LOG.warn("Asked for " + num + " reducers, but existing table " + _tableDescriptor.name + " has " + shardCount + " shards. Using " + shardCount + " reducers");
-      }
-      return shardCount;
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to connect to filesystem", e);
-    }
-  }
-
-  public int getRamBufferSizeMB() {
-    return _ramBufferSizeMB;
-  }
-
-  public void setRamBufferSizeMB(int ramBufferSizeMB) {
-    _ramBufferSizeMB = ramBufferSizeMB;
-  }
-
-  public long getMaxRecordCount() {
-    return _maxRecordCount;
-  }
-
-  public void setMaxRecordCount(long maxRecordCount) {
-    _maxRecordCount = maxRecordCount;
-  }
-
-  public void setTableDescriptor(TableDescriptor tableDescriptor) {
-    _tableDescriptor = tableDescriptor;
-  }
-
-  public TableDescriptor getTableDescriptor() {
-    return _tableDescriptor;
-  }
-
-  public Job configureJob(Configuration configuration) throws IOException {
-    if (getIndexingType() == INDEXING_TYPE.UPDATE) {
-      checkTable();
-    }
-    ByteArrayOutputStream os = new ByteArrayOutputStream();
-    DataOutputStream output = new DataOutputStream(os);
-    write(output);
-    output.close();
-    String blurTask = new String(Base64.encodeBase64(os.toByteArray()));
-    configuration.set(BLUR_BLURTASK, blurTask);
-
-    Job job = new Job(configuration, "Blur Indexer");
-    job.setReducerClass(BlurReducer.class);
-    job.setOutputKeyClass(BytesWritable.class);
-    job.setOutputValueClass(BlurMutate.class);
-    job.setNumReduceTasks(getNumReducers(configuration));
-    return job;
-  }
-
-  private void checkTable() {
-    ZookeeperClusterStatus status = new ZookeeperClusterStatus(_zooKeeper);
-    // check if table exists
-    String cluster = _tableDescriptor.cluster;
-    String table = _tableDescriptor.name;
-    if (!status.exists(false, cluster, table)) {
-      throw new RuntimeException("Table [" + table + "] in cluster [" + cluster + "] does not exist.");
-    }
-    // check if table is locked
-    try {
-      List<String> children = _zooKeeper.getChildren(ZookeeperPathConstants.getLockPath(cluster, table), false);
-      if (!children.isEmpty()) {
-        throw new RuntimeException("Table [" + table + "] in cluster [" + cluster + "] has write locks enabled, cannot perform update.");
-      }
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-
-  }
-
-  public static BlurTask read(Configuration configuration) throws IOException {
-    byte[] blurTaskBs = Base64.decodeBase64(configuration.get(BLUR_BLURTASK));
-    BlurTask blurTask = new BlurTask();
-    blurTask.readFields(new DataInputStream(new ByteArrayInputStream(blurTaskBs)));
-    return blurTask;
-  }
-
-  @Override
-  public void readFields(DataInput input) throws IOException {
-    _maxRecordCount = input.readLong();
-    _ramBufferSizeMB = input.readInt();
-    _optimize = input.readBoolean();
-    _indexingType = INDEXING_TYPE.valueOf(readString(input));
-    byte[] data = new byte[input.readInt()];
-    input.readFully(data);
-    ByteArrayInputStream is = new ByteArrayInputStream(data);
-    TIOStreamTransport trans = new TIOStreamTransport(is);
-    TBinaryProtocol protocol = new TBinaryProtocol(trans);
-    _tableDescriptor = new TableDescriptor();
-    try {
-      _tableDescriptor.read(protocol);
-    } catch (TException e) {
-      throw new IOException(e);
-    }
-  }
-
-  private String readString(DataInput input) throws IOException {
-    int length = input.readInt();
-    byte[] buf = new byte[length];
-    input.readFully(buf);
-    return new String(buf);
-  }
-
-  @Override
-  public void write(DataOutput output) throws IOException {
-    output.writeLong(_maxRecordCount);
-    output.writeInt(_ramBufferSizeMB);
-    output.writeBoolean(_optimize);
-    writeString(output, _indexingType.name());
-    ByteArrayOutputStream os = new ByteArrayOutputStream();
-    TIOStreamTransport trans = new TIOStreamTransport(os);
-    TBinaryProtocol protocol = new TBinaryProtocol(trans);
-    try {
-      _tableDescriptor.write(protocol);
-    } catch (TException e) {
-      throw new IOException(e);
-    }
-    os.close();
-    byte[] bs = os.toByteArray();
-    output.writeInt(bs.length);
-    output.write(bs);
-  }
-
-  private void writeString(DataOutput output, String s) throws IOException {
-    byte[] bs = s.getBytes();
-    output.writeInt(bs.length);
-    output.write(bs);
-  }
-
-  public int getMaxRecordsPerRow() {
-    return _maxRecordsPerRow;
-  }
-
-  public void setMaxRecordsPerRow(int maxRecordsPerRow) {
-    _maxRecordsPerRow = maxRecordsPerRow;
-  }
-
-  public boolean getOptimize() {
-    return _optimize;
-  }
-
-  public void setOptimize(boolean optimize) {
-    _optimize = optimize;
-  }
-
-  public INDEXING_TYPE getIndexingType() {
-    return _indexingType;
-  }
-
-  public void setIndexingType(INDEXING_TYPE indexingType) {
-    _indexingType = indexingType;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BufferedDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BufferedDirectory.java b/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BufferedDirectory.java
deleted file mode 100644
index 0131996..0000000
--- a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/BufferedDirectory.java
+++ /dev/null
@@ -1,150 +0,0 @@
-package com.nearinfinity.blur.mapreduce;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.lucene.store.BufferedIndexInput;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.store.Lock;
-import org.apache.lucene.store.LockFactory;
-
-public class BufferedDirectory extends Directory {
-
-  private Directory _directory;
-  private int _buffer;
-
-  public BufferedDirectory(Directory directory, int buffer) {
-    _directory = directory;
-    _buffer = buffer;
-  }
-
-  public void close() throws IOException {
-    _directory.close();
-  }
-
-  public IndexOutput createOutput(String name) throws IOException {
-    return _directory.createOutput(name);
-  }
-
-  public void deleteFile(String name) throws IOException {
-    _directory.deleteFile(name);
-  }
-
-  public boolean fileExists(String name) throws IOException {
-    return _directory.fileExists(name);
-  }
-
-  public long fileLength(String name) throws IOException {
-    return _directory.fileLength(name);
-  }
-
-  @SuppressWarnings("deprecation")
-  public long fileModified(String name) throws IOException {
-    return _directory.fileModified(name);
-  }
-
-  public String[] listAll() throws IOException {
-    return _directory.listAll();
-  }
-
-  public IndexInput openInput(String name, int bufferSize) throws IOException {
-    return openInput(name);
-  }
-
-  public IndexInput openInput(String name) throws IOException {
-    return new BigBufferIndexInput(name, _directory.openInput(name), _buffer);
-  }
-
-  @SuppressWarnings("deprecation")
-  public void touchFile(String name) throws IOException {
-    _directory.touchFile(name);
-  }
-
-  public static class BigBufferIndexInput extends BufferedIndexInput {
-
-    private IndexInput _input;
-    private long _length;
-
-    public BigBufferIndexInput(String name, IndexInput input, int buffer) {
-      super(name, buffer);
-      _input = input;
-      _length = input.length();
-    }
-
-    @Override
-    protected void readInternal(byte[] b, int offset, int length) throws IOException {
-      _input.seek(getFilePointer());
-      _input.readBytes(b, offset, length);
-    }
-
-    @Override
-    protected void seekInternal(long pos) throws IOException {
-
-    }
-
-    @Override
-    public void close() throws IOException {
-      _input.close();
-    }
-
-    @Override
-    public long length() {
-      return _length;
-    }
-
-    @Override
-    public Object clone() {
-      BigBufferIndexInput clone = (BigBufferIndexInput) super.clone();
-      clone._input = (IndexInput) _input.clone();
-      return clone;
-    }
-  }
-
-  public void clearLock(String name) throws IOException {
-    _directory.clearLock(name);
-  }
-
-  public LockFactory getLockFactory() {
-    return _directory.getLockFactory();
-  }
-
-  public String getLockID() {
-    return _directory.getLockID();
-  }
-
-  public Lock makeLock(String name) {
-    return _directory.makeLock(name);
-  }
-
-  public void setLockFactory(LockFactory lockFactory) throws IOException {
-    _directory.setLockFactory(lockFactory);
-  }
-
-  public void sync(Collection<String> names) throws IOException {
-    _directory.sync(names);
-  }
-
-  @SuppressWarnings("deprecation")
-  public void sync(String name) throws IOException {
-    _directory.sync(name);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/IOUtil.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/IOUtil.java b/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/IOUtil.java
deleted file mode 100644
index a4489d2..0000000
--- a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/IOUtil.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package com.nearinfinity.blur.mapreduce;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
/**
 * Static helpers for length-prefixed UTF-8 strings and variable-length ints ("VInts").
 * A VInt stores 7 data bits per byte, low-order group first. The high bit is set on
 * every byte except the last.
 */
public class IOUtil {

  public static final String UTF_8 = "UTF-8";

  /** An int never needs more than 5 VInt bytes (5 * 7 = 35 >= 32 bits). */
  private static final int MAX_VINT_BYTES = 5;

  /**
   * Reads a string written by {@link #writeString}: a VInt byte length followed by UTF-8 bytes.
   *
   * @throws IOException if the stream ends early or the encoded length is negative (corrupt input)
   */
  public static String readString(DataInput input) throws IOException {
    int length = readVInt(input);
    if (length < 0) {
      throw new IOException("Malformed string length [" + length + "]");
    }
    byte[] buffer = new byte[length];
    input.readFully(buffer);
    return new String(buffer, UTF_8);
  }

  /** Writes {@code s} as a VInt byte length followed by its UTF-8 bytes. */
  public static void writeString(DataOutput output, String s) throws IOException {
    byte[] bs = s.getBytes(UTF_8);
    writeVInt(output, bs.length);
    output.write(bs);
  }

  /**
   * Reads a VInt written by {@link #writeVInt}. Negative ints round-trip as 5-byte encodings.
   *
   * @throws IOException if the stream ends early or more than 5 bytes carry the continuation
   *                     bit; without this check, shifts past 31 bits would silently wrap and
   *                     return garbage
   */
  public static int readVInt(DataInput input) throws IOException {
    byte b = input.readByte();
    int i = b & 0x7F;
    for (int shift = 7; (b & 0x80) != 0; shift += 7) {
      if (shift >= 7 * MAX_VINT_BYTES) {
        throw new IOException("Malformed VInt: continuation bit still set after " + MAX_VINT_BYTES + " bytes");
      }
      b = input.readByte();
      i |= (b & 0x7F) << shift;
    }
    return i;
  }

  /** Writes {@code i} as a VInt, 1 to 5 bytes, treating it as unsigned. */
  public static void writeVInt(DataOutput output, int i) throws IOException {
    while ((i & ~0x7F) != 0) {
      output.writeByte((byte) ((i & 0x7f) | 0x80));
      // Unsigned shift so negative values terminate after 5 bytes.
      i >>>= 7;
    }
    output.writeByte((byte) i);
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/ProgressableDirectory.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/ProgressableDirectory.java b/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/ProgressableDirectory.java
deleted file mode 100644
index 40c1344..0000000
--- a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/ProgressableDirectory.java
+++ /dev/null
@@ -1,273 +0,0 @@
-package com.nearinfinity.blur.mapreduce;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.hadoop.util.Progressable;
-import org.apache.lucene.store.BufferedIndexInput;
-import org.apache.lucene.store.DataInput;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.store.Lock;
-import org.apache.lucene.store.LockFactory;
-
-public class ProgressableDirectory extends Directory {
-
-  private Directory _directory;
-  private Progressable _progressable;
-
-  public ProgressableDirectory(Directory directory, Progressable progressable) {
-    _directory = directory;
-    _progressable = progressable;
-  }
-
-  public void clearLock(String name) throws IOException {
-    _directory.clearLock(name);
-  }
-
-  public void close() throws IOException {
-    _directory.close();
-  }
-
-  public void copy(Directory to, String src, String dest) throws IOException {
-    _directory.copy(to, src, dest);
-  }
-
-  private IndexInput wrapInput(String name, IndexInput openInput) {
-    return new ProgressableIndexInput(name, openInput, 16384, _progressable);
-  }
-
-  private IndexOutput wrapOutput(IndexOutput createOutput) {
-    return new ProgressableIndexOutput(createOutput, _progressable);
-  }
-
-  public IndexOutput createOutput(String name) throws IOException {
-    return wrapOutput(_directory.createOutput(name));
-  }
-
-  public void deleteFile(String name) throws IOException {
-    _directory.deleteFile(name);
-  }
-
-  public boolean equals(Object obj) {
-    return _directory.equals(obj);
-  }
-
-  public boolean fileExists(String name) throws IOException {
-    return _directory.fileExists(name);
-  }
-
-  public long fileLength(String name) throws IOException {
-    return _directory.fileLength(name);
-  }
-
-  @SuppressWarnings("deprecation")
-  public long fileModified(String name) throws IOException {
-    return _directory.fileModified(name);
-  }
-
-  public LockFactory getLockFactory() {
-    return _directory.getLockFactory();
-  }
-
-  public String getLockID() {
-    return _directory.getLockID();
-  }
-
-  public int hashCode() {
-    return _directory.hashCode();
-  }
-
-  public String[] listAll() throws IOException {
-    return _directory.listAll();
-  }
-
-  public Lock makeLock(String name) {
-    return _directory.makeLock(name);
-  }
-
-  public IndexInput openInput(String name, int bufferSize) throws IOException {
-    return wrapInput(name, _directory.openInput(name, bufferSize));
-  }
-
-  public IndexInput openInput(String name) throws IOException {
-    return wrapInput(name, _directory.openInput(name));
-  }
-
-  public void setLockFactory(LockFactory lockFactory) throws IOException {
-    _directory.setLockFactory(lockFactory);
-  }
-
-  public void sync(Collection<String> names) throws IOException {
-    _directory.sync(names);
-  }
-
-  @SuppressWarnings("deprecation")
-  public void sync(String name) throws IOException {
-    _directory.sync(name);
-  }
-
-  public String toString() {
-    return _directory.toString();
-  }
-
-  @SuppressWarnings("deprecation")
-  public void touchFile(String name) throws IOException {
-    _directory.touchFile(name);
-  }
-
-  static class ProgressableIndexOutput extends IndexOutput {
-
-    private Progressable _progressable;
-    private IndexOutput _indexOutput;
-
-    public ProgressableIndexOutput(IndexOutput indexOutput, Progressable progressable) {
-      _indexOutput = indexOutput;
-      _progressable = progressable;
-    }
-
-    public void close() throws IOException {
-      _indexOutput.close();
-      _progressable.progress();
-    }
-
-    public void copyBytes(DataInput input, long numBytes) throws IOException {
-      _indexOutput.copyBytes(input, numBytes);
-      _progressable.progress();
-    }
-
-    public void flush() throws IOException {
-      _indexOutput.flush();
-      _progressable.progress();
-    }
-
-    public long getFilePointer() {
-      return _indexOutput.getFilePointer();
-    }
-
-    public long length() throws IOException {
-      return _indexOutput.length();
-    }
-
-    public void seek(long pos) throws IOException {
-      _indexOutput.seek(pos);
-      _progressable.progress();
-    }
-
-    public void setLength(long length) throws IOException {
-      _indexOutput.setLength(length);
-      _progressable.progress();
-    }
-
-    public String toString() {
-      return _indexOutput.toString();
-    }
-
-    public void writeByte(byte b) throws IOException {
-      _indexOutput.writeByte(b);
-    }
-
-    public void writeBytes(byte[] b, int offset, int length) throws IOException {
-      _indexOutput.writeBytes(b, offset, length);
-      _progressable.progress();
-    }
-
-    public void writeBytes(byte[] b, int length) throws IOException {
-      _indexOutput.writeBytes(b, length);
-      _progressable.progress();
-    }
-
-    @SuppressWarnings("deprecation")
-    public void writeChars(char[] s, int start, int length) throws IOException {
-      _indexOutput.writeChars(s, start, length);
-      _progressable.progress();
-    }
-
-    @SuppressWarnings("deprecation")
-    public void writeChars(String s, int start, int length) throws IOException {
-      _indexOutput.writeChars(s, start, length);
-      _progressable.progress();
-    }
-
-    public void writeInt(int i) throws IOException {
-      _indexOutput.writeInt(i);
-    }
-
-    public void writeLong(long i) throws IOException {
-      _indexOutput.writeLong(i);
-    }
-
-    public void writeString(String s) throws IOException {
-      _indexOutput.writeString(s);
-    }
-
-    public void writeStringStringMap(Map<String, String> map) throws IOException {
-      _indexOutput.writeStringStringMap(map);
-    }
-
-  }
-
-  static class ProgressableIndexInput extends BufferedIndexInput {
-
-    private IndexInput _indexInput;
-    private final long _length;
-    private Progressable _progressable;
-
-    ProgressableIndexInput(String name, IndexInput indexInput, int buffer, Progressable progressable) {
-      super(name, buffer);
-      _indexInput = indexInput;
-      _length = indexInput.length();
-      _progressable = progressable;
-    }
-
-    @Override
-    protected void readInternal(byte[] b, int offset, int length) throws IOException {
-      long filePointer = getFilePointer();
-      if (filePointer != _indexInput.getFilePointer()) {
-        _indexInput.seek(filePointer);
-      }
-      _indexInput.readBytes(b, offset, length);
-      _progressable.progress();
-    }
-
-    @Override
-    protected void seekInternal(long pos) throws IOException {
-
-    }
-
-    @Override
-    public void close() throws IOException {
-      _indexInput.close();
-    }
-
-    @Override
-    public long length() {
-      return _length;
-    }
-
-    @Override
-    public Object clone() {
-      ProgressableIndexInput clone = (ProgressableIndexInput) super.clone();
-      clone._indexInput = (IndexInput) _indexInput.clone();
-      return clone;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/33df9310/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/SpinLock.java
----------------------------------------------------------------------
diff --git a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/SpinLock.java b/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/SpinLock.java
deleted file mode 100644
index 7fdc317..0000000
--- a/src/blur-mapred/src/main/java/com/nearinfinity/blur/mapreduce/SpinLock.java
+++ /dev/null
@@ -1,112 +0,0 @@
-package com.nearinfinity.blur.mapreduce;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.mapreduce.Reducer.Context;
-import org.apache.hadoop.util.Progressable;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-
-import com.nearinfinity.blur.log.Log;
-import com.nearinfinity.blur.log.LogFactory;
-
-public class SpinLock {
-
-  private static final Log LOG = LogFactory.getLog(SpinLock.class);
-  private ZooKeeper _zooKeeper;
-  private String _path;
-  private int _maxCopies;
-  private String _name;
-  private long _delay = TimeUnit.SECONDS.toMillis(30);
-  private Progressable _progressable;
-
-  public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
-    Progressable progressable = new Progressable() {
-      @Override
-      public void progress() {
-        System.out.println("go");
-      }
-    };
-    String zkConnectionStr = "localhost";
-    SpinLock lock = new SpinLock(progressable, zkConnectionStr, "test", "/test-spin-lock");
-    lock.copyLock(null);
-  }
-
-  public SpinLock(Progressable progressable, String zkConnectionStr, String name, String path) throws IOException, KeeperException, InterruptedException {
-    _path = path;
-    _name = name;
-    _progressable = progressable;
-    _zooKeeper = new ZooKeeper(zkConnectionStr, 60000, new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-
-      }
-    });
-    checkMaxCopies();
-  }
-
-  private void checkMaxCopies() throws KeeperException, InterruptedException {
-    Stat stat = _zooKeeper.exists(_path, false);
-    if (stat == null) {
-      LOG.warn("Path [{0}] not set no limit on copies.", _path);
-      _maxCopies = Integer.MAX_VALUE;
-    } else {
-      byte[] data = _zooKeeper.getData(_path, false, stat);
-      _maxCopies = Integer.parseInt(new String(data));
-    }
-  }
-
-  public void copyLock(@SuppressWarnings("rawtypes") Context context) {
-    if (_maxCopies == Integer.MAX_VALUE) {
-      return;
-    }
-    try {
-      String newpath = _zooKeeper.create(_path + "/" + _name, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
-      while (true) {
-        _progressable.progress();
-        checkMaxCopies();
-        List<String> children = new ArrayList<String>(_zooKeeper.getChildren(_path, false));
-        Collections.sort(children);
-        for (int i = 0; i < _maxCopies && i < children.size(); i++) {
-          if (newpath.equals(_path + "/" + children.get(i))) {
-            return;
-          }
-        }
-        LOG.info("Waiting for copy lock");
-        context.setStatus("Waiting for copy lock");
-        Thread.sleep(_delay);
-      }
-    } catch (KeeperException e) {
-      throw new RuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-
-  }
-
-}


Mime
View raw message