incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/2] git commit: Code cleanup and more tests.
Date Tue, 07 Jan 2014 15:01:05 GMT
Code cleanup and more tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/01b4c760
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/01b4c760
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/01b4c760

Branch: refs/heads/apache-blur-0.2
Commit: 01b4c7606775674bad2961d2a491a4ed808de9ef
Parents: f30b1be
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Jan 7 09:59:06 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Jan 7 10:00:56 2014 -0500

----------------------------------------------------------------------
 .../apache/blur/manager/writer/BlurIndex.java   |   4 -
 .../blur/manager/writer/BlurIndexReader.java    | 161 -------
 .../manager/writer/BlurIndexSimpleWriter.java   |  50 +--
 .../blur/manager/writer/BlurNRTIndex.java       | 448 -------------------
 .../blur/manager/writer/MutatableAction.java    |  22 +-
 .../manager/writer/TransactionRecorder.java     | 433 ------------------
 .../org/apache/blur/utils/RowDocumentUtil.java  |  52 ++-
 .../manager/writer/BlurIndexReaderTest.java     | 146 ------
 .../writer/BlurIndexSimpleWriterTest.java       | 124 +++--
 .../blur/manager/writer/BlurNRTIndexTest.java   | 245 ----------
 .../manager/writer/TransactionRecorderTest.java | 185 --------
 .../org/apache/blur/mapreduce/BlurReducer.java  |   8 +-
 .../blur/mapreduce/lib/BlurOutputFormat.java    |   5 +-
 .../blur/thrift/util/BlurThriftHelper.java      |  13 +
 14 files changed, 175 insertions(+), 1721 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01b4c760/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
index 8e52dd3..5b6310f 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
@@ -48,10 +48,6 @@ public abstract class BlurIndex {
     _shardContext = shardContext;
   }
 
-  public abstract void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException;
-
-  public abstract void deleteRow(boolean waitToBeVisible, boolean wal, String rowId) throws IOException;
-
   public abstract IndexSearcherClosable getIndexSearcher() throws IOException;
 
   public abstract void close() throws IOException;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01b4c760/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
deleted file mode 100644
index c852e75..0000000
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReader.java
+++ /dev/null
@@ -1,161 +0,0 @@
-package org.apache.blur.manager.writer;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.lucene.codec.Blur022Codec;
-import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
-import org.apache.blur.lucene.warmup.TraceableDirectory;
-import org.apache.blur.manager.indexserver.BlurIndexWarmup;
-import org.apache.blur.server.IndexSearcherClosable;
-import org.apache.blur.server.ShardContext;
-import org.apache.blur.server.TableContext;
-import org.apache.blur.thrift.generated.Row;
-import org.apache.lucene.analysis.core.KeywordAnalyzer;
-import org.apache.lucene.index.BlurIndexWriter;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.store.Directory;
-
-public class BlurIndexReader extends BlurIndex {
-
-  private static final Log LOG = LogFactory.getLog(BlurIndexReader.class);
-
-  private BlurIndexCloser _closer;
-  private Directory _directory;
-  private AtomicReference<DirectoryReader> _indexReaderRef = new AtomicReference<DirectoryReader>();
-  private AtomicBoolean _isClosed = new AtomicBoolean(false);
-  private AtomicBoolean _open = new AtomicBoolean();
-  private BlurIndexRefresher _refresher;
-  private final TableContext _tableContext;
-  private final ShardContext _shardContext;
-
-  public BlurIndexReader(ShardContext shardContext, Directory directory, SharedMergeScheduler mergeScheduler,
-      DirectoryReferenceFileGC gc, final ExecutorService searchExecutor, BlurIndexCloser indexCloser,
-      BlurIndexRefresher refresher, BlurIndexWarmup indexWarmup) throws IOException {
-    super(shardContext, directory, mergeScheduler, gc, searchExecutor, indexCloser, refresher, indexWarmup);
-    _tableContext = shardContext.getTableContext();
-    // This directory allows for warm up by adding tracing ability.
-    _directory = new TraceableDirectory(directory);
-    _shardContext = shardContext;
-    _refresher = refresher;
-    _closer = indexCloser;
-
-    _open.set(true);
-
-    if (!DirectoryReader.indexExists(directory)) {
-      LOG.info("Creating an empty index");
-      // if the directory is empty then create an empty index.
-      IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, new KeywordAnalyzer());
-      conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
-      conf.setCodec(new Blur022Codec());
-      new BlurIndexWriter(directory, conf).close();
-    }
-    _indexReaderRef.set(DirectoryReader.open(directory));
-    _refresher.register(this);
-  }
-
-  @Override
-  public void refresh() throws IOException {
-    if (!_open.get()) {
-      return;
-    }
-    DirectoryReader oldReader = _indexReaderRef.get();
-    DirectoryReader reader = DirectoryReader.openIfChanged(oldReader);
-    if (reader != null) {
-      _indexReaderRef.set(reader);
-      _closer.close(oldReader);
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    _open.set(false);
-    _refresher.unregister(this);
-    _directory.close();
-    _isClosed.set(true);
-    LOG.info("Reader for table [{0}] shard [{1}] closed.", _tableContext.getTable(), _shardContext.getShard());
-  }
-
-  @Override
-  public void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException {
-    throw new RuntimeException("Read-only shard");
-  }
-
-  @Override
-  public void deleteRow(boolean waitToBeVisible, boolean wal, String rowId) throws IOException {
-    throw new RuntimeException("Read-only shard");
-  }
-
-  @Override
-  public void optimize(int numberOfSegmentsPerShard) throws IOException {
-    throw new RuntimeException("Read-only shard");
-  }
-
-  @Override
-  public IndexSearcherClosable getIndexSearcher() throws IOException {
-    final DirectoryReader reader = _indexReaderRef.get();
-    reader.incRef();
-    return new IndexSearcherClosable(reader, null) {
-
-      @Override
-      public Directory getDirectory() {
-        return _directory;
-      }
-
-      @Override
-      public void close() throws IOException {
-        reader.decRef();
-      }
-    };
-  }
-
-  @Override
-  public AtomicBoolean isClosed() {
-    return _isClosed;
-  }
-
-  @Override
-  public void createSnapshot(String name) throws IOException {
-    throw new RuntimeException("Read-only shard");
-  }
-
-  @Override
-  public void removeSnapshot(String name) throws IOException {
-    throw new RuntimeException("Read-only shard");
-  }
-
-  @Override
-  public List<String> getSnapshots() throws IOException {
-    throw new RuntimeException("Read-only shard");
-  }
-
-  @Override
-  public void process(MutatableAction mutatableAction) {
-    throw new RuntimeException("Read-only shard");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01b4c760/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
index 7fccd46..6548522 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
@@ -40,12 +40,10 @@ import org.apache.blur.manager.indexserver.BlurIndexWarmup;
 import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
-import org.apache.blur.thrift.generated.Row;
 import org.apache.blur.trace.Trace;
 import org.apache.blur.trace.Tracer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.document.Field;
 import org.apache.lucene.index.BlurIndexWriter;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
@@ -62,7 +60,6 @@ public class BlurIndexSimpleWriter extends BlurIndex {
   private final AtomicReference<DirectoryReader> _indexReader = new AtomicReference<DirectoryReader>();
   private final ExecutorService _searchThreadPool;
   private final Directory _directory;
-  private final Thread _writerOpener;
   private final IndexWriterConfig _conf;
   private final TableContext _tableContext;
   private final FieldManager _fieldManager;
@@ -73,6 +70,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
   private final ReadWriteLock _lock = new ReentrantReadWriteLock();
   private final Lock _writeLock = _lock.writeLock();
   private Thread _optimizeThread;
+  private Thread _writerOpener;
 
   public BlurIndexSimpleWriter(ShardContext shardContext, Directory directory, SharedMergeScheduler mergeScheduler,
       DirectoryReferenceFileGC gc, final ExecutorService searchExecutor, BlurIndexCloser indexCloser,
@@ -103,7 +101,21 @@ public class BlurIndexSimpleWriter extends BlurIndex {
     _indexCloser = indexCloser;
     _indexReader.set(wrap(DirectoryReader.open(_directory)));
 
-    _writerOpener = getWriterOpener(shardContext);
+    openWriter();
+  }
+
+  private synchronized void openWriter() {
+    BlurIndexWriter writer = _writer.get();
+    if (writer != null) {
+      try {
+        writer.close(false);
+      } catch (IOException e) {
+        LOG.error("Unknown error while trying to close the writer, [" + _shardContext.getTableContext().getTable()
+            + "] Shard [" + _shardContext.getShard() + "]", e);
+      }
+      _writer.set(null);
+    }
+    _writerOpener = getWriterOpener(_shardContext);
     _writerOpener.start();
   }
 
@@ -154,35 +166,6 @@ public class BlurIndexSimpleWriter extends BlurIndex {
     };
   }
 
-  @Override
-  public void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException {
-    _writeLock.lock();
-    Tracer trace = Trace.trace("replaceRow");
-    try {
-      waitUntilNotNull(_writer);
-      BlurIndexWriter writer = _writer.get();
-      List<List<Field>> docs = TransactionRecorder.getDocs(row, _fieldManager);
-      writer.updateDocuments(TransactionRecorder.createRowId(row.getId()), docs);
-      commit();
-    } finally {
-      trace.done();
-      _writeLock.unlock();
-    }
-  }
-
-  @Override
-  public void deleteRow(boolean waitToBeVisible, boolean wal, String rowId) throws IOException {
-    _writeLock.lock();
-    try {
-      waitUntilNotNull(_writer);
-      BlurIndexWriter writer = _writer.get();
-      writer.deleteDocuments(TransactionRecorder.createRowId(rowId));
-      commit();
-    } finally {
-      _writeLock.unlock();
-    }
-  }
-
   private void waitUntilNotNull(AtomicReference<?> ref) {
     while (true) {
       Object object = ref.get();
@@ -289,6 +272,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
       commit();
     } catch (Exception e) {
       writer.rollback();
+      openWriter();
       throw new IOException("Unknown error during mutation", e);
     } finally {
       _writeLock.unlock();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01b4c760/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
deleted file mode 100644
index 45215cf..0000000
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
+++ /dev/null
@@ -1,448 +0,0 @@
-package org.apache.blur.manager.writer;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.blur.analysis.FieldManager;
-import org.apache.blur.index.ExitableReader;
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.lucene.codec.Blur022Codec;
-import org.apache.blur.lucene.store.refcounter.DirectoryReferenceCounter;
-import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
-import org.apache.blur.lucene.warmup.TraceableDirectory;
-import org.apache.blur.manager.indexserver.BlurIndexWarmup;
-import org.apache.blur.server.IndexSearcherClosable;
-import org.apache.blur.server.IndexSearcherClosableNRT;
-import org.apache.blur.server.ShardContext;
-import org.apache.blur.server.TableContext;
-import org.apache.blur.thrift.generated.Record;
-import org.apache.blur.thrift.generated.Row;
-import org.apache.blur.utils.BlurUtil;
-import org.apache.blur.utils.SimpleTimer;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.index.BlurIndexWriter;
-import org.apache.lucene.index.BlurIndexWriter.LockOwnerException;
-import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.IndexDeletionPolicy;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.SnapshotDeletionPolicy;
-import org.apache.lucene.index.TieredMergePolicy;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.NRTManager;
-import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
-import org.apache.lucene.search.NRTManagerReopenThread;
-import org.apache.lucene.search.SearcherFactory;
-import org.apache.lucene.store.Directory;
-
-public class BlurNRTIndex extends BlurIndex {
-
-  private static final Log LOG = LogFactory.getLog(BlurNRTIndex.class);
-  private static final boolean APPLY_ALL_DELETES = true;
-  private static final String SNAPSHOTS_FOLDER_NAME = "snapshots";
-  private static final String SNAPSHOTS_TMPFILE_EXTENSION = ".tmp";
-
-  private final AtomicReference<NRTManager> _nrtManagerRef = new AtomicReference<NRTManager>();
-  private final AtomicBoolean _isClosed = new AtomicBoolean();
-  private final BlurIndexWriter _writer;
-  private final Thread _committer;
-  private final SearcherFactory _searcherFactory;
-  private final Directory _directory;
-  private final NRTManagerReopenThread _refresher;
-  private final TableContext _tableContext;
-  private final ShardContext _shardContext;
-  private final TransactionRecorder _recorder;
-  private final TrackingIndexWriter _trackingWriter;
-  private final IndexImporter _indexImporter;
-  // This lock is used during a import of data from the file system. For example
-  // after a mapreduce program.
-  private final ReadWriteLock _lock = new ReentrantReadWriteLock();
-  private long _lastRefresh = 0;
-
-  public BlurNRTIndex(ShardContext shardContext, Directory directory, SharedMergeScheduler mergeScheduler,
-      DirectoryReferenceFileGC gc, final ExecutorService searchExecutor, BlurIndexCloser indexCloser,
-      BlurIndexRefresher refresher, BlurIndexWarmup indexWarmup) throws IOException {
-    super(shardContext, directory, mergeScheduler, gc, searchExecutor, indexCloser, refresher, indexWarmup);
-    _tableContext = shardContext.getTableContext();
-    _directory = directory;
-    _shardContext = shardContext;
-
-    FieldManager fieldManager = _tableContext.getFieldManager();
-    Analyzer analyzer = fieldManager.getAnalyzerForIndex();
-    IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, analyzer);
-    conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
-    conf.setCodec(new Blur022Codec(_tableContext.getBlurConfiguration()));
-    conf.setSimilarity(_tableContext.getSimilarity());
-    conf.setMergedSegmentWarmer(new BlurIndexReaderWarmer(shardContext, _isClosed, indexWarmup));
-
-    SnapshotDeletionPolicy sdp;
-    if (snapshotsDirectoryExists()) {
-      // load existing snapshots
-      sdp = new SnapshotDeletionPolicy(_tableContext.getIndexDeletionPolicy(), loadExistingSnapshots());
-    } else {
-      sdp = new SnapshotDeletionPolicy(_tableContext.getIndexDeletionPolicy());
-    }
-    conf.setIndexDeletionPolicy(sdp);
-
-    TieredMergePolicy mergePolicy = (TieredMergePolicy) conf.getMergePolicy();
-    mergePolicy.setUseCompoundFile(false);
-    conf.setMergeScheduler(mergeScheduler.getMergeScheduler());
-
-    DirectoryReferenceCounter referenceCounter = new DirectoryReferenceCounter(directory, gc);
-    // This directory allows for warm up by adding tracing ability.
-    TraceableDirectory dir = new TraceableDirectory(referenceCounter);
-
-    SimpleTimer simpleTimer = new SimpleTimer();
-    simpleTimer.start("writerOpen");
-    _writer = new BlurIndexWriter(dir, conf, true);
-    simpleTimer.stop("writerOpen");
-    simpleTimer.start("nrtSetup");
-    _recorder = new TransactionRecorder(shardContext);
-    _recorder.replay(_writer);
-
-    _searcherFactory = new SearcherFactory() {
-      @Override
-      public IndexSearcher newSearcher(IndexReader reader) throws IOException {
-        // return new IndexSearcherClosableNRT(reader, searchExecutor,
-        // _nrtManagerRef, _directory);
-        return new IndexSearcherClosableNRT(reader, null, _nrtManagerRef, _directory);
-      }
-    };
-
-    _trackingWriter = new TrackingIndexWriter(_writer);
-    _indexImporter = new IndexImporter(_trackingWriter.getIndexWriter(), _lock, _shardContext, TimeUnit.SECONDS, 10);
-    _nrtManagerRef.set(new NRTManager(_trackingWriter, _searcherFactory, APPLY_ALL_DELETES));
-    // start commiter
-
-    _committer = new Thread(new Committer());
-    _committer.setDaemon(true);
-    _committer.setName("Commit Thread [" + _tableContext.getTable() + "/" + shardContext.getShard() + "]");
-    _committer.start();
-
-    // start refresher
-    double targetMinStaleSec = _tableContext.getTimeBetweenRefreshs() / 1000.0;
-    _refresher = new NRTManagerReopenThread(getNRTManager(), targetMinStaleSec * 10, targetMinStaleSec);
-    _refresher.setName("Refresh Thread [" + _tableContext.getTable() + "/" + shardContext.getShard() + "]");
-    _refresher.setDaemon(true);
-    _refresher.start();
-    simpleTimer.stop("nrtSetup");
-    simpleTimer.log(LOG);
-  }
-
-  /**
-   * The snapshots directory contains a file per snapshot. Name of the file is
-   * the snapshot name and it stores the segments filename
-   * 
-   * @return Map<String, String>
-   * @throws IOException
-   */
-  private Map<String, String> loadExistingSnapshots() throws IOException {
-    Map<String, String> snapshots = new HashMap<String, String>();
-
-    FileSystem fileSystem = getFileSystem();
-    FileStatus[] status = fileSystem.listStatus(getSnapshotsDirectoryPath());
-
-    for (int i = 0; i < status.length; i++) {
-      FileStatus fileStatus = status[i];
-      String snapshotName = fileStatus.getPath().getName();
-      // cleanup all tmp files
-      if (snapshotName.endsWith(SNAPSHOTS_TMPFILE_EXTENSION)) {
-        fileSystem.delete(fileStatus.getPath(), true);
-        continue;
-      }
-      BufferedReader br = new BufferedReader(new InputStreamReader(fileSystem.open(fileStatus.getPath())));
-      String segmentsFilename = br.readLine();
-      if (segmentsFilename != null) {
-        snapshots.put(snapshotName, segmentsFilename);
-      }
-    }
-    return snapshots;
-  }
-
-  private boolean snapshotsDirectoryExists() throws IOException {
-    Path shardHdfsDirPath = _shardContext.getHdfsDirPath();
-    FileSystem fileSystem = getFileSystem();
-    Path shardSnapshotsDirPath = new Path(shardHdfsDirPath, SNAPSHOTS_FOLDER_NAME);
-    if (fileSystem.exists(shardSnapshotsDirPath)) {
-      return true;
-    }
-    return false;
-  }
-
-  @Override
-  public void replaceRow(boolean waitToBeVisible, boolean wal, Row row) throws IOException {
-    _lock.readLock().lock();
-    try {
-      List<Record> records = row.records;
-      if (records == null || records.isEmpty()) {
-        deleteRow(waitToBeVisible, wal, row.id);
-        return;
-      }
-      long generation = _recorder.replaceRow(wal, row, _trackingWriter);
-      waitToBeVisible(waitToBeVisible, generation);
-    } finally {
-      _lock.readLock().unlock();
-    }
-  }
-
-  @Override
-  public void deleteRow(boolean waitToBeVisible, boolean wal, String rowId) throws IOException {
-    _lock.readLock().lock();
-    try {
-      long generation = _recorder.deleteRow(wal, rowId, _trackingWriter);
-      waitToBeVisible(waitToBeVisible, generation);
-    } finally {
-      _lock.readLock().unlock();
-    }
-  }
-
-  /**
-   * The method fetches a reference to the IndexSearcher, the caller is
-   * responsible for calling close on the searcher.
-   */
-  @Override
-  public IndexSearcherClosable getIndexSearcher() throws IOException {
-    return resetRunning((IndexSearcherClosable) getNRTManager().acquire());
-  }
-
-  private IndexSearcherClosable resetRunning(IndexSearcherClosable indexSearcherClosable) {
-    IndexReader indexReader = indexSearcherClosable.getIndexReader();
-    if (indexReader instanceof ExitableReader) {
-      ExitableReader er = (ExitableReader) indexReader;
-      er.getRunning().set(true);
-    }
-    indexSearcherClosable.setSimilarity(_tableContext.getSimilarity());
-    return indexSearcherClosable;
-  }
-
-  private NRTManager getNRTManager() {
-    return _nrtManagerRef.get();
-  }
-
-  @Override
-  public void close() throws IOException {
-    // @TODO make sure that locks are cleaned up.
-    if (!_isClosed.get()) {
-      _isClosed.set(true);
-      _indexImporter.close();
-      _committer.interrupt();
-      _refresher.close();
-      try {
-        _recorder.close();
-        _writer.close(false);
-        getNRTManager().close();
-      } finally {
-        _directory.close();
-      }
-    }
-  }
-
-  @Override
-  public void refresh() throws IOException {
-    getNRTManager().maybeRefresh();
-    _lastRefresh = System.currentTimeMillis();
-  }
-
-  @Override
-  public AtomicBoolean isClosed() {
-    return _isClosed;
-  }
-
-  @Override
-  public void optimize(int numberOfSegmentsPerShard) throws IOException {
-    _writer.forceMerge(numberOfSegmentsPerShard);
-  }
-
-  private void waitToBeVisible(boolean waitToBeVisible, long generation) throws IOException {
-    if (needsRefresh()) {
-      refresh();
-    }
-    if (waitToBeVisible && getNRTManager().getCurrentSearchingGen() < generation) {
-      getNRTManager().waitForGeneration(generation);
-    }
-  }
-
-  private boolean needsRefresh() {
-    if (_lastRefresh + _tableContext.getTimeBetweenRefreshs() < System.currentTimeMillis()) {
-      return true;
-    }
-    return false;
-  }
-
-  class Committer implements Runnable {
-    @Override
-    public void run() {
-      synchronized (this) {
-        while (!_isClosed.get()) {
-          try {
-            LOG.debug("Committing of [{0}/{1}].", _tableContext.getTable(), _shardContext.getShard());
-            _recorder.commit(_writer);
-          } catch (CorruptIndexException e) {
-            LOG.error("Curruption Error during commit of [{0}/{1}].", e, _tableContext.getTable(),
-                _shardContext.getShard());
-          } catch (LockOwnerException e) {
-            LOG.info("This shard server no longer owns the lock on [{0}/{1}], closing.", _tableContext.getTable(),
-                _shardContext.getShard());
-            try {
-              close();
-            } catch (IOException ex) {
-              LOG.error("Unknown error while trying to close [{0}/{1}]", _tableContext.getTable(),
-                  _shardContext.getShard());
-            }
-            return;
-          } catch (IOException e) {
-            LOG.error("IO Error during commit of [{0}/{1}].", e, _tableContext.getTable(), _shardContext.getShard());
-          }
-          try {
-            wait(_tableContext.getTimeBetweenCommits());
-          } catch (InterruptedException e) {
-            if (_isClosed.get()) {
-              return;
-            }
-            LOG.error("Unknown error with committer thread [{0}/{1}].", e, _tableContext.getTable(),
-                _shardContext.getShard());
-          }
-        }
-      }
-    }
-  }
-
-  @Override
-  public void createSnapshot(String name) throws IOException {
-    SnapshotDeletionPolicy snapshotter = getSnapshotter();
-    Map<String, String> existingSnapshots = snapshotter.getSnapshots();
-    if (existingSnapshots.containsKey(name)) {
-      LOG.error("A Snapshot already exists with the same name [{0}] on [{1}/{2}].", name, _tableContext.getTable(),
-          _shardContext.getShard());
-      throw new IOException("A Snapshot already exists with the same name [" + name + "] on " + "["
-          + _tableContext.getTable() + "/" + _shardContext.getShard() + "].");
-    }
-    _writer.commit();
-    IndexCommit indexCommit = snapshotter.snapshot(name);
-
-    /*
-     * Persist the snapshots info into a tmp file under the snapshots sub-folder
-     * and once writing is finished, close the writer. Now rename the tmp file
-     * to an actual snapshots file. This make the file write an atomic operation
-     * 
-     * The name of the file is the snapshot name and its contents specify the
-     * segments file name
-     */
-    String segmentsFilename = indexCommit.getSegmentsFileName();
-    FileSystem fileSystem = getFileSystem();
-    Path shardSnapshotsDirPath = getSnapshotsDirectoryPath();
-    BlurUtil.createPath(fileSystem, shardSnapshotsDirPath);
-    Path newTmpSnapshotFile = new Path(shardSnapshotsDirPath, name + SNAPSHOTS_TMPFILE_EXTENSION);
-    BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fileSystem.create(newTmpSnapshotFile, true)));
-    br.write(segmentsFilename);
-    br.close();
-
-    // now rename the tmp file
-    Path newSnapshotFile = new Path(shardSnapshotsDirPath, name);
-    fileSystem.rename(newTmpSnapshotFile, newSnapshotFile);
-
-    LOG.info("Snapshot [{0}] created successfully on [{1}/{2}].", name, _tableContext.getTable(),
-        _shardContext.getShard());
-  }
-
-  @Override
-  public void removeSnapshot(String name) throws IOException {
-    SnapshotDeletionPolicy snapshotter = getSnapshotter();
-    Map<String, String> existingSnapshots = snapshotter.getSnapshots();
-    if (existingSnapshots.containsKey(name)) {
-      snapshotter.release(name);
-
-      // now delete the snapshot file stored in the snapshots directory under
-      // the shard
-      Path snapshotFilePath = new Path(getSnapshotsDirectoryPath(), name);
-      getFileSystem().delete(snapshotFilePath, true);
-
-      LOG.info("Snapshot [{0}] removed successfully from [{1}/{2}].", name, _tableContext.getTable(),
-          _shardContext.getShard());
-    } else {
-      LOG.error("No Snapshot exists with the name [{0}] on  [{1}/{2}].", name, _tableContext.getTable(),
-          _shardContext.getShard());
-      throw new IOException("No Snapshot exists with the name [" + name + "] on " + "[" + _tableContext.getTable()
-          + "/" + _shardContext.getShard() + "].");
-    }
-  }
-
-  @Override
-  public List<String> getSnapshots() throws IOException {
-    SnapshotDeletionPolicy snapshotter = getSnapshotter();
-    Map<String, String> existingSnapshots = snapshotter.getSnapshots();
-    return new ArrayList<String>(existingSnapshots.keySet());
-  }
-
-  /**
-   * Fetches the snapshotter from the LiveIndexWriterConfig of IndexWriter
-   * 
-   * @return SnapshotDeletionPolicy
-   * @throws IOException
-   */
-  private SnapshotDeletionPolicy getSnapshotter() throws IOException {
-    IndexDeletionPolicy idp = _writer.getConfig().getIndexDeletionPolicy();
-    if (idp instanceof SnapshotDeletionPolicy) {
-      SnapshotDeletionPolicy snapshotter = (SnapshotDeletionPolicy) idp;
-      return snapshotter;
-    } else {
-      LOG.error("The index deletion policy for [{0}/{1}] does not support snapshots.", _tableContext.getTable(),
-          _shardContext.getShard());
-      throw new IOException("The index deletion policy for [" + _tableContext.getTable() + "/"
-          + _shardContext.getShard() + "]" + " does not support snapshots.");
-    }
-  }
-
-  public Path getSnapshotsDirectoryPath() throws IOException {
-    Path shardHdfsDirPath = _shardContext.getHdfsDirPath();
-    return new Path(shardHdfsDirPath, SNAPSHOTS_FOLDER_NAME);
-  }
-
-  private FileSystem getFileSystem() throws IOException {
-    Path shardHdfsDirPath = _shardContext.getHdfsDirPath();
-    Configuration configuration = _shardContext.getTableContext().getConfiguration();
-    return shardHdfsDirPath.getFileSystem(configuration);
-  }
-
-  @Override
-  public void process(MutatableAction mutatableAction) {
-    throw new RuntimeException("Not supported.");
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01b4c760/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java b/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
index 3bc8c29..b8dc858 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/MutatableAction.java
@@ -85,7 +85,7 @@ public class MutatableAction {
     _actions.add(new InternalAction() {
       @Override
       void performAction(IndexReader reader, IndexWriter writer) throws IOException {
-        List<List<Field>> docs = TransactionRecorder.getDocs(row, _fieldManager);
+        List<List<Field>> docs = RowDocumentUtil.getDocs(row, _fieldManager);
         Term rowId = createRowId(row.getId());
         writer.updateDocuments(rowId, docs);
       }
@@ -128,7 +128,7 @@ public class MutatableAction {
               writer.deleteDocuments(rowIdTerm);
             } else {
               Row row = new Row(rowId, toRecords(docs), docs.size());
-              List<List<Field>> docsToUpdate = TransactionRecorder.getDocs(row, _fieldManager);
+              List<List<Field>> docsToUpdate = RowDocumentUtil.getDocs(row, _fieldManager);
               writer.updateDocuments(rowIdTerm, docsToUpdate);
             }
           }
@@ -152,7 +152,7 @@ public class MutatableAction {
         TopDocs topDocs = searcher.search(query, 1);
         if (topDocs.totalHits == 0) {
           // just add
-          List<Field> doc = TransactionRecorder.getDoc(_fieldManager, rowId, record);
+          List<Field> doc = RowDocumentUtil.getDoc(_fieldManager, rowId, record);
           doc.add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
           writer.addDocument(doc);
         } else if (topDocs.totalHits == 1) {
@@ -163,7 +163,7 @@ public class MutatableAction {
           ResetableDocumentStoredFieldVisitor fieldVisitor = IndexManager.getFieldSelector(selector);
           List<Document> docs = new ArrayList<Document>(BlurUtil.fetchDocuments(reader, fieldVisitor, selector,
               _maxHeap, _table + "/" + _shard, _primeDocTerm, null));
-          List<Field> doc = TransactionRecorder.getDoc(_fieldManager, rowId, record);
+          List<Field> doc = RowDocumentUtil.getDoc(_fieldManager, rowId, record);
 
           for (int i = 0; i < docs.size(); i++) {
             Document document = docs.get(i);
@@ -174,7 +174,7 @@ public class MutatableAction {
           }
           docs.add(toDocument(doc));
           Row row = new Row(rowId, toRecords(docs), docs.size());
-          List<List<Field>> docsToUpdate = TransactionRecorder.getDocs(row, _fieldManager);
+          List<List<Field>> docsToUpdate = RowDocumentUtil.getDocs(row, _fieldManager);
           writer.updateDocuments(rowIdTerm, docsToUpdate);
         } else {
           throw new IOException("RowId [" + rowId + "] found more than one row primedoc.");
@@ -206,7 +206,7 @@ public class MutatableAction {
         TopDocs topDocs = searcher.search(query, 1);
         if (topDocs.totalHits == 0) {
           // just add
-          List<Field> doc = TransactionRecorder.getDoc(_fieldManager, rowId, record);
+          List<Field> doc = RowDocumentUtil.getDoc(_fieldManager, rowId, record);
           doc.add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
           writer.addDocument(doc);
         } else if (topDocs.totalHits == 1) {
@@ -245,11 +245,11 @@ public class MutatableAction {
             existingRecord.addToColumns(column);
           }
 
-          List<Field> doc = TransactionRecorder.getDoc(_fieldManager, rowId, existingRecord);
+          List<Field> doc = RowDocumentUtil.getDoc(_fieldManager, rowId, existingRecord);
           docs.add(toDocument(doc));
 
           Row row = new Row(rowId, toRecords(docs), docs.size());
-          List<List<Field>> docsToUpdate = TransactionRecorder.getDocs(row, _fieldManager);
+          List<List<Field>> docsToUpdate = RowDocumentUtil.getDocs(row, _fieldManager);
 
           writer.updateDocuments(rowIdTerm, docsToUpdate);
         } else {
@@ -272,7 +272,7 @@ public class MutatableAction {
         TopDocs topDocs = searcher.search(query, 1);
         if (topDocs.totalHits == 0) {
           // just add
-          List<Field> doc = TransactionRecorder.getDoc(_fieldManager, rowId, record);
+          List<Field> doc = RowDocumentUtil.getDoc(_fieldManager, rowId, record);
           doc.add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
           writer.addDocument(doc);
         } else if (topDocs.totalHits == 1) {
@@ -337,11 +337,11 @@ public class MutatableAction {
             processedColumns.add(name);
           }
 
-          List<Field> doc = TransactionRecorder.getDoc(_fieldManager, rowId, newRecord);
+          List<Field> doc = RowDocumentUtil.getDoc(_fieldManager, rowId, newRecord);
           docs.add(toDocument(doc));
 
           Row row = new Row(rowId, toRecords(docs), docs.size());
-          List<List<Field>> docsToUpdate = TransactionRecorder.getDocs(row, _fieldManager);
+          List<List<Field>> docsToUpdate = RowDocumentUtil.getDocs(row, _fieldManager);
           writer.updateDocuments(rowIdTerm, docsToUpdate);
         } else {
           throw new IOException("RowId [" + rowId + "] found more than one row primedoc.");

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01b4c760/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java b/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
deleted file mode 100644
index c799376..0000000
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/TransactionRecorder.java
+++ /dev/null
@@ -1,433 +0,0 @@
-package org.apache.blur.manager.writer;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import static org.apache.blur.utils.BlurConstants.SEP;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.blur.analysis.FieldManager;
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.server.ShardContext;
-import org.apache.blur.server.TableContext;
-import org.apache.blur.thrift.generated.Column;
-import org.apache.blur.thrift.generated.Record;
-import org.apache.blur.thrift.generated.Row;
-import org.apache.blur.utils.BlurConstants;
-import org.apache.blur.utils.BlurUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.record.Utils;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.Field.Store;
-import org.apache.lucene.document.FieldType;
-import org.apache.lucene.document.StringField;
-import org.apache.lucene.document.TextField;
-import org.apache.lucene.index.BlurIndexWriter;
-import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
-
-public class TransactionRecorder extends TimerTask implements Closeable {
-
-  enum TYPE {
-    DELETE((byte) 0), ROW((byte) 1);
-    private byte b;
-
-    private TYPE(byte b) {
-      this.b = b;
-    }
-
-    public byte value() {
-      return b;
-    }
-
-    public static TYPE lookup(byte b) {
-      switch (b) {
-      case 0:
-        return DELETE;
-      case 1:
-        return ROW;
-      default:
-        throw new RuntimeException("Type not found [" + b + "]");
-      }
-    }
-  }
-
-  private static final Log LOG = LogFactory.getLog(TransactionRecorder.class);
-  private static final FieldType SUPER_FIELD_TYPE;
-  static {
-    SUPER_FIELD_TYPE = new FieldType(TextField.TYPE_NOT_STORED);
-    SUPER_FIELD_TYPE.setOmitNorms(true);
-  }
-  public static FieldType ID_TYPE;
-  static {
-    ID_TYPE = new FieldType();
-    ID_TYPE.setIndexed(true);
-    ID_TYPE.setTokenized(false);
-    ID_TYPE.setOmitNorms(true);
-    ID_TYPE.setStored(true);
-    ID_TYPE.freeze();
-  }
-
-  private final AtomicBoolean _running = new AtomicBoolean(true);
-  private final AtomicReference<FSDataOutputStream> _outputStream = new AtomicReference<FSDataOutputStream>();
-  private final long _timeBetweenSyncsNanos;
-  private final AtomicLong _lastSync = new AtomicLong();
-
-  private final Path _walPath;
-  private final Configuration _configuration;
-  private final FileSystem _fileSystem;
-  private final Timer _timer;
-  private final String _table;
-  private final String _shard;
-  private final FieldManager _fieldManager;
-
-  public TransactionRecorder(ShardContext shardContext) throws IOException {
-    TableContext tableContext = shardContext.getTableContext();
-    _configuration = tableContext.getConfiguration();
-    _fieldManager = tableContext.getFieldManager();
-    _walPath = shardContext.getWalShardPath();
-    _fileSystem = _walPath.getFileSystem(_configuration);
-    _timeBetweenSyncsNanos = tableContext.getTimeBetweenWALSyncsNanos();
-    _timer = new Timer("wal-sync-[" + tableContext.getTable() + "/" + shardContext.getShard() + "]", true);
-    _timer.schedule(this, TimeUnit.NANOSECONDS.toMillis(_timeBetweenSyncsNanos),
-        TimeUnit.NANOSECONDS.toMillis(_timeBetweenSyncsNanos));
-    _table = tableContext.getTable();
-    _shard = shardContext.getShard();
-  }
-
-  public void open() throws IOException {
-    if (_fileSystem.exists(_walPath)) {
-      throw new IOException("WAL path [" + _walPath + "] still exists, replay must have not worked.");
-    } else {
-      _outputStream.set(_fileSystem.create(_walPath));
-    }
-    if (_outputStream == null) {
-      throw new RuntimeException();
-    }
-    _lastSync.set(System.nanoTime());
-  }
-
-  public void replay(BlurIndexWriter writer) throws IOException {
-    if (_fileSystem.exists(_walPath)) {
-      FSDataInputStream inputStream = _fileSystem.open(_walPath);
-      replay(writer, inputStream);
-      inputStream.close();
-      commit(writer);
-    } else {
-      open();
-    }
-  }
-
-  private void replay(BlurIndexWriter writer, DataInputStream inputStream) throws CorruptIndexException, IOException {
-    long updateCount = 0;
-    long deleteCount = 0;
-    byte[] buffer;
-    while ((buffer = readBuffer(inputStream)) != null) {
-      DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(buffer));
-      TYPE lookup = TYPE.lookup(dataInputStream.readByte());
-      switch (lookup) {
-      case ROW:
-        Row row = readRow(dataInputStream);
-        writer.updateDocuments(createRowId(row.id), getDocs(row, _fieldManager));
-        updateCount++;
-        continue;
-      case DELETE:
-        String deleteRowId = readString(dataInputStream);
-        writer.deleteDocuments(createRowId(deleteRowId));
-        deleteCount++;
-        continue;
-      default:
-        LOG.error("Unknown type [{0}]", lookup);
-        throw new IOException("Unknown type [" + lookup + "]");
-      }
-    }
-    LOG.info("Rows reclaimed from the WAL [{0}]", updateCount);
-    LOG.info("Deletes reclaimed from the WAL [{0}]", deleteCount);
-  }
-
-  private byte[] readBuffer(DataInputStream inputStream) {
-    try {
-      int length = inputStream.readInt();
-      byte[] buffer = new byte[length];
-      inputStream.readFully(buffer);
-      return buffer;
-    } catch (ChecksumException e) {
-      LOG.warn("End of WAL file found.");
-      if (LOG.isDebugEnabled()) {
-        LOG.error("End of WAL file found.", e);
-      }
-      return null;
-    } catch (IOException e) {
-      if (e instanceof EOFException) {
-        LOG.warn("End of WAL file found.");
-        if (LOG.isDebugEnabled()) {
-          LOG.error("End of WAL file found.", e);
-        }
-        return null;
-      }
-    }
-    return null;
-  }
-
-  private void rollLog() throws IOException {
-    LOG.debug("Rolling WAL path [" + _walPath + "]");
-    FSDataOutputStream os = _outputStream.get();
-    if (os != null) {
-      os.close();
-    }
-    _fileSystem.delete(_walPath, false);
-    open();
-  }
-
-  public void close() throws IOException {
-    synchronized (_running) {
-      _running.set(false);
-    }
-    _timer.purge();
-    _timer.cancel();
-    _outputStream.get().close();
-  }
-
-  private static void writeRow(DataOutputStream outputStream, Row row) throws IOException {
-    writeString(outputStream, row.id);
-    List<Record> records = row.records;
-    int size = records.size();
-    outputStream.writeInt(size);
-    for (int i = 0; i < size; i++) {
-      Record record = records.get(i);
-      writeRecord(outputStream, record);
-    }
-  }
-
-  private static Row readRow(DataInputStream inputStream) throws IOException {
-    Row row = new Row();
-    row.id = readString(inputStream);
-    int size = inputStream.readInt();
-    for (int i = 0; i < size; i++) {
-      row.addToRecords(readRecord(inputStream));
-    }
-    return row;
-  }
-
-  private static void writeRecord(DataOutputStream outputStream, Record record) throws IOException {
-    writeString(outputStream, record.recordId);
-    writeString(outputStream, record.family);
-    List<Column> columns = record.columns;
-    int size = columns.size();
-    outputStream.writeInt(size);
-    for (int i = 0; i < size; i++) {
-      writeColumn(outputStream, columns.get(i));
-    }
-  }
-
-  private static Record readRecord(DataInputStream inputStream) throws IOException {
-    Record record = new Record();
-    record.recordId = readString(inputStream);
-    record.family = readString(inputStream);
-    int size = inputStream.readInt();
-    for (int i = 0; i < size; i++) {
-      record.addToColumns(readColumn(inputStream));
-    }
-    return record;
-  }
-
-  private static void writeColumn(DataOutputStream outputStream, Column column) throws IOException {
-    writeString(outputStream, column.name);
-    writeString(outputStream, column.value);
-  }
-
-  private static Column readColumn(DataInputStream inputStream) throws IOException {
-    Column column = new Column();
-    column.name = readString(inputStream);
-    column.value = readString(inputStream);
-    return column;
-  }
-
-  private static void writeDelete(DataOutputStream outputStream, String deleteRowId) throws IOException {
-    writeString(outputStream, deleteRowId);
-  }
-
-  private static void writeString(DataOutputStream outputStream, String s) throws IOException {
-    if (s == null) {
-      Utils.writeVInt(outputStream, -1);
-      return;
-    }
-    byte[] bs = s.getBytes();
-    Utils.writeVInt(outputStream, bs.length);
-    outputStream.write(bs);
-  }
-
-  private static String readString(DataInputStream inputStream) throws IOException {
-    int length = Utils.readVInt(inputStream);
-    if (length == -1) {
-      return null;
-    }
-    byte[] buffer = new byte[length];
-    inputStream.readFully(buffer);
-    return new String(buffer);
-  }
-
-  private void sync(byte[] bs) throws IOException {
-    if (bs == null || _outputStream == null) {
-      throw new RuntimeException("bs [" + bs + "] outputStream [" + _outputStream + "]");
-    }
-    synchronized (_running) {
-      FSDataOutputStream os = _outputStream.get();
-      os.writeInt(bs.length);
-      os.write(bs);
-      tryToSync(os);
-    }
-  }
-
-  private void tryToSync() throws IOException {
-    synchronized (_running) {
-      tryToSync(_outputStream.get());
-    }
-  }
-
-  private void tryToSync(FSDataOutputStream os) throws IOException {
-    if (os == null) {
-      return;
-    }
-    long now = System.nanoTime();
-    if (_lastSync.get() + _timeBetweenSyncsNanos < now) {
-      os.sync();
-      _lastSync.set(now);
-    }
-  }
-
-  public long replaceRow(boolean wal, Row row, TrackingIndexWriter writer) throws IOException {
-    if (wal) {
-      synchronized (_running) {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream outputStream = new DataOutputStream(baos);
-        outputStream.writeByte(TYPE.ROW.value());
-        writeRow(outputStream, row);
-        outputStream.close();
-        sync(baos.toByteArray());
-      }
-    }
-    Term term = createRowId(row.id);
-    List<List<Field>> docs = getDocs(row, _fieldManager);
-    return writer.updateDocuments(term, docs);
-  }
-
-  public long deleteRow(boolean wal, String rowId, TrackingIndexWriter writer) throws IOException {
-    if (wal) {
-      synchronized (_running) {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream outputStream = new DataOutputStream(baos);
-        outputStream.writeByte(TYPE.DELETE.value());
-        writeDelete(outputStream, rowId);
-        outputStream.close();
-        sync(baos.toByteArray());
-      }
-    }
-    return writer.deleteDocuments(createRowId(rowId));
-  }
-
-  public void commit(BlurIndexWriter writer) throws CorruptIndexException, IOException {
-    synchronized (_running) {
-      long s = System.nanoTime();
-      writer.commit();
-      long m = System.nanoTime();
-      LOG.debug("Commit took [{0} ms] for [{1}/{2}]", (m - s) / 1000000.0, _table, _shard);
-      rollLog();
-      long e = System.nanoTime();
-      LOG.debug("Log roller took [{0} ms] for [{1}/{2}]", (e - m) / 1000000.0, _table, _shard);
-    }
-  }
-
-  public static List<List<Field>> getDocs(Row row, FieldManager fieldManager) throws IOException {
-    List<Record> records = row.records;
-    if (records == null) {
-      return null;
-    }
-    int size = records.size();
-    if (size == 0) {
-      return null;
-    }
-    final String rowId = row.id;
-    List<List<Field>> docs = new ArrayList<List<Field>>(size);
-    for (int i = 0; i < size; i++) {
-      Record record = records.get(i);
-      List<Field> fields = getDoc(fieldManager, rowId, record);
-      docs.add(fields);
-    }
-    List<Field> doc = docs.get(0);
-    doc.add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
-    return docs;
-  }
-
-  public static List<Field> getDoc(FieldManager fieldManager, final String rowId, Record record) throws IOException {
-    BlurUtil.validateRowIdAndRecord(rowId, record);
-    List<Field> fields = fieldManager.getFields(rowId, record);
-    return fields;
-  }
-
-  public static Term createRowId(String id) {
-    return new Term(BlurConstants.ROW_ID, id);
-  }
-
-  @Override
-  public void run() {
-    try {
-      if (_running.get()) {
-        tryToSync();
-      }
-    } catch (IOException e) {
-      if (_running.get()) {
-        if (e.getMessage().equals("DFSOutputStream is closed")) {
-          LOG.warn("Trying to sync the outputstrema and the stream has been closed.  This is probably a test and the filesystem has been closed.");
-          try {
-            Thread.sleep(TimeUnit.SECONDS.toMillis(5));
-          } catch (InterruptedException ex) {
-            return;
-          }
-        } else {
-          LOG.error("Known error while trying to sync.", e);
-        }
-      }
-    }
-  }
-
-  public static String getFieldName(String columnFamily, String name) {
-    return columnFamily + SEP + name;
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01b4c760/blur-core/src/main/java/org/apache/blur/utils/RowDocumentUtil.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/utils/RowDocumentUtil.java b/blur-core/src/main/java/org/apache/blur/utils/RowDocumentUtil.java
index 4f09fe2..97d89b0 100644
--- a/blur-core/src/main/java/org/apache/blur/utils/RowDocumentUtil.java
+++ b/blur-core/src/main/java/org/apache/blur/utils/RowDocumentUtil.java
@@ -17,20 +17,35 @@ package org.apache.blur.utils;
  * limitations under the License.
  */
 import static org.apache.blur.utils.BlurConstants.*;
-import static org.apache.blur.utils.BlurConstants.ROW_ID;
-import static org.apache.blur.utils.BlurConstants.SEP;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
 
+import org.apache.blur.analysis.FieldManager;
 import org.apache.blur.thrift.generated.FetchRecordResult;
 import org.apache.blur.thrift.generated.Record;
 import org.apache.blur.thrift.generated.Row;
 import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.FieldType;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.Field.Store;
 import org.apache.lucene.index.IndexableField;
-
+import org.apache.lucene.index.Term;
 
 public class RowDocumentUtil {
 
+  public static FieldType ID_TYPE;
+  static {
+    ID_TYPE = new FieldType();
+    ID_TYPE.setIndexed(true);
+    ID_TYPE.setTokenized(false);
+    ID_TYPE.setOmitNorms(true);
+    ID_TYPE.setStored(true);
+    ID_TYPE.freeze();
+  }
+
   public static FetchRecordResult getRecord(Document document) {
     FetchRecordResult result = new FetchRecordResult();
     BlurThriftRecord record = new BlurThriftRecord();
@@ -88,4 +103,35 @@ public class RowDocumentUtil {
     }
     return rowId;
   }
+
+  public static List<List<Field>> getDocs(Row row, FieldManager fieldManager) throws IOException {
+    List<Record> records = row.records;
+    if (records == null) {
+      return null;
+    }
+    int size = records.size();
+    if (size == 0) {
+      return null;
+    }
+    final String rowId = row.id;
+    List<List<Field>> docs = new ArrayList<List<Field>>(size);
+    for (int i = 0; i < size; i++) {
+      Record record = records.get(i);
+      List<Field> fields = getDoc(fieldManager, rowId, record);
+      docs.add(fields);
+    }
+    List<Field> doc = docs.get(0);
+    doc.add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
+    return docs;
+  }
+
+  public static List<Field> getDoc(FieldManager fieldManager, final String rowId, Record record) throws IOException {
+    BlurUtil.validateRowIdAndRecord(rowId, record);
+    List<Field> fields = fieldManager.getFields(rowId, record);
+    return fields;
+  }
+
+  public static Term createRowId(String id) {
+    return new Term(BlurConstants.ROW_ID, id);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01b4c760/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
deleted file mode 100644
index 91c89eb..0000000
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexReaderTest.java
+++ /dev/null
@@ -1,146 +0,0 @@
-package org.apache.blur.manager.writer;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.blur.concurrent.Executors;
-import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
-import org.apache.blur.manager.indexserver.DefaultBlurIndexWarmup;
-import org.apache.blur.server.IndexSearcherClosable;
-import org.apache.blur.server.ShardContext;
-import org.apache.blur.server.TableContext;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.lucene.analysis.core.KeywordAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.BlurIndexWriter;
-import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.store.FSDirectory;
-import org.apache.lucene.store.LockObtainFailedException;
-import org.apache.lucene.util.Version;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class BlurIndexReaderTest {
-
-  private static final File TMPDIR = new File("./target/tmp/BlurIndexReaderTest");
-
-  private ExecutorService service;
-  private File base;
-  private Configuration configuration;
-
-  private DirectoryReferenceFileGC gc;
-  private SharedMergeScheduler mergeScheduler;
-  private BlurIndexReader reader;
-
-  private BlurIndexRefresher refresher;
-  private BlurIndexCloser indexCloser;
-  private FSDirectory directory;
-  private DefaultBlurIndexWarmup indexWarmup;
-
-  @Before
-  public void setup() throws IOException {
-    TableContext.clear();
-    base = new File(TMPDIR, "blur-index-reader-test");
-    rm(base);
-    base.mkdirs();
-
-    mergeScheduler = new SharedMergeScheduler(1);
-    gc = new DirectoryReferenceFileGC();
-
-    configuration = new Configuration();
-    service = Executors.newThreadPool("test", 1);
-    indexWarmup = new DefaultBlurIndexWarmup(1000000);
-
-  }
-
-  private void setupWriter(Configuration configuration, long refresh) throws IOException, URISyntaxException {
-    String tableUri = new File(base, "table-store-" + UUID.randomUUID().toString()).toURI().toString();
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setName("test-table");
-    tableDescriptor.setTableUri(tableUri);
-    tableDescriptor.putToTableProperties("blur.shard.time.between.refreshs", Long.toString(refresh));
-    tableDescriptor.putToTableProperties("blur.shard.time.between.commits", Long.toString(1000));
-
-    TableContext tableContext = TableContext.create(tableDescriptor);
-    directory = FSDirectory.open(new File(new URI(tableDescriptor.getTableUri())));
-
-    ShardContext shardContext = ShardContext.create(tableContext, "test-shard");
-    refresher = new BlurIndexRefresher();
-    indexCloser = new BlurIndexCloser();
-    reader = new BlurIndexReader(shardContext, directory, null, null, null, indexCloser, refresher, indexWarmup);
-  }
-
-  @After
-  public void tearDown() throws IOException {
-    reader.close();
-    mergeScheduler.close();
-    gc.close();
-    service.shutdownNow();
-    refresher.close();
-    indexCloser.close();
-    rm(base);
-  }
-
-  private void rm(File file) {
-    if (!file.exists()) {
-      return;
-    }
-    if (file.isDirectory()) {
-      for (File f : file.listFiles()) {
-        rm(f);
-      }
-    }
-    file.delete();
-  }
-
-  @Test
-  public void testBlurIndexWriter() throws IOException, InterruptedException, URISyntaxException {
-    setupWriter(configuration, 1);
-    IndexSearcherClosable indexReader1 = reader.getIndexSearcher();
-    doWrite();
-    assertEquals(0, indexReader1.getIndexReader().numDocs());
-    indexReader1.close();
-    reader.refresh();
-    IndexSearcherClosable indexReader2 = reader.getIndexSearcher();
-    assertEquals(1, indexReader2.getIndexReader().numDocs());
-    indexReader2.close();
-  }
-
-  private void doWrite() throws CorruptIndexException, LockObtainFailedException, IOException {
-    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_43, new KeywordAnalyzer());
-    BlurIndexWriter writer = new BlurIndexWriter(directory, conf);
-    writer.addDocument(getDoc());
-    writer.close();
-  }
-
-  private Document getDoc() {
-    return new Document();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01b4c760/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
index 37eece8..f0e95e6 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurIndexSimpleWriterTest.java
@@ -17,7 +17,7 @@ package org.apache.blur.manager.writer;
  * limitations under the License.
  */
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
 import java.io.File;
 import java.io.IOException;
@@ -49,44 +49,45 @@ import org.junit.Test;
 
 public class BlurIndexSimpleWriterTest {
 
+  private static final String TEST_TABLE = "test-table";
   private static final int TEST_NUMBER_WAIT_VISIBLE = 500;
   private static final int TEST_NUMBER = 50000;
 
   private static final File TMPDIR = new File("./target/tmp");
 
-  private BlurIndexSimpleWriter writer;
+  private BlurIndexSimpleWriter _writer;
   private Random random = new Random();
-  private ExecutorService service;
-  private File base;
-  private Configuration configuration;
+  private ExecutorService _service;
+  private File _base;
+  private Configuration _configuration;
 
-  private DirectoryReferenceFileGC gc;
-  private SharedMergeScheduler mergeScheduler;
+  private DirectoryReferenceFileGC _gc;
+  private SharedMergeScheduler _mergeScheduler;
   private String uuid;
   private BlurIndexRefresher _refresher;
   private BlurIndexCloser _closer;
-  private DefaultBlurIndexWarmup indexWarmup;
+  private DefaultBlurIndexWarmup _indexWarmup;
 
   @Before
   public void setup() throws IOException {
     TableContext.clear();
-    base = new File(TMPDIR, "blur-index-writer-test");
-    rm(base);
-    base.mkdirs();
+    _base = new File(TMPDIR, "blur-index-writer-test");
+    rmr(_base);
+    _base.mkdirs();
 
-    mergeScheduler = new SharedMergeScheduler(1);
-    gc = new DirectoryReferenceFileGC();
+    _mergeScheduler = new SharedMergeScheduler(1);
+    _gc = new DirectoryReferenceFileGC();
 
-    configuration = new Configuration();
-    service = Executors.newThreadPool("test", 10);
+    _configuration = new Configuration();
+    _service = Executors.newThreadPool("test", 10);
     _refresher = new BlurIndexRefresher();
     _closer = new BlurIndexCloser();
-    indexWarmup = new DefaultBlurIndexWarmup(1000000);
+    _indexWarmup = new DefaultBlurIndexWarmup(1000000);
   }
 
   private void setupWriter(Configuration configuration, long refresh, boolean reload) throws IOException {
     TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setName("test-table");
+    tableDescriptor.setName(TEST_TABLE);
     /*
      * if reload is set to true...we create a new writer instance pointing to
      * the same location as the old one..... so previous writer instances should
@@ -97,43 +98,77 @@ public class BlurIndexSimpleWriterTest {
       uuid = UUID.randomUUID().toString();
     }
 
-    tableDescriptor.setTableUri(new File(base, "table-store-" + uuid).toURI().toString());
-    tableDescriptor.putToTableProperties("blur.shard.time.between.refreshs", Long.toString(refresh));
-
+    tableDescriptor.setTableUri(new File(_base, "table-store-" + uuid).toURI().toString());
     TableContext tableContext = TableContext.create(tableDescriptor);
-    File path = new File(base, "index_" + uuid);
+    File path = new File(_base, "index_" + uuid);
     path.mkdirs();
     FSDirectory directory = FSDirectory.open(path);
     ShardContext shardContext = ShardContext.create(tableContext, "test-shard-" + uuid);
-    writer = new BlurIndexSimpleWriter(shardContext, directory, mergeScheduler, gc, service, _closer, _refresher,
-        indexWarmup);
+    _writer = new BlurIndexSimpleWriter(shardContext, directory, _mergeScheduler, _gc, _service, _closer, _refresher,
+        _indexWarmup);
   }
 
   @After
   public void tearDown() throws IOException {
     _refresher.close();
-    writer.close();
-    mergeScheduler.close();
-    gc.close();
-    service.shutdownNow();
-    rm(base);
+    _writer.close();
+    _mergeScheduler.close();
+    _gc.close();
+    _service.shutdownNow();
+    rmr(_base);
   }
 
-  private void rm(File file) {
+  private void rmr(File file) {
     if (!file.exists()) {
       return;
     }
     if (file.isDirectory()) {
       for (File f : file.listFiles()) {
-        rm(f);
+        rmr(f);
       }
     }
     file.delete();
   }
 
   @Test
+  public void testRollbackAndReopen() throws IOException {
+    setupWriter(_configuration, 5, false);
+    {
+      IndexSearcherClosable searcher = _writer.getIndexSearcher();
+      IndexReader reader = searcher.getIndexReader();
+      assertEquals(0, reader.numDocs());
+      searcher.close();
+    }
+
+    MutatableAction action = new MutatableAction(_writer.getShardContext());
+    action.replaceRow(new Row());
+    try {
+      _writer.process(action);
+      fail("should throw exception");
+    } catch (IOException e) {
+      // do nothing
+    }
+    {
+      IndexSearcherClosable searcher = _writer.getIndexSearcher();
+      IndexReader reader = searcher.getIndexReader();
+      assertEquals(0, reader.numDocs());
+      searcher.close();
+    }
+
+    action.replaceRow(genRow());
+    _writer.process(action);
+
+    {
+      IndexSearcherClosable searcher = _writer.getIndexSearcher();
+      IndexReader reader = searcher.getIndexReader();
+      assertEquals(1, reader.numDocs());
+      searcher.close();
+    }
+  }
+
+  @Test
   public void testBlurIndexWriter() throws IOException {
-    setupWriter(configuration, 5, false);
+    setupWriter(_configuration, 5, false);
     long s = System.nanoTime();
     int total = 0;
     TraceStorage oldStorage = Trace.getStorage();
@@ -150,8 +185,10 @@ public class BlurIndexSimpleWriterTest {
     });
     Trace.setupTrace("test");
     for (int i = 0; i < TEST_NUMBER_WAIT_VISIBLE; i++) {
-      writer.replaceRow(true, true, genRow());
-      IndexSearcherClosable searcher = writer.getIndexSearcher();
+      MutatableAction action = new MutatableAction(_writer.getShardContext());
+      action.replaceRow(genRow());
+      _writer.process(action);
+      IndexSearcherClosable searcher = _writer.getIndexSearcher();
       IndexReader reader = searcher.getIndexReader();
       assertEquals(i + 1, reader.numDocs());
       searcher.close();
@@ -162,31 +199,28 @@ public class BlurIndexSimpleWriterTest {
     double seconds = (e - s) / 1000000000.0;
     double rate = total / seconds;
     System.out.println("Rate " + rate);
-    IndexSearcherClosable searcher = writer.getIndexSearcher();
+    IndexSearcherClosable searcher = _writer.getIndexSearcher();
     IndexReader reader = searcher.getIndexReader();
     assertEquals(TEST_NUMBER_WAIT_VISIBLE, reader.numDocs());
     searcher.close();
     Trace.setStorage(oldStorage);
   }
 
-//  @Test
+  @Test
   public void testBlurIndexWriterFaster() throws IOException, InterruptedException {
-    // This test doesn't make any sense anymore, because it's no different than the first test.
-    setupWriter(configuration, 100, false);
-    IndexSearcherClosable searcher1 = writer.getIndexSearcher();
+    setupWriter(_configuration, 100, false);
+    IndexSearcherClosable searcher1 = _writer.getIndexSearcher();
     IndexReader reader1 = searcher1.getIndexReader();
     assertEquals(0, reader1.numDocs());
     searcher1.close();
     long s = System.nanoTime();
     int total = 0;
+    MutatableAction action = new MutatableAction(_writer.getShardContext());
     for (int i = 0; i < TEST_NUMBER; i++) {
-      if (i == TEST_NUMBER - 1) {
-        writer.replaceRow(true, true, genRow());
-      } else {
-        writer.replaceRow(false, true, genRow());
-      }
+      action.replaceRow(genRow());
       total++;
     }
+    _writer.process(action);
     long e = System.nanoTime();
     double seconds = (e - s) / 1000000000.0;
     double rate = total / seconds;
@@ -194,8 +228,8 @@ public class BlurIndexSimpleWriterTest {
     // //wait one second for the data to become visible the test is set to
     // refresh once every 25 ms
     Thread.sleep(1000);// Hack for now
-    writer.refresh();
-    IndexSearcherClosable searcher2 = writer.getIndexSearcher();
+    _writer.refresh();
+    IndexSearcherClosable searcher2 = _writer.getIndexSearcher();
     IndexReader reader2 = searcher2.getIndexReader();
     assertEquals(TEST_NUMBER, reader2.numDocs());
     searcher2.close();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01b4c760/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
deleted file mode 100644
index 4abb9b9..0000000
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/BlurNRTIndexTest.java
+++ /dev/null
@@ -1,245 +0,0 @@
-package org.apache.blur.manager.writer;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.blur.concurrent.Executors;
-import org.apache.blur.lucene.store.refcounter.DirectoryReferenceFileGC;
-import org.apache.blur.manager.indexserver.BlurIndexWarmup;
-import org.apache.blur.manager.indexserver.DefaultBlurIndexWarmup;
-import org.apache.blur.server.IndexSearcherClosable;
-import org.apache.blur.server.ShardContext;
-import org.apache.blur.server.TableContext;
-import org.apache.blur.thrift.generated.Column;
-import org.apache.blur.thrift.generated.Record;
-import org.apache.blur.thrift.generated.Row;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.store.FSDirectory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class BlurNRTIndexTest {
-
-  private static final int TEST_NUMBER_WAIT_VISIBLE = 500;
-  private static final int TEST_NUMBER = 50000;
-
-  private static final File TMPDIR = new File("./target/tmp");
-
-  private BlurNRTIndex writer;
-  private Random random = new Random();
-  private ExecutorService service;
-  private File base;
-  private Configuration configuration;
-
-  private DirectoryReferenceFileGC gc;
-  private SharedMergeScheduler mergeScheduler;
-  private String uuid;
-  private BlurIndexWarmup indexWarmup;
-
-  @Before
-  public void setup() throws IOException {
-    TableContext.clear();
-    base = new File(TMPDIR, "blur-index-writer-test");
-    rm(base);
-    base.mkdirs();
-
-    mergeScheduler = new SharedMergeScheduler(1);
-    gc = new DirectoryReferenceFileGC();
-
-    configuration = new Configuration();
-    service = Executors.newThreadPool("test", 10);
-    indexWarmup = new DefaultBlurIndexWarmup(1000000);
-  }
-
-  private void setupWriter(Configuration configuration, long refresh, boolean reload) throws IOException {
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setName("test-table");
-    /*
-     * if reload is set to true...we create a new writer instance pointing to
-     * the same location as the old one..... so previous writer instances should
-     * be closed
-     */
-
-    if (!reload && uuid == null) {
-      uuid = UUID.randomUUID().toString();
-    }
-
-    tableDescriptor.setTableUri(new File(base, "table-store-" + uuid).toURI().toString());
-    tableDescriptor.putToTableProperties("blur.shard.time.between.refreshs", Long.toString(refresh));
-
-    TableContext tableContext = TableContext.create(tableDescriptor);
-    File path = new File(base, "index_" + uuid);
-    path.mkdirs();
-    FSDirectory directory = FSDirectory.open(path);
-    ShardContext shardContext = ShardContext.create(tableContext, "test-shard-" + uuid);
-    writer = new BlurNRTIndex(shardContext, directory, mergeScheduler, gc, service, null, null, indexWarmup);
-  }
-
-  @After
-  public void tearDown() throws IOException {
-    writer.close();
-    mergeScheduler.close();
-    gc.close();
-    service.shutdownNow();
-    rm(base);
-  }
-
-  private void rm(File file) {
-    if (!file.exists()) {
-      return;
-    }
-    if (file.isDirectory()) {
-      for (File f : file.listFiles()) {
-        rm(f);
-      }
-    }
-    file.delete();
-  }
-
-  @Test
-  public void testBlurIndexWriter() throws IOException {
-    setupWriter(configuration, 5, false);
-    long s = System.nanoTime();
-    int total = 0;
-    for (int i = 0; i < TEST_NUMBER_WAIT_VISIBLE; i++) {
-      writer.replaceRow(true, true, genRow());
-      IndexSearcherClosable searcher = writer.getIndexSearcher();
-      IndexReader reader = searcher.getIndexReader();
-      assertEquals(i + 1, reader.numDocs());
-      searcher.close();
-      total++;
-    }
-    long e = System.nanoTime();
-    double seconds = (e - s) / 1000000000.0;
-    double rate = total / seconds;
-    System.out.println("Rate " + rate);
-    IndexSearcherClosable searcher = writer.getIndexSearcher();
-    IndexReader reader = searcher.getIndexReader();
-    assertEquals(TEST_NUMBER_WAIT_VISIBLE, reader.numDocs());
-    searcher.close();
-  }
-
-  @Test
-  public void testBlurIndexWriterFaster() throws IOException, InterruptedException {
-    setupWriter(configuration, 100, false);
-    IndexSearcherClosable searcher1 = writer.getIndexSearcher();
-    IndexReader reader1 = searcher1.getIndexReader();
-    assertEquals(0, reader1.numDocs());
-    searcher1.close();
-    long s = System.nanoTime();
-    int total = 0;
-    for (int i = 0; i < TEST_NUMBER; i++) {
-      if (i == TEST_NUMBER - 1) {
-        writer.replaceRow(true, true, genRow());
-      } else {
-        writer.replaceRow(false, true, genRow());
-      }
-      total++;
-    }
-    long e = System.nanoTime();
-    double seconds = (e - s) / 1000000000.0;
-    double rate = total / seconds;
-    System.out.println("Rate " + rate);
-    // //wait one second for the data to become visible the test is set to
-    // refresh once every 25 ms
-    // Thread.sleep(1000);
-    writer.refresh();
-    IndexSearcherClosable searcher2 = writer.getIndexSearcher();
-    IndexReader reader2 = searcher2.getIndexReader();
-    assertEquals(TEST_NUMBER, reader2.numDocs());
-    searcher2.close();
-  }
-
-  private Row genRow() {
-    Row row = new Row();
-    row.setId(Long.toString(random.nextLong()));
-    Record record = new Record();
-    record.setFamily("testing");
-    record.setRecordId(Long.toString(random.nextLong()));
-    for (int i = 0; i < 10; i++) {
-      record.addToColumns(new Column("col" + i, Long.toString(random.nextLong())));
-    }
-    row.addToRecords(record);
-    return row;
-  }
-
-  @Test
-  public void testCreateSnapshot() throws IOException {
-    setupWriter(configuration, 5, false);
-    writer.createSnapshot("test_snapshot");
-    assertTrue(writer.getSnapshots().contains("test_snapshot"));
-
-    // check that the file is persisted
-    Path snapshotsDirPath = writer.getSnapshotsDirectoryPath();
-    FileSystem fileSystem = snapshotsDirPath.getFileSystem(new Configuration());
-    Path snapshotFilePath = new Path(snapshotsDirPath, "test_snapshot");
-    assertTrue(fileSystem.exists(snapshotFilePath));
-
-    // create a new writer instance and test whether the snapshots are loaded
-    // properly
-    writer.close();
-    setupWriter(configuration, 5, true);
-    assertTrue(writer.getSnapshots().contains("test_snapshot"));
-  }
-
-  @Test
-  public void testRemoveSnapshots() throws IOException {
-    setupWriter(configuration, 5, false);
-    Path snapshotsDirPath = writer.getSnapshotsDirectoryPath();
-    FileSystem fileSystem = snapshotsDirPath.getFileSystem(new Configuration());
-    fileSystem.mkdirs(snapshotsDirPath);
-
-    // create 2 files in snapshots sub-dir
-    Path snapshotFile1 = new Path(snapshotsDirPath, "test_snapshot1");
-    Path snapshotFile2 = new Path(snapshotsDirPath, "test_snapshot2");
-
-    BufferedWriter br1 = new BufferedWriter(new OutputStreamWriter(fileSystem.create(snapshotFile1, true)));
-    br1.write("segments_1");
-    br1.close();
-
-    BufferedWriter br2 = new BufferedWriter(new OutputStreamWriter(fileSystem.create(snapshotFile2, true)));
-    br2.write("segments_1");
-    br2.close();
-
-    // re-load the writer to load the snpshots
-    writer.close();
-    setupWriter(configuration, 5, true);
-    assertEquals(writer.getSnapshots().size(), 2);
-
-    writer.removeSnapshot("test_snapshot2");
-    assertEquals(writer.getSnapshots().size(), 1);
-    assertTrue(!writer.getSnapshots().contains("test_snapshot2"));
-    assertTrue(!fileSystem.exists(snapshotFile2));
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01b4c760/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
deleted file mode 100644
index 968ca77..0000000
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/TransactionRecorderTest.java
+++ /dev/null
@@ -1,185 +0,0 @@
-package org.apache.blur.manager.writer;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.blur.MiniCluster;
-import org.apache.blur.log.Log;
-import org.apache.blur.log.LogFactory;
-import org.apache.blur.server.ShardContext;
-import org.apache.blur.server.TableContext;
-import org.apache.blur.thrift.generated.Column;
-import org.apache.blur.thrift.generated.Record;
-import org.apache.blur.thrift.generated.Row;
-import org.apache.blur.thrift.generated.TableDescriptor;
-import org.apache.blur.utils.BlurUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.lucene.analysis.core.KeywordAnalyzer;
-import org.apache.lucene.index.BlurIndexWriter;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.store.RAMDirectory;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TransactionRecorderTest {
-
-  private static final Log LOG = LogFactory.getLog(TransactionRecorderTest.class);
-  private static MiniCluster miniCluster;
-
-  @BeforeClass
-  public static void setup() {
-    miniCluster = new MiniCluster();
-    miniCluster.startDfs(new File("target", "transaction-recorder-test").getAbsolutePath());
-  }
-
-  @AfterClass
-  public static void teardown() throws IOException {
-    miniCluster.shutdownDfs();
-  }
-
-  private Collection<Closeable> closeThis = new HashSet<Closeable>();
-
-  @After
-  public void after() {
-    for (Closeable closeable : closeThis) {
-      IOUtils.cleanup(LOG, closeable);
-    }
-  }
-
-  @Test
-  public void testReplaySimpleTest() throws IOException, InterruptedException {
-    TableContext.clear();
-    Configuration configuration = new Configuration(false);
-    URI fileSystemUri = miniCluster.getFileSystemUri();
-    Path path = new Path(fileSystemUri.toString() + "/transaction-recorder-test");
-    FileSystem fileSystem = path.getFileSystem(configuration);
-    fileSystem.delete(path, true);
-
-    KeywordAnalyzer analyzer = new KeywordAnalyzer();
-
-    TableDescriptor tableDescriptor = new TableDescriptor();
-    tableDescriptor.setName("table");
-    String tableUri = new Path(path, "tableuri").toUri().toString();
-
-    System.out.println("tableUri=" + tableUri);
-    tableDescriptor.setTableUri(tableUri);
-
-    TableContext tableContext = TableContext.create(tableDescriptor);
-    ShardContext shardContext = ShardContext.create(tableContext, "shard-1");
-    TransactionRecorder transactionRecorder = new TransactionRecorder(shardContext);
-    closeThis.add(transactionRecorder);
-    transactionRecorder.open();
-
-    try {
-      transactionRecorder.replaceRow(true, genRow(), null);
-      fail("Should NPE");
-    } catch (NullPointerException e) {
-    }
-
-    Thread.sleep(TimeUnit.SECONDS.toMillis(2));
-
-    RAMDirectory directory = new RAMDirectory();
-    IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, analyzer);
-    BlurIndexWriter writer = new BlurIndexWriter(directory, conf);
-
-    TransactionRecorder replayTransactionRecorder = new TransactionRecorder(shardContext);
-    closeThis.add(replayTransactionRecorder);
-    System.out.println("REPLAY");
-    replayTransactionRecorder.replay(writer);
-    System.out.println("REPLAY COMPLETE");
-    IndexReader reader = DirectoryReader.open(directory);
-    System.out.println("assert");
-    assertEquals(1, reader.numDocs());
-  }
-
-  @Test
-  public void testConvertShouldPass() {
-    String rowId = "RowId_123-1";
-    Record record = new Record();
-    record.setRecordId("RecordId_123-1");
-    record.setFamily("Family_123-1");
-
-    Column column = new Column();
-    column.setName("columnName_123-1");
-    record.setColumns(Arrays.asList(column));
-
-    BlurUtil.validateRowIdAndRecord(rowId, record);
-    assert (true);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testConvertWithBadFamilyNameShouldFail() {
-    String rowId = "RowId_123-1";
-    Record record = new Record();
-    record.setRecordId("RecordId_123-1");
-    record.setFamily("Family_123.1");
-
-    Column column = new Column();
-    column.setName("columnName_123-1");
-    record.setColumns(Arrays.asList(column));
-
-    BlurUtil.validateRowIdAndRecord(rowId, record);
-    fail();
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testConvertWithBadColumnNameShouldFail() {
-    String rowId = "RowId_123-1";
-    Record record = new Record();
-    record.setRecordId("RecordId_123-1");
-    record.setFamily("Family_123-1");
-
-    Column column = new Column();
-    column.setName("columnName_123.1");
-    record.setColumns(Arrays.asList(column));
-
-    BlurUtil.validateRowIdAndRecord(rowId, record);
-    fail();
-  }
-
-  private Row genRow() {
-    Row row = new Row();
-    row.id = "1";
-    Record record = new Record();
-    record.recordId = "1";
-    record.family = "test";
-    record.addToColumns(new Column("name", "value"));
-    row.addToRecords(record);
-    return row;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/01b4c760/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java
index 67f0b92..dff34c8 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/BlurReducer.java
@@ -37,7 +37,6 @@ import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.lucene.codec.Blur022Codec;
 import org.apache.blur.lucene.search.FairSimilarity;
-import org.apache.blur.manager.writer.TransactionRecorder;
 import org.apache.blur.mapreduce.BlurTask.INDEXING_TYPE;
 import org.apache.blur.mapreduce.lib.BlurColumn;
 import org.apache.blur.mapreduce.lib.BlurMutate;
@@ -55,6 +54,7 @@ import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.utils.BlurConstants;
 import org.apache.blur.utils.BlurUtil;
 import org.apache.blur.utils.ResetableDocumentStoredFieldVisitor;
+import org.apache.blur.utils.RowDocumentUtil;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -478,10 +478,10 @@ public class BlurReducer extends Reducer<Text, BlurMutate, Text, BlurMutate> {
 
   protected Document toDocument(BlurRecord record) throws IOException {
     Document document = new Document();
-    document.add(new Field(BlurConstants.ROW_ID, record.getRowId(), TransactionRecorder.ID_TYPE));
-    document.add(new Field(BlurConstants.RECORD_ID, record.getRecordId(), TransactionRecorder.ID_TYPE));
+    document.add(new Field(BlurConstants.ROW_ID, record.getRowId(), RowDocumentUtil.ID_TYPE));
+    document.add(new Field(BlurConstants.RECORD_ID, record.getRecordId(), RowDocumentUtil.ID_TYPE));
 
-    List<Field> doc = TransactionRecorder.getDoc(_fieldManager, record.getRowId(), toRecord(record));
+    List<Field> doc = RowDocumentUtil.getDoc(_fieldManager, record.getRowId(), toRecord(record));
     for (Field field : doc) {
       document.add(field);
     }


Mime
View raw message