incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [8/9] git commit: Adding bulk mutate implementation.
Date Thu, 18 Dec 2014 23:21:57 GMT
Adding bulk mutate implementation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/89945db1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/89945db1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/89945db1

Branch: refs/heads/master
Commit: 89945db12a98bca1290e6ccbedbbee7c526803ba
Parents: a0a7d7d
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Dec 18 18:21:14 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Dec 18 18:21:14 2014 -0500

----------------------------------------------------------------------
 .../org/apache/blur/manager/IndexManager.java   |  30 ++-
 .../apache/blur/manager/writer/BlurIndex.java   |   6 +
 .../blur/manager/writer/BlurIndexReadOnly.java  |  15 ++
 .../manager/writer/BlurIndexSimpleWriter.java   | 201 ++++++++++++++++++-
 .../apache/blur/server/FilteredBlurServer.java  |  15 ++
 .../blur/thrift/BlurControllerServer.java       |  75 +++++++
 .../org/apache/blur/thrift/BlurShardServer.java |  39 ++++
 .../blur/command/ShardCommandManagerTest.java   |  15 ++
 .../blur/manager/writer/IndexImporterTest.java  |  15 ++
 9 files changed, 407 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
index 51c4561..0dbc634 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
@@ -1015,18 +1015,18 @@ public class IndexManager {
     List<String> terms = new ArrayList<String>(size);
     AtomicReader areader = BlurUtil.getAtomicReader(reader);
     Terms termsAll = areader.terms(term.field());
-    
+
     if (termsAll == null) {
       return terms;
     }
 
     TermsEnum termEnum = termsAll.iterator(null);
     SeekStatus status = termEnum.seekCeil(term.bytes());
-    
+
     if (status == SeekStatus.END) {
       return terms;
     }
-    
+
     BytesRef currentTermText = termEnum.term();
     do {
       terms.add(currentTermText.utf8ToString());
@@ -1289,4 +1289,28 @@ public class IndexManager {
     enqueue(Arrays.asList(mutation));
   }
 
+  public void bulkMutateStart(String table, String bulkId) throws BlurException, IOException {
+    Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
+    for (BlurIndex index : indexes.values()) {
+      index.startBulkMutate(bulkId);
+    }
+  }
+
+  public void bulkMutateAdd(String table, String bulkId, RowMutation mutation) throws BlurException, IOException {
+    String shard = MutationHelper.getShardName(table, mutation.rowId, getNumberOfShards(table), _blurPartitioner);
+    Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
+    BlurIndex blurIndex = indexes.get(shard);
+    if (blurIndex == null) {
+      throw new BException("Shard [{0}] for table [{1}] not found on this server.", shard, table);
+    }
+    blurIndex.addBulkMutate(bulkId, mutation);
+  }
+
+  public void bulkMutateFinish(String table, String bulkId, boolean apply, boolean blockUntilComplete) throws BlurException, IOException {
+    Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
+    for (BlurIndex index : indexes.values()) {
+      index.finishBulkMutate(bulkId, apply,blockUntilComplete);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
index 1ca8f0c..eec3f65 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
@@ -136,4 +136,10 @@ public abstract class BlurIndex {
 
   public abstract void enqueue(List<RowMutation> mutations) throws IOException;
 
+  public abstract void startBulkMutate(String bulkId) throws IOException;
+
+  public abstract void finishBulkMutate(String bulkId, boolean apply, boolean blockUntilComplete) throws IOException;
+
+  public abstract void addBulkMutate(String bulkId, RowMutation mutation) throws IOException;
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
index c2aee75..a33dc62 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
@@ -74,4 +74,19 @@ public class BlurIndexReadOnly extends BlurIndex {
     throw new RuntimeException("Read-only shard");
   }
 
+  @Override
+  public void startBulkMutate(String bulkId) {
+    throw new RuntimeException("Read-only shard");
+  }
+
+  @Override
+  public void finishBulkMutate(String bulkId, boolean apply, boolean blockUntilComplete) {
+    throw new RuntimeException("Read-only shard");
+  }
+
+  @Override
+  public void addBulkMutate(String bulkId, RowMutation mutation) {
+    throw new RuntimeException("Read-only shard");
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
index 792a7d8..a07cc34 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
@@ -22,9 +22,11 @@ import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_QUEUE_MAX_INMEMORY_
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Timer;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,15 +47,27 @@ import org.apache.blur.lucene.codec.Blur024Codec;
 import org.apache.blur.server.IndexSearcherClosable;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.TableContext;
+import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.RowMutation;
 import org.apache.blur.trace.Trace;
 import org.apache.blur.trace.Tracer;
+import org.apache.blur.utils.BlurUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.SequenceFile.Sorter;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.Text;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.index.BlurIndexWriter;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.TieredMergePolicy;
 import org.apache.lucene.store.Directory;
@@ -88,10 +102,12 @@ public class BlurIndexSimpleWriter extends BlurIndex {
   private final BlockingQueue<RowMutation> _queue;
   private final MutationQueueProcessor _mutationQueueProcessor;
   private final Timer _indexImporterTimer;
+  private final Map<String, BulkEntry> _bulkWriters;
 
   public BlurIndexSimpleWriter(ShardContext shardContext, Directory directory, SharedMergeScheduler mergeScheduler,
       final ExecutorService searchExecutor, BlurIndexCloser indexCloser, Timer indexImporterTimer) throws IOException {
     super(shardContext, directory, mergeScheduler, searchExecutor, indexCloser, indexImporterTimer);
+    _bulkWriters = new ConcurrentHashMap<String, BlurIndexSimpleWriter.BulkEntry>();
     _indexImporterTimer = indexImporterTimer;
     _searchThreadPool = searchExecutor;
     _shardContext = shardContext;
@@ -103,7 +119,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
     _conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
     _conf.setCodec(new Blur024Codec(_tableContext.getBlurConfiguration()));
     _conf.setSimilarity(_tableContext.getSimilarity());
-    _conf.setInfoStream(new LoggingInfoStream(_tableContext.getTable(),_shardContext.getShard()));
+    _conf.setInfoStream(new LoggingInfoStream(_tableContext.getTable(), _shardContext.getShard()));
     TieredMergePolicy mergePolicy = (TieredMergePolicy) _conf.getMergePolicy();
     mergePolicy.setUseCompoundFile(false);
     _conf.setMergeScheduler(mergeScheduler.getMergeScheduler());
@@ -381,4 +397,187 @@ public class BlurIndexSimpleWriter extends BlurIndex {
     _mutationQueueProcessor.startIfNotRunning();
   }
 
+  static class BulkEntry {
+    final SequenceFile.Writer _writer;
+    final Path _path;
+
+    BulkEntry(Writer writer, Path path) {
+      _writer = writer;
+      _path = path;
+    }
+  }
+
+  @Override
+  public void startBulkMutate(String bulkId) throws IOException {
+    BulkEntry bulkEntry = _bulkWriters.get(bulkId);
+    if (bulkEntry == null) {
+      Path tablePath = _tableContext.getTablePath();
+      Path bulk = new Path(tablePath, "bulk");
+      Path bulkInstance = new Path(bulk, bulkId);
+      Path path = new Path(bulkInstance, _shardContext.getShard() + ".notsorted.seq");
+      Configuration configuration = _tableContext.getConfiguration();
+      FileSystem fileSystem = path.getFileSystem(configuration);
+      Writer writer = new SequenceFile.Writer(fileSystem, configuration, path, Text.class, BytesWritable.class);
+      _bulkWriters.put(bulkId, new BulkEntry(writer, path));
+    } else {
+      LOG.info("Bulk [{0}] mutate already started on shard [{1}] in table [{2}].", bulkId, _shardContext.getShard(),
+          _tableContext.getTable());
+    }
+  }
+
+  @Override
+  public void finishBulkMutate(final String bulkId, boolean apply, boolean blockUntilComplete) throws IOException {
+    final String table = _tableContext.getTable();
+    final String shard = _shardContext.getShard();
+
+    LOG.info("Shard [{2}/{3}] Id [{0}] Finishing bulk mutate apply [{1}]", bulkId, apply, table, shard);
+    final BulkEntry bulkEntry = _bulkWriters.get(bulkId);
+    bulkEntry._writer.close();
+
+    Configuration configuration = _tableContext.getConfiguration();
+    final Path path = bulkEntry._path;
+    final FileSystem fileSystem = path.getFileSystem(configuration);
+
+    if (!apply) {
+      fileSystem.delete(path, false);
+      Path parent = path.getParent();
+      removeParentIfLastFile(fileSystem, parent);
+    } else {
+      Runnable runnable = new Runnable() {
+        @Override
+        public void run() {
+          try {
+            process(new IndexAction() {
+              private Path _sorted;
+
+              @Override
+              public void performMutate(IndexSearcherClosable searcher, IndexWriter writer) throws IOException {
+                Configuration configuration = _tableContext.getConfiguration();
+
+                SequenceFile.Sorter sorter = new Sorter(fileSystem, Text.class, BytesWritable.class, configuration);
+
+                _sorted = new Path(path.getParent(), shard + ".sorted.seq");
+
+                LOG.info("Shard [{2}/{3}] Id [{4}] Sorting mutates path [{0}] sorted path [{1}]", path, _sorted, table,
+                    shard, bulkId);
+                sorter.sort(path, _sorted);
+
+                LOG.info("Shard [{1}/{2}] Id [{3}] Applying mutates sorted path [{0}]", _sorted, table, shard, bulkId);
+                Reader reader = new SequenceFile.Reader(fileSystem, _sorted, configuration);
+
+                Text key = new Text();
+                BytesWritable value = new BytesWritable();
+
+                Text last = null;
+                List<RowMutation> list = new ArrayList<RowMutation>();
+                while (reader.next(key, value)) {
+                  if (!key.equals(last)) {
+                    flushMutates(searcher, writer, list);
+                    last = new Text(key);
+                    list.clear();
+                  }
+                  list.add(fromBytesWritable(value));
+                }
+                flushMutates(searcher, writer, list);
+                reader.close();
+                LOG.info("Shard [{0}/{1}] Id [{2}] Finished applying mutates starting commit.", table, shard, bulkId);
+              }
+
+              private void flushMutates(IndexSearcherClosable searcher, IndexWriter writer, List<RowMutation> list)
+                  throws IOException {
+                if (!list.isEmpty()) {
+                  List<RowMutation> reduceMutates;
+                  try {
+                    reduceMutates = MutatableAction.reduceMutates(list);
+                  } catch (BlurException e) {
+                    throw new IOException(e);
+                  }
+                  for (RowMutation mutation : reduceMutates) {
+                    MutatableAction mutatableAction = new MutatableAction(_shardContext);
+                    mutatableAction.mutate(mutation);
+                    mutatableAction.performMutate(searcher, writer);
+                  }
+                }
+              }
+
+              private void cleanupFiles() throws IOException {
+                fileSystem.delete(path, false);
+                fileSystem.delete(_sorted, false);
+                Path parent = path.getParent();
+                removeParentIfLastFile(fileSystem, parent);
+              }
+
+              @Override
+              public void doPreRollback(IndexWriter writer) throws IOException {
+
+              }
+
+              @Override
+              public void doPreCommit(IndexSearcherClosable indexSearcher, IndexWriter writer) throws IOException {
+
+              }
+
+              @Override
+              public void doPostRollback(IndexWriter writer) throws IOException {
+                cleanupFiles();
+              }
+
+              @Override
+              public void doPostCommit(IndexWriter writer) throws IOException {
+                cleanupFiles();
+              }
+            });
+          } catch (IOException e) {
+            LOG.error("Shard [{0}/{1}] Id [{2}] Unknown error while trying to finish the bulk updates.", table, shard,
+                bulkId);
+          }
+        }
+      };
+      if (blockUntilComplete) {
+        runnable.run();
+      } else {
+        Thread thread = new Thread(runnable);
+        thread.setName("Bulk Finishing Thread Table [" + table + "] Shard [" + shard + "] BulkId [" + bulkId + "]");
+        thread.start();
+      }
+    }
+  }
+
+  @Override
+  public void addBulkMutate(String bulkId, RowMutation mutation) throws IOException {
+    BulkEntry bulkEntry = _bulkWriters.get(bulkId);
+    if (bulkEntry == null) {
+      throw new IOException("Bulk writer for [" + bulkId + "] not found.");
+    }
+    bulkEntry._writer.append(getKey(mutation), toBytesWritable(mutation));
+  }
+
+  private Text getKey(RowMutation mutation) {
+    return new Text(mutation.getRowId());
+  }
+
+  private BytesWritable toBytesWritable(RowMutation mutation) {
+    BytesWritable value = new BytesWritable();
+    byte[] bytes = BlurUtil.toBytes(mutation);
+    value.set(bytes, 0, bytes.length);
+    return value;
+  }
+
+  private RowMutation fromBytesWritable(BytesWritable value) {
+    return (RowMutation) BlurUtil.fromBytes(value.getBytes(), 0, value.getLength());
+  }
+
+  private static void removeParentIfLastFile(final FileSystem fileSystem, Path parent) throws IOException {
+    FileStatus[] listStatus = fileSystem.listStatus(parent);
+    if (listStatus != null) {
+      if (listStatus.length == 0) {
+        if (!fileSystem.delete(parent, false)) {
+          if (fileSystem.exists(parent)) {
+            LOG.error("Could not remove parent directory [{0}]", parent);
+          }
+        }
+      }
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
index d561560..47a9376 100644
--- a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
+++ b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
@@ -291,4 +291,19 @@ public class FilteredBlurServer implements Iface {
     _iface.loadData(table, location);
   }
 
+  @Override
+  public void bulkMutateStart(String table, String bulkId) throws BlurException, TException {
+    _iface.bulkMutateStart(table, bulkId);
+  }
+
+  @Override
+  public void bulkMutateAdd(String table, String bulkId, RowMutation rowMutation) throws BlurException, TException {
+    _iface.bulkMutateAdd(table, bulkId, rowMutation);
+  }
+
+  @Override
+  public void bulkMutateFinish(String table, String bulkId, boolean apply, boolean blockUntilComplete) throws BlurException, TException {
+    _iface.bulkMutateFinish(table, bulkId, apply, blockUntilComplete);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
index 23f719c..4cd2327 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
@@ -1673,4 +1673,79 @@ public class BlurControllerServer extends TableAdmin implements Iface {
     throw new BException("Not Implemented");
   }
 
+  @Override
+  public void bulkMutateStart(final String table, final String bulkId) throws BlurException, TException {
+    String cluster = getCluster(table);
+    try {
+      scatter(cluster, new BlurCommand<Void>() {
+        @Override
+        public Void call(Client client) throws BlurException, TException {
+          client.bulkMutateStart(table, bulkId);
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get start a bulk mutate [{0}] [{1}]", e, table, bulkId);
+      if (e instanceof BlurException) {
+        throw (BlurException) e;
+      }
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void bulkMutateAdd(final String table, final String bulkId, final RowMutation mutation) throws BlurException, TException {
+    try {
+      checkTable(mutation.table);
+      checkForUpdates(mutation.table);
+      MutationHelper.validateMutation(mutation);
+      if (!table.equals(mutation.getTable())) {
+        throw new BException("RowMutation table [{0}] has to match method table [{1}]", mutation.getTable(), table);
+      }
+
+      int numberOfShards = getShardCount(table);
+      Map<String, String> tableLayout = getTableLayout(table);
+      if (tableLayout.size() != numberOfShards) {
+        throw new BException("Cannot update data while shard is missing");
+      }
+
+      String shardName = MutationHelper.getShardName(table, mutation.rowId, numberOfShards, _blurPartitioner);
+      String node = tableLayout.get(shardName);
+      _client.execute(node, new BlurCommand<Void>() {
+        @Override
+        public Void call(Client client) throws BlurException, TException {
+          client.bulkMutateAdd(table, bulkId, mutation);
+          return null;
+        }
+      }, _maxMutateRetries, _mutateDelay, _maxMutateDelay);
+    } catch (Exception e) {
+      LOG.error("Unknown error during bulk mutation of [{0}]", e, mutation);
+      if (e instanceof BlurException) {
+        throw (BlurException) e;
+      }
+      throw new BException("Unknown error during bulk mutation of [{0}]", e, mutation);
+    }
+  }
+
+  @Override
+  public void bulkMutateFinish(final String table, final String bulkId, final boolean apply, final boolean blockUntilComplete) throws BlurException,
+      TException {
+    String cluster = getCluster(table);
+    try {
+      scatter(cluster, new BlurCommand<Void>() {
+        @Override
+        public Void call(Client client) throws BlurException, TException {
+          client.bulkMutateFinish(table, bulkId, apply, blockUntilComplete);
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to get finish a bulk mutate [{0}] [{1}]", e, table, bulkId);
+      if (e instanceof BlurException) {
+        throw (BlurException) e;
+      }
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
index 135feaf..976896c 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
@@ -695,4 +695,43 @@ public class BlurShardServer extends TableAdmin implements Iface {
     return CommandStatusStateEnum.valueOf(state.name());
   }
 
+  @Override
+  public void bulkMutateStart(String table, String bulkId) throws BlurException, TException {
+    try {
+      _indexManager.bulkMutateStart(table, bulkId);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to start a bulk mutate on table [" + table + "]", e);
+      if (e instanceof BlurException) {
+        throw (BlurException) e;
+      }
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void bulkMutateAdd(String table, String bulkId, RowMutation rowMutation) throws BlurException, TException {
+    try {
+      _indexManager.bulkMutateAdd(table, bulkId, rowMutation);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to add to a bulk mutate on table [" + table + "]", e);
+      if (e instanceof BlurException) {
+        throw (BlurException) e;
+      }
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void bulkMutateFinish(String table, String bulkId, boolean apply, boolean blockUntilComplete) throws BlurException, TException {
+    try {
+      _indexManager.bulkMutateFinish(table, bulkId, apply,blockUntilComplete);
+    } catch (Exception e) {
+      LOG.error("Unknown error while trying to finsh a bulk mutate on table [" + table + "]", e);
+      if (e instanceof BlurException) {
+        throw (BlurException) e;
+      }
+      throw new BException(e.getMessage(), e);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java b/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
index d2b4eed..673838b 100644
--- a/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
+++ b/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
@@ -430,6 +430,21 @@ public class ShardCommandManagerTest {
       public void close() throws IOException {
         throw new RuntimeException("Not implemented.");
       }
+
+      @Override
+      public void startBulkMutate(String bulkId) throws IOException {
+        throw new RuntimeException("Not implemented.");
+      }
+
+      @Override
+      public void finishBulkMutate(String bulkId, boolean apply, boolean blockUntilComplete) throws IOException {
+        throw new RuntimeException("Not implemented.");
+      }
+
+      @Override
+      public void addBulkMutate(String bulkId, RowMutation mutation) throws IOException {
+        throw new RuntimeException("Not implemented.");
+      }
     };
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
index 28b0d21..9e83196 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
@@ -197,6 +197,21 @@ public class IndexImporterTest {
       public void enqueue(List<RowMutation> mutations) {
         throw new RuntimeException("Not Implemented");
       }
+
+      @Override
+      public void startBulkMutate(String bulkId) throws IOException {
+        throw new RuntimeException("Not implemented.");
+      }
+
+      @Override
+      public void finishBulkMutate(String bulkId, boolean apply, boolean blockUntilComplete) throws IOException {
+        throw new RuntimeException("Not implemented.");
+      }
+
+      @Override
+      public void addBulkMutate(String bulkId, RowMutation mutation) throws IOException {
+        throw new RuntimeException("Not implemented.");
+      }
     };
   }
 


Mime
View raw message