incubator-blur-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aaron McCurry <amccu...@gmail.com>
Subject Re: [8/9] git commit: Adding bulk mutate implementation.
Date Fri, 19 Dec 2014 13:17:55 GMT
On Friday, December 19, 2014, Tim Williams <williamstw@gmail.com> wrote:

> Good stuff, makes me think the mutateBatch, with its current weak
> guarantee, should just go away or be implemented on top of this?


Yes, I was thinking the same thing, though there isn't a total commit
guarantee with the new solution yet.

>
> With the BlurIndexSimpleWriter, is it the case that this doesn't yet
> handle shard failovers? I tried to figure out how _bulkWriters could
> get reinflated on startup but didn't see it, so I'm not sure if I was
> missing something or it's not there yet.  Also wondering if the sorted
> path needs to be a temporary file and moved into place?


You are correct, it doesn't survive failovers yet.  We should discuss some
strategies for that.


>
> --tim
>
>
> On Thu, Dec 18, 2014 at 6:21 PM,  <amccurry@apache.org <javascript:;>>
> wrote:
> > Adding bulk mutate implementation.
> >
> >
> > Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
> > Commit:
> http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/89945db1
> > Tree:
> http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/89945db1
> > Diff:
> http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/89945db1
> >
> > Branch: refs/heads/master
> > Commit: 89945db12a98bca1290e6ccbedbbee7c526803ba
> > Parents: a0a7d7d
> > Author: Aaron McCurry <amccurry@gmail.com <javascript:;>>
> > Authored: Thu Dec 18 18:21:14 2014 -0500
> > Committer: Aaron McCurry <amccurry@gmail.com <javascript:;>>
> > Committed: Thu Dec 18 18:21:14 2014 -0500
> >
> > ----------------------------------------------------------------------
> >  .../org/apache/blur/manager/IndexManager.java   |  30 ++-
> >  .../apache/blur/manager/writer/BlurIndex.java   |   6 +
> >  .../blur/manager/writer/BlurIndexReadOnly.java  |  15 ++
> >  .../manager/writer/BlurIndexSimpleWriter.java   | 201
> ++++++++++++++++++-
> >  .../apache/blur/server/FilteredBlurServer.java  |  15 ++
> >  .../blur/thrift/BlurControllerServer.java       |  75 +++++++
> >  .../org/apache/blur/thrift/BlurShardServer.java |  39 ++++
> >  .../blur/command/ShardCommandManagerTest.java   |  15 ++
> >  .../blur/manager/writer/IndexImporterTest.java  |  15 ++
> >  9 files changed, 407 insertions(+), 4 deletions(-)
> > ----------------------------------------------------------------------
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
> > ----------------------------------------------------------------------
> > diff --git
> a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
> b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
> > index 51c4561..0dbc634 100644
> > --- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
> > +++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
> > @@ -1015,18 +1015,18 @@ public class IndexManager {
> >      List<String> terms = new ArrayList<String>(size);
> >      AtomicReader areader = BlurUtil.getAtomicReader(reader);
> >      Terms termsAll = areader.terms(term.field());
> > -
> > +
> >      if (termsAll == null) {
> >        return terms;
> >      }
> >
> >      TermsEnum termEnum = termsAll.iterator(null);
> >      SeekStatus status = termEnum.seekCeil(term.bytes());
> > -
> > +
> >      if (status == SeekStatus.END) {
> >        return terms;
> >      }
> > -
> > +
> >      BytesRef currentTermText = termEnum.term();
> >      do {
> >        terms.add(currentTermText.utf8ToString());
> > @@ -1289,4 +1289,28 @@ public class IndexManager {
> >      enqueue(Arrays.asList(mutation));
> >    }
> >
> > +  public void bulkMutateStart(String table, String bulkId) throws
> BlurException, IOException {
> > +    Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
> > +    for (BlurIndex index : indexes.values()) {
> > +      index.startBulkMutate(bulkId);
> > +    }
> > +  }
> > +
> > +  public void bulkMutateAdd(String table, String bulkId, RowMutation
> mutation) throws BlurException, IOException {
> > +    String shard = MutationHelper.getShardName(table, mutation.rowId,
> getNumberOfShards(table), _blurPartitioner);
> > +    Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
> > +    BlurIndex blurIndex = indexes.get(shard);
> > +    if (blurIndex == null) {
> > +      throw new BException("Shard [{0}] for table [{1}] not found on
> this server.", shard, table);
> > +    }
> > +    blurIndex.addBulkMutate(bulkId, mutation);
> > +  }
> > +
> > +  public void bulkMutateFinish(String table, String bulkId, boolean
> apply, boolean blockUntilComplete) throws BlurException, IOException {
> > +    Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
> > +    for (BlurIndex index : indexes.values()) {
> > +      index.finishBulkMutate(bulkId, apply,blockUntilComplete);
> > +    }
> > +  }
> > +
> >  }
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
> > ----------------------------------------------------------------------
> > diff --git
> a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
> b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
> > index 1ca8f0c..eec3f65 100644
> > ---
> a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
> > +++
> b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
> > @@ -136,4 +136,10 @@ public abstract class BlurIndex {
> >
> >    public abstract void enqueue(List<RowMutation> mutations) throws
> IOException;
> >
> > +  public abstract void startBulkMutate(String bulkId) throws
> IOException;
> > +
> > +  public abstract void finishBulkMutate(String bulkId, boolean apply,
> boolean blockUntilComplete) throws IOException;
> > +
> > +  public abstract void addBulkMutate(String bulkId, RowMutation
> mutation) throws IOException;
> > +
> >  }
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
> > ----------------------------------------------------------------------
> > diff --git
> a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
> b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
> > index c2aee75..a33dc62 100644
> > ---
> a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
> > +++
> b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
> > @@ -74,4 +74,19 @@ public class BlurIndexReadOnly extends BlurIndex {
> >      throw new RuntimeException("Read-only shard");
> >    }
> >
> > +  @Override
> > +  public void startBulkMutate(String bulkId) {
> > +    throw new RuntimeException("Read-only shard");
> > +  }
> > +
> > +  @Override
> > +  public void finishBulkMutate(String bulkId, boolean apply, boolean
> blockUntilComplete) {
> > +    throw new RuntimeException("Read-only shard");
> > +  }
> > +
> > +  @Override
> > +  public void addBulkMutate(String bulkId, RowMutation mutation) {
> > +    throw new RuntimeException("Read-only shard");
> > +  }
> > +
> >  }
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
> > ----------------------------------------------------------------------
> > diff --git
> a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
> b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
> > index 792a7d8..a07cc34 100644
> > ---
> a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
> > +++
> b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
> > @@ -22,9 +22,11 @@ import static
> org.apache.blur.utils.BlurConstants.BLUR_SHARD_QUEUE_MAX_INMEMORY_
> >  import java.io.IOException;
> >  import java.util.ArrayList;
> >  import java.util.List;
> > +import java.util.Map;
> >  import java.util.Timer;
> >  import java.util.concurrent.ArrayBlockingQueue;
> >  import java.util.concurrent.BlockingQueue;
> > +import java.util.concurrent.ConcurrentHashMap;
> >  import java.util.concurrent.ExecutorService;
> >  import java.util.concurrent.TimeUnit;
> >  import java.util.concurrent.atomic.AtomicBoolean;
> > @@ -45,15 +47,27 @@ import org.apache.blur.lucene.codec.Blur024Codec;
> >  import org.apache.blur.server.IndexSearcherClosable;
> >  import org.apache.blur.server.ShardContext;
> >  import org.apache.blur.server.TableContext;
> > +import org.apache.blur.thrift.generated.BlurException;
> >  import org.apache.blur.thrift.generated.RowMutation;
> >  import org.apache.blur.trace.Trace;
> >  import org.apache.blur.trace.Tracer;
> > +import org.apache.blur.utils.BlurUtil;
> > +import org.apache.hadoop.conf.Configuration;
> > +import org.apache.hadoop.fs.FileStatus;
> > +import org.apache.hadoop.fs.FileSystem;
> >  import org.apache.hadoop.fs.Path;
> > +import org.apache.hadoop.io.BytesWritable;
> >  import org.apache.hadoop.io.IOUtils;
> > +import org.apache.hadoop.io.SequenceFile;
> > +import org.apache.hadoop.io.SequenceFile.Reader;
> > +import org.apache.hadoop.io.SequenceFile.Sorter;
> > +import org.apache.hadoop.io.SequenceFile.Writer;
> > +import org.apache.hadoop.io.Text;
> >  import org.apache.lucene.analysis.Analyzer;
> >  import org.apache.lucene.index.BlurIndexWriter;
> >  import org.apache.lucene.index.DirectoryReader;
> >  import org.apache.lucene.index.IndexReader;
> > +import org.apache.lucene.index.IndexWriter;
> >  import org.apache.lucene.index.IndexWriterConfig;
> >  import org.apache.lucene.index.TieredMergePolicy;
> >  import org.apache.lucene.store.Directory;
> > @@ -88,10 +102,12 @@ public class BlurIndexSimpleWriter extends
> BlurIndex {
> >    private final BlockingQueue<RowMutation> _queue;
> >    private final MutationQueueProcessor _mutationQueueProcessor;
> >    private final Timer _indexImporterTimer;
> > +  private final Map<String, BulkEntry> _bulkWriters;
> >
> >    public BlurIndexSimpleWriter(ShardContext shardContext, Directory
> directory, SharedMergeScheduler mergeScheduler,
> >        final ExecutorService searchExecutor, BlurIndexCloser
> indexCloser, Timer indexImporterTimer) throws IOException {
> >      super(shardContext, directory, mergeScheduler, searchExecutor,
> indexCloser, indexImporterTimer);
> > +    _bulkWriters = new ConcurrentHashMap<String,
> BlurIndexSimpleWriter.BulkEntry>();
> >      _indexImporterTimer = indexImporterTimer;
> >      _searchThreadPool = searchExecutor;
> >      _shardContext = shardContext;
> > @@ -103,7 +119,7 @@ public class BlurIndexSimpleWriter extends BlurIndex
> {
> >      _conf.setWriteLockTimeout(TimeUnit.MINUTES.toMillis(5));
> >      _conf.setCodec(new
> Blur024Codec(_tableContext.getBlurConfiguration()));
> >      _conf.setSimilarity(_tableContext.getSimilarity());
> > -    _conf.setInfoStream(new
> LoggingInfoStream(_tableContext.getTable(),_shardContext.getShard()));
> > +    _conf.setInfoStream(new LoggingInfoStream(_tableContext.getTable(),
> _shardContext.getShard()));
> >      TieredMergePolicy mergePolicy = (TieredMergePolicy)
> _conf.getMergePolicy();
> >      mergePolicy.setUseCompoundFile(false);
> >      _conf.setMergeScheduler(mergeScheduler.getMergeScheduler());
> > @@ -381,4 +397,187 @@ public class BlurIndexSimpleWriter extends
> BlurIndex {
> >      _mutationQueueProcessor.startIfNotRunning();
> >    }
> >
> > +  static class BulkEntry {
> > +    final SequenceFile.Writer _writer;
> > +    final Path _path;
> > +
> > +    BulkEntry(Writer writer, Path path) {
> > +      _writer = writer;
> > +      _path = path;
> > +    }
> > +  }
> > +
> > +  @Override
> > +  public void startBulkMutate(String bulkId) throws IOException {
> > +    BulkEntry bulkEntry = _bulkWriters.get(bulkId);
> > +    if (bulkEntry == null) {
> > +      Path tablePath = _tableContext.getTablePath();
> > +      Path bulk = new Path(tablePath, "bulk");
> > +      Path bulkInstance = new Path(bulk, bulkId);
> > +      Path path = new Path(bulkInstance, _shardContext.getShard() +
> ".notsorted.seq");
> > +      Configuration configuration = _tableContext.getConfiguration();
> > +      FileSystem fileSystem = path.getFileSystem(configuration);
> > +      Writer writer = new SequenceFile.Writer(fileSystem,
> configuration, path, Text.class, BytesWritable.class);
> > +      _bulkWriters.put(bulkId, new BulkEntry(writer, path));
> > +    } else {
> > +      LOG.info("Bulk [{0}] mutate already started on shard [{1}] in
> table [{2}].", bulkId, _shardContext.getShard(),
> > +          _tableContext.getTable());
> > +    }
> > +  }
> > +
> > +  @Override
> > +  public void finishBulkMutate(final String bulkId, boolean apply,
> boolean blockUntilComplete) throws IOException {
> > +    final String table = _tableContext.getTable();
> > +    final String shard = _shardContext.getShard();
> > +
> > +    LOG.info("Shard [{2}/{3}] Id [{0}] Finishing bulk mutate apply
> [{1}]", bulkId, apply, table, shard);
> > +    final BulkEntry bulkEntry = _bulkWriters.get(bulkId);
> > +    bulkEntry._writer.close();
> > +
> > +    Configuration configuration = _tableContext.getConfiguration();
> > +    final Path path = bulkEntry._path;
> > +    final FileSystem fileSystem = path.getFileSystem(configuration);
> > +
> > +    if (!apply) {
> > +      fileSystem.delete(path, false);
> > +      Path parent = path.getParent();
> > +      removeParentIfLastFile(fileSystem, parent);
> > +    } else {
> > +      Runnable runnable = new Runnable() {
> > +        @Override
> > +        public void run() {
> > +          try {
> > +            process(new IndexAction() {
> > +              private Path _sorted;
> > +
> > +              @Override
> > +              public void performMutate(IndexSearcherClosable searcher,
> IndexWriter writer) throws IOException {
> > +                Configuration configuration =
> _tableContext.getConfiguration();
> > +
> > +                SequenceFile.Sorter sorter = new Sorter(fileSystem,
> Text.class, BytesWritable.class, configuration);
> > +
> > +                _sorted = new Path(path.getParent(), shard +
> ".sorted.seq");
> > +
> > +                LOG.info("Shard [{2}/{3}] Id [{4}] Sorting mutates path
> [{0}] sorted path [{1}]", path, _sorted, table,
> > +                    shard, bulkId);
> > +                sorter.sort(path, _sorted);
> > +
> > +                LOG.info("Shard [{1}/{2}] Id [{3}] Applying mutates
> sorted path [{0}]", _sorted, table, shard, bulkId);
> > +                Reader reader = new SequenceFile.Reader(fileSystem,
> _sorted, configuration);
> > +
> > +                Text key = new Text();
> > +                BytesWritable value = new BytesWritable();
> > +
> > +                Text last = null;
> > +                List<RowMutation> list = new ArrayList<RowMutation>();
> > +                while (reader.next(key, value)) {
> > +                  if (!key.equals(last)) {
> > +                    flushMutates(searcher, writer, list);
> > +                    last = new Text(key);
> > +                    list.clear();
> > +                  }
> > +                  list.add(fromBytesWritable(value));
> > +                }
> > +                flushMutates(searcher, writer, list);
> > +                reader.close();
> > +                LOG.info("Shard [{0}/{1}] Id [{2}] Finished applying
> mutates starting commit.", table, shard, bulkId);
> > +              }
> > +
> > +              private void flushMutates(IndexSearcherClosable searcher,
> IndexWriter writer, List<RowMutation> list)
> > +                  throws IOException {
> > +                if (!list.isEmpty()) {
> > +                  List<RowMutation> reduceMutates;
> > +                  try {
> > +                    reduceMutates = MutatableAction.reduceMutates(list);
> > +                  } catch (BlurException e) {
> > +                    throw new IOException(e);
> > +                  }
> > +                  for (RowMutation mutation : reduceMutates) {
> > +                    MutatableAction mutatableAction = new
> MutatableAction(_shardContext);
> > +                    mutatableAction.mutate(mutation);
> > +                    mutatableAction.performMutate(searcher, writer);
> > +                  }
> > +                }
> > +              }
> > +
> > +              private void cleanupFiles() throws IOException {
> > +                fileSystem.delete(path, false);
> > +                fileSystem.delete(_sorted, false);
> > +                Path parent = path.getParent();
> > +                removeParentIfLastFile(fileSystem, parent);
> > +              }
> > +
> > +              @Override
> > +              public void doPreRollback(IndexWriter writer) throws
> IOException {
> > +
> > +              }
> > +
> > +              @Override
> > +              public void doPreCommit(IndexSearcherClosable
> indexSearcher, IndexWriter writer) throws IOException {
> > +
> > +              }
> > +
> > +              @Override
> > +              public void doPostRollback(IndexWriter writer) throws
> IOException {
> > +                cleanupFiles();
> > +              }
> > +
> > +              @Override
> > +              public void doPostCommit(IndexWriter writer) throws
> IOException {
> > +                cleanupFiles();
> > +              }
> > +            });
> > +          } catch (IOException e) {
> > +            LOG.error("Shard [{0}/{1}] Id [{2}] Unknown error while
> trying to finish the bulk updates.", table, shard,
> > +                bulkId);
> > +          }
> > +        }
> > +      };
> > +      if (blockUntilComplete) {
> > +        runnable.run();
> > +      } else {
> > +        Thread thread = new Thread(runnable);
> > +        thread.setName("Bulk Finishing Thread Table [" + table + "]
> Shard [" + shard + "] BulkId [" + bulkId + "]");
> > +        thread.start();
> > +      }
> > +    }
> > +  }
> > +
> > +  @Override
> > +  public void addBulkMutate(String bulkId, RowMutation mutation) throws
> IOException {
> > +    BulkEntry bulkEntry = _bulkWriters.get(bulkId);
> > +    if (bulkEntry == null) {
> > +      throw new IOException("Bulk writer for [" + bulkId + "] not
> found.");
> > +    }
> > +    bulkEntry._writer.append(getKey(mutation),
> toBytesWritable(mutation));
> > +  }
> > +
> > +  private Text getKey(RowMutation mutation) {
> > +    return new Text(mutation.getRowId());
> > +  }
> > +
> > +  private BytesWritable toBytesWritable(RowMutation mutation) {
> > +    BytesWritable value = new BytesWritable();
> > +    byte[] bytes = BlurUtil.toBytes(mutation);
> > +    value.set(bytes, 0, bytes.length);
> > +    return value;
> > +  }
> > +
> > +  private RowMutation fromBytesWritable(BytesWritable value) {
> > +    return (RowMutation) BlurUtil.fromBytes(value.getBytes(), 0,
> value.getLength());
> > +  }
> > +
> > +  private static void removeParentIfLastFile(final FileSystem
> fileSystem, Path parent) throws IOException {
> > +    FileStatus[] listStatus = fileSystem.listStatus(parent);
> > +    if (listStatus != null) {
> > +      if (listStatus.length == 0) {
> > +        if (!fileSystem.delete(parent, false)) {
> > +          if (fileSystem.exists(parent)) {
> > +            LOG.error("Could not remove parent directory [{0}]",
> parent);
> > +          }
> > +        }
> > +      }
> > +    }
> > +  }
> > +
> >  }
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
> > ----------------------------------------------------------------------
> > diff --git
> a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
> b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
> > index d561560..47a9376 100644
> > ---
> a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
> > +++
> b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java
> > @@ -291,4 +291,19 @@ public class FilteredBlurServer implements Iface {
> >      _iface.loadData(table, location);
> >    }
> >
> > +  @Override
> > +  public void bulkMutateStart(String table, String bulkId) throws
> BlurException, TException {
> > +    _iface.bulkMutateStart(table, bulkId);
> > +  }
> > +
> > +  @Override
> > +  public void bulkMutateAdd(String table, String bulkId, RowMutation
> rowMutation) throws BlurException, TException {
> > +    _iface.bulkMutateAdd(table, bulkId, rowMutation);
> > +  }
> > +
> > +  @Override
> > +  public void bulkMutateFinish(String table, String bulkId, boolean
> apply, boolean blockUntilComplete) throws BlurException, TException {
> > +    _iface.bulkMutateFinish(table, bulkId, apply, blockUntilComplete);
> > +  }
> > +
> >  }
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
> > ----------------------------------------------------------------------
> > diff --git
> a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
> b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
> > index 23f719c..4cd2327 100644
> > ---
> a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
> > +++
> b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
> > @@ -1673,4 +1673,79 @@ public class BlurControllerServer extends
> TableAdmin implements Iface {
> >      throw new BException("Not Implemented");
> >    }
> >
> > +  @Override
> > +  public void bulkMutateStart(final String table, final String bulkId)
> throws BlurException, TException {
> > +    String cluster = getCluster(table);
> > +    try {
> > +      scatter(cluster, new BlurCommand<Void>() {
> > +        @Override
> > +        public Void call(Client client) throws BlurException,
> TException {
> > +          client.bulkMutateStart(table, bulkId);
> > +          return null;
> > +        }
> > +      });
> > +    } catch (Exception e) {
> > +      LOG.error("Unknown error while trying to get start a bulk mutate
> [{0}] [{1}]", e, table, bulkId);
> > +      if (e instanceof BlurException) {
> > +        throw (BlurException) e;
> > +      }
> > +      throw new BException(e.getMessage(), e);
> > +    }
> > +  }
> > +
> > +  @Override
> > +  public void bulkMutateAdd(final String table, final String bulkId,
> final RowMutation mutation) throws BlurException, TException {
> > +    try {
> > +      checkTable(mutation.table);
> > +      checkForUpdates(mutation.table);
> > +      MutationHelper.validateMutation(mutation);
> > +      if (!table.equals(mutation.getTable())) {
> > +        throw new BException("RowMutation table [{0}] has to match
> method table [{1}]", mutation.getTable(), table);
> > +      }
> > +
> > +      int numberOfShards = getShardCount(table);
> > +      Map<String, String> tableLayout = getTableLayout(table);
> > +      if (tableLayout.size() != numberOfShards) {
> > +        throw new BException("Cannot update data while shard is
> missing");
> > +      }
> > +
> > +      String shardName = MutationHelper.getShardName(table,
> mutation.rowId, numberOfShards, _blurPartitioner);
> > +      String node = tableLayout.get(shardName);
> > +      _client.execute(node, new BlurCommand<Void>() {
> > +        @Override
> > +        public Void call(Client client) throws BlurException,
> TException {
> > +          client.bulkMutateAdd(table, bulkId, mutation);
> > +          return null;
> > +        }
> > +      }, _maxMutateRetries, _mutateDelay, _maxMutateDelay);
> > +    } catch (Exception e) {
> > +      LOG.error("Unknown error during bulk mutation of [{0}]", e,
> mutation);
> > +      if (e instanceof BlurException) {
> > +        throw (BlurException) e;
> > +      }
> > +      throw new BException("Unknown error during bulk mutation of
> [{0}]", e, mutation);
> > +    }
> > +  }
> > +
> > +  @Override
> > +  public void bulkMutateFinish(final String table, final String bulkId,
> final boolean apply, final boolean blockUntilComplete) throws BlurException,
> > +      TException {
> > +    String cluster = getCluster(table);
> > +    try {
> > +      scatter(cluster, new BlurCommand<Void>() {
> > +        @Override
> > +        public Void call(Client client) throws BlurException,
> TException {
> > +          client.bulkMutateFinish(table, bulkId, apply,
> blockUntilComplete);
> > +          return null;
> > +        }
> > +      });
> > +    } catch (Exception e) {
> > +      LOG.error("Unknown error while trying to get finish a bulk mutate
> [{0}] [{1}]", e, table, bulkId);
> > +      if (e instanceof BlurException) {
> > +        throw (BlurException) e;
> > +      }
> > +      throw new BException(e.getMessage(), e);
> > +    }
> > +  }
> > +
> >  }
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
> > ----------------------------------------------------------------------
> > diff --git
> a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
> b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
> > index 135feaf..976896c 100644
> > --- a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
> > +++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
> > @@ -695,4 +695,43 @@ public class BlurShardServer extends TableAdmin
> implements Iface {
> >      return CommandStatusStateEnum.valueOf(state.name());
> >    }
> >
> > +  @Override
> > +  public void bulkMutateStart(String table, String bulkId) throws
> BlurException, TException {
> > +    try {
> > +      _indexManager.bulkMutateStart(table, bulkId);
> > +    } catch (Exception e) {
> > +      LOG.error("Unknown error while trying to start a bulk mutate on
> table [" + table + "]", e);
> > +      if (e instanceof BlurException) {
> > +        throw (BlurException) e;
> > +      }
> > +      throw new BException(e.getMessage(), e);
> > +    }
> > +  }
> > +
> > +  @Override
> > +  public void bulkMutateAdd(String table, String bulkId, RowMutation
> rowMutation) throws BlurException, TException {
> > +    try {
> > +      _indexManager.bulkMutateAdd(table, bulkId, rowMutation);
> > +    } catch (Exception e) {
> > +      LOG.error("Unknown error while trying to add to a bulk mutate on
> table [" + table + "]", e);
> > +      if (e instanceof BlurException) {
> > +        throw (BlurException) e;
> > +      }
> > +      throw new BException(e.getMessage(), e);
> > +    }
> > +  }
> > +
> > +  @Override
> > +  public void bulkMutateFinish(String table, String bulkId, boolean
> apply, boolean blockUntilComplete) throws BlurException, TException {
> > +    try {
> > +      _indexManager.bulkMutateFinish(table, bulkId,
> apply,blockUntilComplete);
> > +    } catch (Exception e) {
> > +      LOG.error("Unknown error while trying to finsh a bulk mutate on
> table [" + table + "]", e);
> > +      if (e instanceof BlurException) {
> > +        throw (BlurException) e;
> > +      }
> > +      throw new BException(e.getMessage(), e);
> > +    }
> > +  }
> > +
> >  }
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
> > ----------------------------------------------------------------------
> > diff --git
> a/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
> b/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
> > index d2b4eed..673838b 100644
> > ---
> a/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
> > +++
> b/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
> > @@ -430,6 +430,21 @@ public class ShardCommandManagerTest {
> >        public void close() throws IOException {
> >          throw new RuntimeException("Not implemented.");
> >        }
> > +
> > +      @Override
> > +      public void startBulkMutate(String bulkId) throws IOException {
> > +        throw new RuntimeException("Not implemented.");
> > +      }
> > +
> > +      @Override
> > +      public void finishBulkMutate(String bulkId, boolean apply,
> boolean blockUntilComplete) throws IOException {
> > +        throw new RuntimeException("Not implemented.");
> > +      }
> > +
> > +      @Override
> > +      public void addBulkMutate(String bulkId, RowMutation mutation)
> throws IOException {
> > +        throw new RuntimeException("Not implemented.");
> > +      }
> >      };
> >    }
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/89945db1/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
> > ----------------------------------------------------------------------
> > diff --git
> a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
> b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
> > index 28b0d21..9e83196 100644
> > ---
> a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
> > +++
> b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
> > @@ -197,6 +197,21 @@ public class IndexImporterTest {
> >        public void enqueue(List<RowMutation> mutations) {
> >          throw new RuntimeException("Not Implemented");
> >        }
> > +
> > +      @Override
> > +      public void startBulkMutate(String bulkId) throws IOException {
> > +        throw new RuntimeException("Not implemented.");
> > +      }
> > +
> > +      @Override
> > +      public void finishBulkMutate(String bulkId, boolean apply,
> boolean blockUntilComplete) throws IOException {
> > +        throw new RuntimeException("Not implemented.");
> > +      }
> > +
> > +      @Override
> > +      public void addBulkMutate(String bulkId, RowMutation mutation)
> throws IOException {
> > +        throw new RuntimeException("Not implemented.");
> > +      }
> >      };
> >    }
> >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message