incubator-blur-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Tim Williams <william...@gmail.com>
Subject Re: [8/9] git commit: Adding bulk mutate implementation.
Date Fri, 19 Dec 2014 12:25:09 GMT
Good stuff, makes me think the mutateBatch, with it's current weak
guarantee, should just go away or be implemented on top of this?

With the BlurIndexSimpleWriter, is it the case that this doesn't yet
handle shard failovers - I tried to figure out how _bulkWriters could
get reinflated on startup but didn't see it so not sure if I was
missing something or its not there yet?  Also wondering if the sorted
path needs to be a temporary file and moved into place?

--tim


On Thu, Dec 18, 2014 at 6:21 PM,  <amccurry@apache.org> wrote:
> Adding bulk mutate implementation.
>
>
> Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
> Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/89945db1
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/89945db1
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/89945db1
>
> Branch: refs/heads/master
> Commit: 89945db12a98bca1290e6ccbedbbee7c526803ba
> Parents: a0a7d7d
> Author: Aaron McCurry <amccurry@gmail.com>
> Authored: Thu Dec 18 18:21:14 2014 -0500
> Committer: Aaron McCurry <amccurry@gmail.com>
> Committed: Thu Dec 18 18:21:14 2014 -0500
>
> ----------------------------------------------------------------------
>  .../org/apache/blur/manager/IndexManager.java   |  30 ++-
>  .../apache/blur/manager/writer/BlurIndex.java   |   6 +
>  .../blur/manager/writer/BlurIndexReadOnly.java  |  15 ++
>  .../manager/writer/BlurIndexSimpleWriter.java   | 201 ++++++++++++++++++-
>  .../apache/blur/server/FilteredBlurServer.java  |  15 ++
>  .../blur/thrift/BlurControllerServer.java       |  75 +++++++
>  .../org/apache/blur/thrift/BlurShardServer.java |  39 ++++
>  .../blur/command/ShardCommandManagerTest.java   |  15 ++
>  .../blur/manager/writer/IndexImporterTest.java  |  15 ++
>  9 files changed, 407 insertions(+), 4 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
> ----------------------------------------------------------------------
> diff --git a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
> index 51c4561..0dbc634 100644
> --- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
> +++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
> @@ -1015,18 +1015,18 @@ public class IndexManager {
>      List<String> terms = new ArrayList<String>(size);
>      AtomicReader areader = BlurUtil.getAtomicReader(reader);
>      Terms termsAll = areader.terms(term.field());
> -
> +
>      if (termsAll == null) {
>        return terms;
>      }
>
>      TermsEnum termEnum = termsAll.iterator(null);
>      SeekStatus status = termEnum.seekCeil(term.bytes());
> -
> +
>      if (status == SeekStatus.END) {
>        return terms;
>      }
> -
> +
>      BytesRef currentTermText = termEnum.term();
>      do {
>        terms.add(currentTermText.utf8ToString());
> @@ -1289,4 +1289,28 @@ public class IndexManager {
>      enqueue(Arrays.asList(mutation));
>    }
>
> +  public void bulkMutateStart(String table, String bulkId) throws BlurException, IOException
{
> +    Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
> +    for (BlurIndex index : indexes.values()) {
> +      index.startBulkMutate(bulkId);
> +    }
> +  }
> +
> +  public void bulkMutateAdd(String table, String bulkId, RowMutation mutation) throws
BlurException, IOException {
> +    String shard = MutationHelper.getShardName(table, mutation.rowId, getNumberOfShards(table),
_blurPartitioner);
> +    Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
> +    BlurIndex blurIndex = indexes.get(shard);
> +    if (blurIndex == null) {
> +      throw new BException("Shard [{0}] for table [{1}] not found on this server.",
shard, table);
> +    }
> +    blurIndex.addBulkMutate(bulkId, mutation);
> +  }
> +
> +  public void bulkMutateFinish(String table, String bulkId, boolean apply, boolean blockUntilComplete)
throws BlurException, IOException {
> +    Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
> +    for (BlurIndex index : indexes.values()) {
> +      index.finishBulkMutate(bulkId, apply,blockUntilComplete);
> +    }
> +  }
> +
>  }
>
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
> ----------------------------------------------------------------------
> diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
> index 1ca8f0c..eec3f65 100644
> --- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
> +++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
> @@ -136,4 +136,10 @@ public abstract class BlurIndex {
>
>    public abstract void enqueue(List<RowMutation> mutations) throws IOException;
>
> +  public abstract void startBulkMutate(String bulkId) throws IOException;
> +
> +  public abstract void finishBulkMutate(String bulkId, boolean apply, boolean blockUntilComplete)
throws IOException;
> +
> +  public abstract void addBulkMutate(String bulkId, RowMutation mutation) throws IOException;
> +
>  }
>
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
> ----------------------------------------------------------------------
> diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
> index c2aee75..a33dc62 100644
> --- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
> +++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
> @@ -74,4 +74,19 @@ public class BlurIndexReadOnly extends BlurIndex {
>      throw new RuntimeException("Read-only shard");
>    }
>
> +  @Override
> +  public void startBulkMutate(String bulkId) {
> +    throw new RuntimeException("Read-only shard");
> +  }
> +
> +  @Override
> +  public void finishBulkMutate(String bulkId, boolean apply, boolean blockUntilComplete)
{
> +    throw new RuntimeException("Read-only shard");
> +  }
> +
> +  @Override
> +  public void addBulkMutate(String bulkId, RowMutation mutation) {
> +    throw new RuntimeException("Read-only shard");
> +  }
> +
>  }
>
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
> ----------------------------------------------------------------------
> diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
> index 792a7d8..a07cc34 100644
> --- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
> +++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
> @@ -22,9 +22,11 @@ import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_QUEUE_MAX_INMEMORY_
>  import java.io.IOException;
>  import java.util.ArrayList;
>  import java.util.List;
> +import java.util.Map;
>  import java.util.Timer;
>  import java.util.concurrent.ArrayBlockingQueue;
>  import java.util.concurrent.BlockingQueue;
> +import java.util.concurrent.ConcurrentHashMap;
>  import java.util.concurrent.ExecutorService;
>  import java.util.concurrent.TimeUnit;
>  import java.util.concurrent.atomic.AtomicBoolean;
> @@ -45,15 +47,27 @@ import org.apache.blur.lucene.codec.Blur024Codec;
>  import org.apache.blur.server.IndexSearcherClosable;
>  import org.apache.blur.server.ShardContext;
>  import org.apache.blur.server.TableContext;
> +import org.apache.blur.thrift.generated.BlurException;
>  import org.apache.blur.thrift.generated.RowMutation;
>  import org.apache.blur.trace.Trace;
>  import org.apache.blur.trace.Tracer;
> +import org.apache.blur.utils.BlurUtil;
> +import org.apache.hadoop.conf.Configuration;
> +import org.apache.hadoop.fs.FileStatus;
> +import org.apache.hadoop.fs.FileSystem;
>  import org.apache.hadoop.fs.Path;
> +import org.apache.hadoop.io.BytesWritable;
>  import org.apache.hadoop.io.IOUtils;
> +import org.apache.hadoop.io.SequenceFile;
> +import org.apache.hadoop.io.SequenceFile.Reader;
> +import org.apache.hadoop.io.SequenceFile.Sorter;
> +import org.apache.hadoop.io.SequenceFile.Writer;
> +import org.apache.hadoop.io.Text;
>  import org.apache.lucene.analysis.Analyzer;
>  import org.apache.lucene.index.BlurIndexWriter;
>  import org.apache.lucene.index.DirectoryReader;
>  import org.apache.lucene.index.IndexReader;
> +import org.apache.lucene.index.IndexWriter;
>  import org.apache.lucene.index.IndexWriterConfig;
>  import org.apache.lucene.index.TieredMergePolicy;
>  import org.apache.lucene.store.Directory;
> @@ -88,10 +102,12 @@ public class BlurIndexSimpleWriter extends BlurIndex {
>    private final BlockingQueue<RowMutation> _queue;
>    private final MutationQueueProcessor _mutationQueueProcessor;
>    private final Timer _indexImporterTimer;
> +  private final Map<String, BulkEntry> _bulkWriters;
>
>    public BlurIndexSimpleWriter(ShardContext shardContext, Directory directory, SharedMergeScheduler
mergeScheduler,
>        final ExecutorService searchExecutor, BlurIndexCloser indexCloser, Timer indexImporterTimer)
throws IOException {
>      super(shardContext, directory, mergeScheduler, searchExecutor, indexCloser, indexImporterTimer);
> +    _bulkWriters = new ConcurrentHashMap<String, BlurIndexSimpleWriter.BulkEntry>();
>      _indexImporterTimer = indexImporterTimer;
>      _searchThreadPool = searchExecutor;
>      _shardContext = shardContext;
> @@ -103,7 +119,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
>      _conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
>      _conf.setCodec(new Blur024Codec(_tableContext.getBlurConfiguration()));
>      _conf.setSimilarity(_tableContext.getSimilarity());
> -    _conf.setInfoStream(new LoggingInfoStream(_tableContext.getTable(),_shardContext.getShard()));
> +    _conf.setInfoStream(new LoggingInfoStream(_tableContext.getTable(), _shardContext.getShard()));
>      TieredMergePolicy mergePolicy = (TieredMergePolicy) _conf.getMergePolicy();
>      mergePolicy.setUseCompoundFile(false);
>      _conf.setMergeScheduler(mergeScheduler.getMergeScheduler());
> @@ -381,4 +397,187 @@ public class BlurIndexSimpleWriter extends BlurIndex {
>      _mutationQueueProcessor.startIfNotRunning();
>    }
>
> +  static class BulkEntry {
> +    final SequenceFile.Writer _writer;
> +    final Path _path;
> +
> +    BulkEntry(Writer writer, Path path) {
> +      _writer = writer;
> +      _path = path;
> +    }
> +  }
> +
> +  @Override
> +  public void startBulkMutate(String bulkId) throws IOException {
> +    BulkEntry bulkEntry = _bulkWriters.get(bulkId);
> +    if (bulkEntry == null) {
> +      Path tablePath = _tableContext.getTablePath();
> +      Path bulk = new Path(tablePath, "bulk");
> +      Path bulkInstance = new Path(bulk, bulkId);
> +      Path path = new Path(bulkInstance, _shardContext.getShard() + ".notsorted.seq");
> +      Configuration configuration = _tableContext.getConfiguration();
> +      FileSystem fileSystem = path.getFileSystem(configuration);
> +      Writer writer = new SequenceFile.Writer(fileSystem, configuration, path, Text.class,
BytesWritable.class);
> +      _bulkWriters.put(bulkId, new BulkEntry(writer, path));
> +    } else {
> +      LOG.info("Bulk [{0}] mutate already started on shard [{1}] in table [{2}].", bulkId,
_shardContext.getShard(),
> +          _tableContext.getTable());
> +    }
> +  }
> +
> +  @Override
> +  public void finishBulkMutate(final String bulkId, boolean apply, boolean blockUntilComplete)
throws IOException {
> +    final String table = _tableContext.getTable();
> +    final String shard = _shardContext.getShard();
> +
> +    LOG.info("Shard [{2}/{3}] Id [{0}] Finishing bulk mutate apply [{1}]", bulkId, apply,
table, shard);
> +    final BulkEntry bulkEntry = _bulkWriters.get(bulkId);
> +    bulkEntry._writer.close();
> +
> +    Configuration configuration = _tableContext.getConfiguration();
> +    final Path path = bulkEntry._path;
> +    final FileSystem fileSystem = path.getFileSystem(configuration);
> +
> +    if (!apply) {
> +      fileSystem.delete(path, false);
> +      Path parent = path.getParent();
> +      removeParentIfLastFile(fileSystem, parent);
> +    } else {
> +      Runnable runnable = new Runnable() {
> +        @Override
> +        public void run() {
> +          try {
> +            process(new IndexAction() {
> +              private Path _sorted;
> +
> +              @Override
> +              public void performMutate(IndexSearcherClosable searcher, IndexWriter
writer) throws IOException {
> +                Configuration configuration = _tableContext.getConfiguration();
> +
> +                SequenceFile.Sorter sorter = new Sorter(fileSystem, Text.class, BytesWritable.class,
configuration);
> +
> +                _sorted = new Path(path.getParent(), shard + ".sorted.seq");
> +
> +                LOG.info("Shard [{2}/{3}] Id [{4}] Sorting mutates path [{0}] sorted
path [{1}]", path, _sorted, table,
> +                    shard, bulkId);
> +                sorter.sort(path, _sorted);
> +
> +                LOG.info("Shard [{1}/{2}] Id [{3}] Applying mutates sorted path [{0}]",
_sorted, table, shard, bulkId);
> +                Reader reader = new SequenceFile.Reader(fileSystem, _sorted, configuration);
> +
> +                Text key = new Text();
> +                BytesWritable value = new BytesWritable();
> +
> +                Text last = null;
> +                List<RowMutation> list = new ArrayList<RowMutation>();
> +                while (reader.next(key, value)) {
> +                  if (!key.equals(last)) {
> +                    flushMutates(searcher, writer, list);
> +                    last = new Text(key);
> +                    list.clear();
> +                  }
> +                  list.add(fromBytesWritable(value));
> +                }
> +                flushMutates(searcher, writer, list);
> +                reader.close();
> +                LOG.info("Shard [{0}/{1}] Id [{2}] Finished applying mutates starting
commit.", table, shard, bulkId);
> +              }
> +
> +              private void flushMutates(IndexSearcherClosable searcher, IndexWriter
writer, List<RowMutation> list)
> +                  throws IOException {
> +                if (!list.isEmpty()) {
> +                  List<RowMutation> reduceMutates;
> +                  try {
> +                    reduceMutates = MutatableAction.reduceMutates(list);
> +                  } catch (BlurException e) {
> +                    throw new IOException(e);
> +                  }
> +                  for (RowMutation mutation : reduceMutates) {
> +                    MutatableAction mutatableAction = new MutatableAction(_shardContext);
> +                    mutatableAction.mutate(mutation);
> +                    mutatableAction.performMutate(searcher, writer);
> +                  }
> +                }
> +              }
> +
> +              private void cleanupFiles() throws IOException {
> +                fileSystem.delete(path, false);
> +                fileSystem.delete(_sorted, false);
> +                Path parent = path.getParent();
> +                removeParentIfLastFile(fileSystem, parent);
> +              }
> +
> +              @Override
> +              public void doPreRollback(IndexWriter writer) throws IOException {
> +
> +              }
> +
> +              @Override
> +              public void doPreCommit(IndexSearcherClosable indexSearcher, IndexWriter
writer) throws IOException {
> +
> +              }
> +
> +              @Override
> +              public void doPostRollback(IndexWriter writer) throws IOException {
> +                cleanupFiles();
> +              }
> +
> +              @Override
> +              public void doPostCommit(IndexWriter writer) throws IOException {
> +                cleanupFiles();
> +              }
> +            });
> +          } catch (IOException e) {
> +            LOG.error("Shard [{0}/{1}] Id [{2}] Unknown error while trying to finish
the bulk updates.", table, shard,
> +                bulkId);
> +          }
> +        }
> +      };
> +      if (blockUntilComplete) {
> +        runnable.run();
> +      } else {
> +        Thread thread = new Thread(runnable);
> +        thread.setName("Bulk Finishing Thread Table [" + table + "] Shard [" + shard
+ "] BulkId [" + bulkId + "]");
> +        thread.start();
> +      }
> +    }
> +  }
> +
> +  @Override
> +  public void addBulkMutate(String bulkId, RowMutation mutation) throws IOException
{
> +    BulkEntry bulkEntry = _bulkWriters.get(bulkId);
> +    if (bulkEntry == null) {
> +      throw new IOException("Bulk writer for [" + bulkId + "] not found.");
> +    }
> +    bulkEntry._writer.append(getKey(mutation), toBytesWritable(mutation));
> +  }
> +
> +  private Text getKey(RowMutation mutation) {
> +    return new Text(mutation.getRowId());
> +  }
> +
> +  private BytesWritable toBytesWritable(RowMutation mutation) {
> +    BytesWritable value = new BytesWritable();
> +    byte[] bytes = BlurUtil.toBytes(mutation);
> +    value.set(bytes, 0, bytes.length);
> +    return value;
> +  }
> +
> +  private RowMutation fromBytesWritable(BytesWritable value) {
> +    return (RowMutation) BlurUtil.fromBytes(value.getBytes(), 0, value.getLength());
> +  }
> +
> +  private static void removeParentIfLastFile(final FileSystem fileSystem, Path parent)
throws IOException {
> +    FileStatus[] listStatus = fileSystem.listStatus(parent);
> +    if (listStatus != null) {
> +      if (listStatus.length == 0) {
> +        if (!fileSystem.delete(parent, false)) {
> +          if (fileSystem.exists(parent)) {
> +            LOG.error("Could not remove parent directory [{0}]", parent);
> +          }
> +        }
> +      }
> +    }
> +  }
> +
>  }
>
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
> ----------------------------------------------------------------------
> diff --git a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
> index d561560..47a9376 100644
> --- a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
> +++ b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
> @@ -291,4 +291,19 @@ public class FilteredBlurServer implements Iface {
>      _iface.loadData(table, location);
>    }
>
> +  @Override
> +  public void bulkMutateStart(String table, String bulkId) throws BlurException, TException
{
> +    _iface.bulkMutateStart(table, bulkId);
> +  }
> +
> +  @Override
> +  public void bulkMutateAdd(String table, String bulkId, RowMutation rowMutation) throws
BlurException, TException {
> +    _iface.bulkMutateAdd(table, bulkId, rowMutation);
> +  }
> +
> +  @Override
> +  public void bulkMutateFinish(String table, String bulkId, boolean apply, boolean blockUntilComplete)
throws BlurException, TException {
> +    _iface.bulkMutateFinish(table, bulkId, apply, blockUntilComplete);
> +  }
> +
>  }
>
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
> ----------------------------------------------------------------------
> diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
> index 23f719c..4cd2327 100644
> --- a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
> +++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
> @@ -1673,4 +1673,79 @@ public class BlurControllerServer extends TableAdmin implements
Iface {
>      throw new BException("Not Implemented");
>    }
>
> +  @Override
> +  public void bulkMutateStart(final String table, final String bulkId) throws BlurException,
TException {
> +    String cluster = getCluster(table);
> +    try {
> +      scatter(cluster, new BlurCommand<Void>() {
> +        @Override
> +        public Void call(Client client) throws BlurException, TException {
> +          client.bulkMutateStart(table, bulkId);
> +          return null;
> +        }
> +      });
> +    } catch (Exception e) {
> +      LOG.error("Unknown error while trying to get start a bulk mutate [{0}] [{1}]",
e, table, bulkId);
> +      if (e instanceof BlurException) {
> +        throw (BlurException) e;
> +      }
> +      throw new BException(e.getMessage(), e);
> +    }
> +  }
> +
> +  @Override
> +  public void bulkMutateAdd(final String table, final String bulkId, final RowMutation
mutation) throws BlurException, TException {
> +    try {
> +      checkTable(mutation.table);
> +      checkForUpdates(mutation.table);
> +      MutationHelper.validateMutation(mutation);
> +      if (!table.equals(mutation.getTable())) {
> +        throw new BException("RowMutation table [{0}] has to match method table [{1}]",
mutation.getTable(), table);
> +      }
> +
> +      int numberOfShards = getShardCount(table);
> +      Map<String, String> tableLayout = getTableLayout(table);
> +      if (tableLayout.size() != numberOfShards) {
> +        throw new BException("Cannot update data while shard is missing");
> +      }
> +
> +      String shardName = MutationHelper.getShardName(table, mutation.rowId, numberOfShards,
_blurPartitioner);
> +      String node = tableLayout.get(shardName);
> +      _client.execute(node, new BlurCommand<Void>() {
> +        @Override
> +        public Void call(Client client) throws BlurException, TException {
> +          client.bulkMutateAdd(table, bulkId, mutation);
> +          return null;
> +        }
> +      }, _maxMutateRetries, _mutateDelay, _maxMutateDelay);
> +    } catch (Exception e) {
> +      LOG.error("Unknown error during bulk mutation of [{0}]", e, mutation);
> +      if (e instanceof BlurException) {
> +        throw (BlurException) e;
> +      }
> +      throw new BException("Unknown error during bulk mutation of [{0}]", e, mutation);
> +    }
> +  }
> +
> +  @Override
> +  public void bulkMutateFinish(final String table, final String bulkId, final boolean
apply, final boolean blockUntilComplete) throws BlurException,
> +      TException {
> +    String cluster = getCluster(table);
> +    try {
> +      scatter(cluster, new BlurCommand<Void>() {
> +        @Override
> +        public Void call(Client client) throws BlurException, TException {
> +          client.bulkMutateFinish(table, bulkId, apply, blockUntilComplete);
> +          return null;
> +        }
> +      });
> +    } catch (Exception e) {
> +      LOG.error("Unknown error while trying to get finish a bulk mutate [{0}] [{1}]",
e, table, bulkId);
> +      if (e instanceof BlurException) {
> +        throw (BlurException) e;
> +      }
> +      throw new BException(e.getMessage(), e);
> +    }
> +  }
> +
>  }
>
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
> ----------------------------------------------------------------------
> diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
> index 135feaf..976896c 100644
> --- a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
> +++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
> @@ -695,4 +695,43 @@ public class BlurShardServer extends TableAdmin implements Iface
{
>      return CommandStatusStateEnum.valueOf(state.name());
>    }
>
> +  @Override
> +  public void bulkMutateStart(String table, String bulkId) throws BlurException, TException
{
> +    try {
> +      _indexManager.bulkMutateStart(table, bulkId);
> +    } catch (Exception e) {
> +      LOG.error("Unknown error while trying to start a bulk mutate on table [" + table
+ "]", e);
> +      if (e instanceof BlurException) {
> +        throw (BlurException) e;
> +      }
> +      throw new BException(e.getMessage(), e);
> +    }
> +  }
> +
> +  @Override
> +  public void bulkMutateAdd(String table, String bulkId, RowMutation rowMutation) throws
BlurException, TException {
> +    try {
> +      _indexManager.bulkMutateAdd(table, bulkId, rowMutation);
> +    } catch (Exception e) {
> +      LOG.error("Unknown error while trying to add to a bulk mutate on table [" + table
+ "]", e);
> +      if (e instanceof BlurException) {
> +        throw (BlurException) e;
> +      }
> +      throw new BException(e.getMessage(), e);
> +    }
> +  }
> +
> +  @Override
> +  public void bulkMutateFinish(String table, String bulkId, boolean apply, boolean blockUntilComplete)
throws BlurException, TException {
> +    try {
> +      _indexManager.bulkMutateFinish(table, bulkId, apply,blockUntilComplete);
> +    } catch (Exception e) {
> +      LOG.error("Unknown error while trying to finsh a bulk mutate on table [" + table
+ "]", e);
> +      if (e instanceof BlurException) {
> +        throw (BlurException) e;
> +      }
> +      throw new BException(e.getMessage(), e);
> +    }
> +  }
> +
>  }
>
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
> ----------------------------------------------------------------------
> diff --git a/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
b/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
> index d2b4eed..673838b 100644
> --- a/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
> +++ b/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
> @@ -430,6 +430,21 @@ public class ShardCommandManagerTest {
>        public void close() throws IOException {
>          throw new RuntimeException("Not implemented.");
>        }
> +
> +      @Override
> +      public void startBulkMutate(String bulkId) throws IOException {
> +        throw new RuntimeException("Not implemented.");
> +      }
> +
> +      @Override
> +      public void finishBulkMutate(String bulkId, boolean apply, boolean blockUntilComplete)
throws IOException {
> +        throw new RuntimeException("Not implemented.");
> +      }
> +
> +      @Override
> +      public void addBulkMutate(String bulkId, RowMutation mutation) throws IOException
{
> +        throw new RuntimeException("Not implemented.");
> +      }
>      };
>    }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
> ----------------------------------------------------------------------
> diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
> index 28b0d21..9e83196 100644
> --- a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
> +++ b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
> @@ -197,6 +197,21 @@ public class IndexImporterTest {
>        public void enqueue(List<RowMutation> mutations) {
>          throw new RuntimeException("Not Implemented");
>        }
> +
> +      @Override
> +      public void startBulkMutate(String bulkId) throws IOException {
> +        throw new RuntimeException("Not implemented.");
> +      }
> +
> +      @Override
> +      public void finishBulkMutate(String bulkId, boolean apply, boolean blockUntilComplete)
throws IOException {
> +        throw new RuntimeException("Not implemented.");
> +      }
> +
> +      @Override
> +      public void addBulkMutate(String bulkId, RowMutation mutation) throws IOException
{
> +        throw new RuntimeException("Not implemented.");
> +      }
>      };
>    }
>
>

Mime
View raw message