incubator-blur-dev mailing list archives

From: Tim Williams <william...@gmail.com>
Subject: Re: [1/3] git commit: Fixing issue with bulk mutate where writers may become idle and later they fail because of lease exception issues.
Date: Wed, 11 Feb 2015 14:43:55 GMT

On Wed, Feb 11, 2015 at 9:19 AM, <amccurry@apache.org> wrote:
> Repository: incubator-blur
> Updated Branches:
>   refs/heads/master 1f61fff25 -> 45c9d4cdb
>
>
> Fixing issue with bulk mutate where writers may become idle and later they fail because of lease exception issues.
>
>
> Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
> Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/9714a1de
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/9714a1de
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/9714a1de
>
> Branch: refs/heads/master
> Commit: 9714a1ded7788a008640a0ec6c0bfda585960c96
> Parents: 1f61fff
> Author: Aaron McCurry <amccurry@gmail.com>
> Authored: Wed Feb 11 08:50:49 2015 -0500
> Committer: Aaron McCurry <amccurry@gmail.com>
> Committed: Wed Feb 11 08:50:49 2015 -0500
>
> ----------------------------------------------------------------------
>  .../indexserver/DistributedIndexServer.java     |   8 +-
>  .../manager/indexserver/LocalIndexServer.java   |   4 +-
>  .../apache/blur/manager/writer/BlurIndex.java   |   2 +-
>  .../blur/manager/writer/BlurIndexReadOnly.java  |   2 +-
>  .../manager/writer/BlurIndexSimpleWriter.java   | 461 ++++++++++++-------
>  .../org/apache/blur/server/TableContext.java    |   9 +-
>  .../blur/thrift/ThriftBlurShardServer.java      |  12 +-
>  .../blur/command/ShardCommandManagerTest.java   |   2 +-
>  .../writer/BlurIndexSimpleWriterTest.java       |  14 +-
>  .../blur/manager/writer/IndexImporterTest.java  |   2 +-
>  .../apache/blur/thrift/BlurClusterTestBase.java |   2 +-
>  11 files changed, 326 insertions(+), 192 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9714a1de/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
> ----------------------------------------------------------------------
> diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
> index 79a5b3c..324090f 100644
> --- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
> +++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
> @@ -116,15 +116,17 @@ public class DistributedIndexServer extends AbstractDistributedIndexServer {
>    private final int _minimumNumberOfNodes;
>    private final Timer _hdfsKeyValueTimer;
>    private final Timer _indexImporterTimer;
> +  private final Timer _indexBulkTimer;
>
>    public DistributedIndexServer(Configuration configuration, ZooKeeper zookeeper, ClusterStatus clusterStatus,
>        BlurFilterCache filterCache, BlockCacheDirectoryFactory blockCacheDirectoryFactory,
>        DistributedLayoutFactory distributedLayoutFactory, String cluster, String nodeName, long safeModeDelay,
>        int shardOpenerThreadCount, int maxMergeThreads, int internalSearchThreads,
> -      int minimumNumberOfNodesBeforeExitingSafeMode, Timer hdfsKeyValueTimer, Timer indexImporterTimer, long smallMergeThreshold)
> -      throws KeeperException, InterruptedException {
> +      int minimumNumberOfNodesBeforeExitingSafeMode, Timer hdfsKeyValueTimer, Timer indexImporterTimer,
> +      long smallMergeThreshold, Timer indexBulkTimer) throws KeeperException, InterruptedException {
>      super(clusterStatus, configuration, nodeName, cluster);
>      _indexImporterTimer = indexImporterTimer;
> +    _indexBulkTimer = indexBulkTimer;
>      _hdfsKeyValueTimer = hdfsKeyValueTimer;
>      _minimumNumberOfNodes = minimumNumberOfNodesBeforeExitingSafeMode;
>      _running.set(true);
> @@ -520,7 +522,7 @@ public class DistributedIndexServer extends AbstractDistributedIndexServer {
>      }
>
>      BlurIndex index = tableContext.newInstanceBlurIndex(shardContext, directory, _mergeScheduler, _searchExecutor,
> -        _indexCloser, _indexImporterTimer);
> +        _indexCloser, _indexImporterTimer, _indexBulkTimer);
>
>      if (_clusterStatus.isReadOnly(true, _cluster, table)) {
>        index = new BlurIndexReadOnly(index);
>
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9714a1de/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
> ----------------------------------------------------------------------
> diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
> index 54a06f6..a8cbb23 100644
> --- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
> +++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
> @@ -68,6 +68,7 @@ public class LocalIndexServer extends AbstractIndexServer {
>    private final boolean _ramDir;
>    private final BlurIndexCloser _indexCloser;
>    private final Timer _timer;
> +  private final Timer _bulkTimer;
>
>    public LocalIndexServer(TableDescriptor tableDescriptor) throws IOException {
>      this(tableDescriptor, false);
> @@ -75,6 +76,7 @@ public class LocalIndexServer extends AbstractIndexServer {
>
>    public LocalIndexServer(TableDescriptor tableDescriptor, boolean ramDir) throws IOException {
>      _timer = new Timer("Index Importer", true);
> +    _bulkTimer = new Timer("Bulk Indexing", true);
>      _closer = Closer.create();
>      _tableContext = TableContext.create(tableDescriptor);
>      _mergeScheduler = _closer.register(new SharedMergeScheduler(3, 128 * 1000 * 1000));
> @@ -166,7 +168,7 @@ public class LocalIndexServer extends AbstractIndexServer {
>    private BlurIndex openIndex(String table, String shard, Directory dir) throws CorruptIndexException, IOException {
>      ShardContext shardContext = ShardContext.create(_tableContext, shard);
>      BlurIndexSimpleWriter index = new BlurIndexSimpleWriter(shardContext, dir, _mergeScheduler, _searchExecutor,
> -        _indexCloser, _timer);
> +        _indexCloser, _timer, _bulkTimer);
>      return index;
>    }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9714a1de/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
> ----------------------------------------------------------------------
> diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
> index 192b6fa..dd92fc7 100644
> --- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
> +++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
> @@ -41,7 +41,7 @@ public abstract class BlurIndex {
>    protected ShardContext _shardContext;
>
>    public BlurIndex(ShardContext shardContext, Directory directory, SharedMergeScheduler mergeScheduler,
> -      ExecutorService searchExecutor, BlurIndexCloser indexCloser, Timer indexImporterTimer) throws IOException {
> +      ExecutorService searchExecutor, BlurIndexCloser indexCloser, Timer indexImporterTimer, Timer bulkIndexingTimer) throws IOException {
>      _shardContext = shardContext;
>    }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9714a1de/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
> ----------------------------------------------------------------------
> diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
> index 9f1ce8a..4145a3d 100644
> --- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
> +++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
> @@ -28,7 +28,7 @@ public class BlurIndexReadOnly extends BlurIndex {
>    private final BlurIndex _blurIndex;
>
>    public BlurIndexReadOnly(BlurIndex blurIndex) throws IOException {
> -    super(null, null, null, null, null, null);
> +    super(null, null, null, null, null, null, null);
>      _blurIndex = blurIndex;
>    }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9714a1de/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
> ----------------------------------------------------------------------
> diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
> index 0e8d6c5..608adba 100644
> --- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
> +++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
> @@ -21,6 +21,7 @@ import static org.apache.blur.utils.BlurConstants.ACL_DISCOVER;
>  import static org.apache.blur.utils.BlurConstants.ACL_READ;
>  import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_QUEUE_MAX_INMEMORY_LENGTH;
>
> +import java.io.Closeable;
>  import java.io.IOException;
>  import java.lang.reflect.Method;
>  import java.util.ArrayList;
> @@ -31,6 +32,7 @@ import java.util.List;
>  import java.util.Map;
>  import java.util.Set;
>  import java.util.Timer;
> +import java.util.TimerTask;
>  import java.util.concurrent.ArrayBlockingQueue;
>  import java.util.concurrent.BlockingQueue;
>  import java.util.concurrent.ConcurrentHashMap;
> @@ -71,6 +73,7 @@ import org.apache.hadoop.conf.Configuration;
>  import org.apache.hadoop.fs.FileStatus;
>  import org.apache.hadoop.fs.FileSystem;
>  import org.apache.hadoop.fs.Path;
> +import org.apache.hadoop.fs.PathFilter;
>  import org.apache.hadoop.io.IOUtils;
>  import org.apache.hadoop.io.SequenceFile;
>  import org.apache.hadoop.io.SequenceFile.CompressionType;
> @@ -130,17 +133,21 @@ public class BlurIndexSimpleWriter extends BlurIndex {
>    private final AccessControlFactory _accessControlFactory;
>    private final Set<String> _discoverableFields;
>    private final Splitter _commaSplitter;
> +  private final Timer _bulkIndexingTimer;
> +  private final TimerTask _watchForIdleBulkWriters;
>
>    private Thread _optimizeThread;
>    private Thread _writerOpener;
>    private IndexImporter _indexImporter;
>
>    public BlurIndexSimpleWriter(ShardContext shardContext, Directory directory, SharedMergeScheduler mergeScheduler,
> -      final ExecutorService searchExecutor, BlurIndexCloser indexCloser, Timer indexImporterTimer) throws IOException {
> -    super(shardContext, directory, mergeScheduler, searchExecutor, indexCloser, indexImporterTimer);
> +      final ExecutorService searchExecutor, BlurIndexCloser indexCloser, Timer indexImporterTimer,
> +      Timer bulkIndexingTimer) throws IOException {
> +    super(shardContext, directory, mergeScheduler, searchExecutor, indexCloser, indexImporterTimer, bulkIndexingTimer);
>      _commaSplitter = Splitter.on(',');
>      _bulkWriters = new ConcurrentHashMap<String, BlurIndexSimpleWriter.BulkEntry>();
>      _indexImporterTimer = indexImporterTimer;
> +    _bulkIndexingTimer = bulkIndexingTimer;
>      _searchThreadPool = searchExecutor;
>      _shardContext = shardContext;
>      _tableContext = _shardContext.getTableContext();
> @@ -189,6 +196,28 @@ public class BlurIndexSimpleWriter extends BlurIndex {
>      _indexReader.set(wrap(DirectoryReader.open(_directory)));
>
>      openWriter();
> +    _watchForIdleBulkWriters = new TimerTask() {
> +      @Override
> +      public void run() {
> +        for (BulkEntry bulkEntry : _bulkWriters.values()) {
> +          bulkEntry._lock.lock();
> +          try {
> +            if (!bulkEntry.isClosed() && bulkEntry.isIdle()) {
> +              LOG.info("Bulk Entry [{0}] has become idle and now closing.", bulkEntry);
> +              try {
> +                bulkEntry.close();
> +              } catch (IOException e) {
> +                LOG.error("Unkown error while trying to close bulk writer when it became
idle.", e);
> +              }
> +            }
> +          } finally {
> +            bulkEntry._lock.unlock();
> +          }
> +        }
> +      }
> +    };
> +    long delay = TimeUnit.SECONDS.toMillis(30);
> +    _bulkIndexingTimer.schedule(_watchForIdleBulkWriters, delay, delay);
>    }
>
>    private synchronized void openWriter() {
> @@ -355,7 +384,17 @@ public class BlurIndexSimpleWriter extends BlurIndex {
>    @Override
>    public void close() throws IOException {
>      _isClosed.set(true);
> -    IOUtils.cleanup(LOG, _indexImporter, _mutationQueueProcessor, _writer.get(), _indexReader.get());
> +    IOUtils.cleanup(LOG, makeCloseable(_watchForIdleBulkWriters), _indexImporter, _mutationQueueProcessor,
> +        _writer.get(), _indexReader.get());
> +  }
> +
> +  private Closeable makeCloseable(final TimerTask timerTask) {
> +    return new Closeable() {
> +      @Override
> +      public void close() throws IOException {
> +        timerTask.cancel();
> +      }
> +    };
>    }
>
>    @Override
> @@ -507,25 +546,38 @@ public class BlurIndexSimpleWriter extends BlurIndex {
>    }
>
>    static class BulkEntry {
> -    final SequenceFile.Writer _writer;
> -    final Path _path;
>
> -    BulkEntry(Writer writer, Path path) {
> -      _writer = writer;
> -      _path = path;
> +    private final long _idleTime = TimeUnit.SECONDS.toNanos(30);
> +    private final Path _parentPath;
> +    private final String _bulkId;
> +    private final TableContext _tableContext;
> +    private final ShardContext _shardContext;
> +    private final Configuration _configuration;
> +    private final FileSystem _fileSystem;
> +    private final String _table;
> +    private final String _shard;
> +    private final Lock _lock = new ReentrantReadWriteLock().writeLock();
> +
> +    private volatile SequenceFile.Writer _writer;
> +    private volatile long _lastWrite;
> +    private volatile int _count = 0;
> +
> +    public BulkEntry(String bulkId, Path parentPath, ShardContext shardContext) throws IOException {
> +      _bulkId = bulkId;
> +      _parentPath = parentPath;
> +      _shardContext = shardContext;
> +      _tableContext = shardContext.getTableContext();
> +      _configuration = _tableContext.getConfiguration();
> +      _fileSystem = _parentPath.getFileSystem(_configuration);
> +      _shard = _shardContext.getShard();
> +      _table = _tableContext.getTable();
>      }
> -  }
>
> -  public BulkEntry startBulkMutate(String bulkId) throws IOException {
> -    BulkEntry bulkEntry = _bulkWriters.get(bulkId);
> -    if (bulkEntry == null) {
> -      Path tablePath = _tableContext.getTablePath();
> -      Path bulk = new Path(tablePath, "bulk");
> -      Path bulkInstance = new Path(bulk, bulkId);
> -      Path path = new Path(bulkInstance, _shardContext.getShard() + ".notsorted.seq");
> -      Configuration configuration = _tableContext.getConfiguration();
> -      FileSystem fileSystem = path.getFileSystem(configuration);
> +    public boolean isClosed() {
> +      return _writer == null;
> +    }
>
> +    private Writer openSeqWriter() throws IOException {
>        Progressable progress = new Progressable() {
>          @Override
>          public void progress() {
> @@ -535,7 +587,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
>        final CompressionCodec codec;
>        final CompressionType type;
>
> -      if (isSnappyCodecLoaded(configuration)) {
> +      if (isSnappyCodecLoaded(_configuration)) {
>          codec = new SnappyCodec();
>          type = CompressionType.BLOCK;
>        } else {
> @@ -543,180 +595,211 @@ public class BlurIndexSimpleWriter extends BlurIndex {
>          type = CompressionType.NONE;
>        }
>
> -      Writer writer = SequenceFile.createWriter(fileSystem, configuration, path, Text.class, RowMutationWritable.class,
> -          type, codec, progress);
> +      Path path = new Path(_parentPath, _shard + "." + _count + ".unsorted.seq");
>
> -      bulkEntry = new BulkEntry(writer, path);
> -      _bulkWriters.put(bulkId, bulkEntry);
> -    } else {
> -      LOG.info("Bulk [{0}] mutate already started on shard [{1}] in table [{2}].", bulkId,
_shardContext.getShard(),
> -          _tableContext.getTable());
> +      _count++;
> +
> +      return SequenceFile.createWriter(_fileSystem, _configuration, path, Text.class, RowMutationWritable.class, type,
> +          codec, progress);
>      }
> -    return bulkEntry;
> -  }
>
> -  private boolean isSnappyCodecLoaded(Configuration configuration) {
> -    try {
> -      Method methodHadoop1 = SnappyCodec.class.getMethod("isNativeSnappyLoaded", new Class[] { Configuration.class });
> -      Boolean loaded = (Boolean) methodHadoop1.invoke(null, new Object[] { configuration });
> -      if (loaded != null && loaded) {
> -        LOG.info("Using SnappyCodec");
> -        return true;
> -      } else {
> -        LOG.info("Not using SnappyCodec");
> -        return false;
> -      }
> -    } catch (NoSuchMethodException e) {
> -      Method methodHadoop2;
> +    public void close() throws IOException {
> +      _lock.lock();
>        try {
> -        methodHadoop2 = SnappyCodec.class.getMethod("isNativeCodeLoaded", new Class[] {});
> -      } catch (NoSuchMethodException ex) {
> -        LOG.info("Can not determine if SnappyCodec is loaded.");
> -        return false;
> -      } catch (SecurityException ex) {
> -        LOG.error("Not allowed.", ex);
> -        return false;
> +        if (_writer != null) {
> +          _writer.close();
> +          _writer = null;
> +        }
> +      } finally {
> +        _lock.unlock();
>        }
> -      Boolean loaded;
> +    }
> +
> +    public void append(Text key, RowMutationWritable rowMutationWritable) throws IOException {
> +      _lock.lock();
>        try {
> -        loaded = (Boolean) methodHadoop2.invoke(null);
> -        if (loaded != null && loaded) {
> -          LOG.info("Using SnappyCodec");
> -          return true;
> -        } else {
> -          LOG.info("Not using SnappyCodec");
> -          return false;
> -        }
> -      } catch (Exception ex) {
> -        LOG.info("Unknown error while trying to determine if SnappyCodec is loaded.",
ex);
> -        return false;
> +        getWriter().append(key, rowMutationWritable);
> +        _lastWrite = System.nanoTime();
> +      } finally {
> +        _lock.unlock();
> +      }
> +    }
> +
> +    private SequenceFile.Writer getWriter() throws IOException {
> +      if (_writer == null) {
> +        _writer = openSeqWriter();
> +        _lastWrite = System.nanoTime();
> +      }
> +      return _writer;
> +    }
> +
> +    public boolean isIdle() {
> +      if (_lastWrite + _idleTime < System.nanoTime()) {
> +        return true;
>        }
> -    } catch (SecurityException e) {
> -      LOG.error("Not allowed.", e);
> -      return false;
> -    } catch (Exception e) {
> -      LOG.info("Unknown error while trying to determine if SnappyCodec is loaded.",
e);
>        return false;
>      }
> -  }
>
> -  @Override
> -  public void finishBulkMutate(final String bulkId, boolean apply, boolean blockUntilComplete) throws IOException {
> -    final String table = _tableContext.getTable();
> -    final String shard = _shardContext.getShard();
> +    public List<Path> getUnsortedFiles() throws IOException {
> +      FileStatus[] listStatus = _fileSystem.listStatus(_parentPath, new PathFilter() {
> +        @Override
> +        public boolean accept(Path path) {
> +          return path.getName().matches(_shard + "\\.[0-9].*\\.unsorted\\.seq");
> +        }
> +      });
>
> -    final BulkEntry bulkEntry = _bulkWriters.get(bulkId);
> -    if (bulkEntry == null) {
> -      LOG.info("Shard [{2}/{3}] Id [{0}] Nothing to apply.", bulkId, apply, table, shard);
> -      return;
> +      List<Path> unsortedPaths = new ArrayList<Path>();
> +      for (FileStatus fileStatus : listStatus) {
> +        unsortedPaths.add(fileStatus.getPath());
> +      }
> +      return unsortedPaths;
>      }
> -    LOG.info("Shard [{2}/{3}] Id [{0}] Finishing bulk mutate apply [{1}]", bulkId, apply,
table, shard);
> -    bulkEntry._writer.close();
>
> -    Configuration configuration = _tableContext.getConfiguration();
> -    final Path path = bulkEntry._path;
> -    final FileSystem fileSystem = path.getFileSystem(configuration);
> +    public void cleanupFiles(List<Path> unsortedPaths, Path sorted) throws IOException {
> +      for (Path p : unsortedPaths) {
> +        _fileSystem.delete(p, false);
> +      }
> +      if (sorted != null) {
> +        _fileSystem.delete(sorted, false);
> +      }
> +      removeParentIfLastFile(_fileSystem, _parentPath);
> +    }
> +
> +    public IndexAction getIndexAction() throws IOException {
> +      return new IndexAction() {
> +        private Path _sorted;
> +        private List<Path> _unsortedPaths;
>
> -    if (!apply) {
> -      fileSystem.delete(path, false);
> -      Path parent = path.getParent();
> -      removeParentIfLastFile(fileSystem, parent);
> -    } else {
> -      Runnable runnable = new Runnable() {
>          @Override
> -        public void run() {
> -          try {
> -            process(new IndexAction() {
> -              private Path _sorted;
> -
> -              @Override
> -              public void performMutate(IndexSearcherCloseable searcher, IndexWriter writer) throws IOException {
> -                Configuration configuration = _tableContext.getConfiguration();
> -
> -                SequenceFile.Sorter sorter = new Sorter(fileSystem, Text.class, RowMutationWritable.class,
> -                    configuration);
> -
> -                _sorted = new Path(path.getParent(), shard + ".sorted.seq");
> -
> -                LOG.info("Shard [{2}/{3}] Id [{4}] Sorting mutates path [{0}] sorted
path [{1}]", path, _sorted, table,
> -                    shard, bulkId);
> -                sorter.sort(path, _sorted);
> -
> -                LOG.info("Shard [{1}/{2}] Id [{3}] Applying mutates sorted path [{0}]",
_sorted, table, shard, bulkId);
> -                Reader reader = new SequenceFile.Reader(fileSystem, _sorted, configuration);
> -
> -                Text key = new Text();
> -                RowMutationWritable value = new RowMutationWritable();
> -
> -                Text last = null;
> -                List<RowMutation> list = new ArrayList<RowMutation>();
> -                while (reader.next(key, value)) {
> -                  if (!key.equals(last)) {
> -                    flushMutates(searcher, writer, list);
> -                    last = new Text(key);
> -                    list.clear();
> -                  }
> -                  list.add(value.getRowMutation().deepCopy());
> -                }
> -                flushMutates(searcher, writer, list);
> -                reader.close();
> -                LOG.info("Shard [{0}/{1}] Id [{2}] Finished applying mutates starting
commit.", table, shard, bulkId);
> -              }
> +        public void performMutate(IndexSearcherCloseable searcher, IndexWriter writer) throws IOException {
> +          Configuration configuration = _tableContext.getConfiguration();
>
> -              private void flushMutates(IndexSearcherCloseable searcher, IndexWriter writer, List<RowMutation> list)
> -                  throws IOException {
> -                if (!list.isEmpty()) {
> -                  List<RowMutation> reduceMutates;
> -                  try {
> -                    reduceMutates = MutatableAction.reduceMutates(list);
> -                  } catch (BlurException e) {
> -                    throw new IOException(e);
> -                  }
> -                  for (RowMutation mutation : reduceMutates) {
> -                    MutatableAction mutatableAction = new MutatableAction(_shardContext);
> -                    mutatableAction.mutate(mutation);
> -                    mutatableAction.performMutate(searcher, writer);
> -                  }
> -                }
> -              }
> +          SequenceFile.Sorter sorter = new Sorter(_fileSystem, Text.class, RowMutationWritable.class, configuration);
>
> -              private void cleanupFiles() throws IOException {
> -                fileSystem.delete(path, false);
> -                fileSystem.delete(_sorted, false);
> -                Path parent = path.getParent();
> -                removeParentIfLastFile(fileSystem, parent);
> -              }
> +          _unsortedPaths = getUnsortedFiles();
>
> -              @Override
> -              public void doPreRollback(IndexWriter writer) throws IOException {
> +          _sorted = new Path(_parentPath, _shard + ".sorted.seq");
>
> -              }
> +          LOG.info("Shard [{2}/{3}] Id [{4}] Sorting mutates paths [{0}] sorted path
[{1}]", _unsortedPaths, _sorted,
> +              _table, _shard, _bulkId);
> +          sorter.sort(_unsortedPaths.toArray(new Path[_unsortedPaths.size()]), _sorted,
true);
>
> -              @Override
> -              public void doPreCommit(IndexSearcherCloseable indexSearcher, IndexWriter writer) throws IOException {
> +          LOG.info("Shard [{1}/{2}] Id [{3}] Applying mutates sorted path [{0}]", _sorted, _table, _shard, _bulkId);
> +          Reader reader = new SequenceFile.Reader(_fileSystem, _sorted, configuration);
>
> -              }
> +          Text key = new Text();
> +          RowMutationWritable value = new RowMutationWritable();
>
> -              @Override
> -              public void doPostRollback(IndexWriter writer) throws IOException {
> -                cleanupFiles();
> -              }
> +          Text last = null;
> +          List<RowMutation> list = new ArrayList<RowMutation>();
> +          while (reader.next(key, value)) {
> +            if (!key.equals(last)) {
> +              flushMutates(searcher, writer, list);
> +              last = new Text(key);
> +              list.clear();
> +            }
> +            list.add(value.getRowMutation().deepCopy());
> +          }
> +          flushMutates(searcher, writer, list);
> +          reader.close();
> +          LOG.info("Shard [{0}/{1}] Id [{2}] Finished applying mutates starting commit.",
_table, _shard, _bulkId);
> +        }
>
> -              @Override
> -              public void doPostCommit(IndexWriter writer) throws IOException {
> -                cleanupFiles();
> -              }
> -            });
> -          } catch (IOException e) {
> -            LOG.error("Shard [{0}/{1}] Id [{2}] Unknown error while trying to finish
the bulk updates.", table, shard,
> -                bulkId, e);
> +        private void flushMutates(IndexSearcherCloseable searcher, IndexWriter writer, List<RowMutation> list)
> +            throws IOException {
> +          if (!list.isEmpty()) {
> +            List<RowMutation> reduceMutates;
> +            try {
> +              reduceMutates = MutatableAction.reduceMutates(list);
> +            } catch (BlurException e) {
> +              throw new IOException(e);
> +            }
> +            for (RowMutation mutation : reduceMutates) {
> +              MutatableAction mutatableAction = new MutatableAction(_shardContext);
> +              mutatableAction.mutate(mutation);
> +              mutatableAction.performMutate(searcher, writer);
> +            }
>            }
>          }
> +
> +        @Override
> +        public void doPreRollback(IndexWriter writer) throws IOException {
> +
> +        }
> +
> +        @Override
> +        public void doPreCommit(IndexSearcherCloseable indexSearcher, IndexWriter writer) throws IOException {
> +
> +        }
> +
> +        @Override
> +        public void doPostRollback(IndexWriter writer) throws IOException {
> +          cleanupFiles(_unsortedPaths, _sorted);
> +        }
> +
> +        @Override
> +        public void doPostCommit(IndexWriter writer) throws IOException {
> +          cleanupFiles(_unsortedPaths, _sorted);
> +        }
>        };
> +    }
> +
> +    @Override
> +    public String toString() {
> +      return "BulkEntry [_bulkId=" + _bulkId + ", _table=" + _table + ", _shard=" +
_shard + ", _idleTime=" + _idleTime
> +          + ", _lastWrite=" + _lastWrite + ", _count=" + _count + "]";
> +    }
> +
> +  }
> +
> +  public synchronized BulkEntry startBulkMutate(String bulkId) throws IOException {
> +    BulkEntry bulkEntry = _bulkWriters.get(bulkId);
> +    if (bulkEntry == null) {
> +      Path tablePath = _tableContext.getTablePath();
> +      Path bulk = new Path(tablePath, "bulk");
> +      Path bulkInstance = new Path(bulk, bulkId);
> +      Path path = new Path(bulkInstance, _shardContext.getShard() + ".notsorted.seq");
> +
> +      bulkEntry = new BulkEntry(bulkId, path, _shardContext);
> +      _bulkWriters.put(bulkId, bulkEntry);
> +    } else {
> +      LOG.info("Bulk [{0}] mutate already started on shard [{1}] in table [{2}].", bulkId,
_shardContext.getShard(),
> +          _tableContext.getTable());
> +    }
> +    return bulkEntry;
> +  }
> +
> +  @Override
> +  public void finishBulkMutate(final String bulkId, boolean apply, boolean blockUntilComplete) throws IOException {
> +    final String table = _tableContext.getTable();
> +    final String shard = _shardContext.getShard();
> +
> +    final BulkEntry bulkEntry = _bulkWriters.get(bulkId);
> +    if (bulkEntry == null) {
> +      LOG.info("Shard [{2}/{3}] Id [{0}] Nothing to apply.", bulkId, apply, table, shard);
> +      return;
> +    }
> +    LOG.info("Shard [{2}/{3}] Id [{0}] Finishing bulk mutate apply [{1}]", bulkId, apply,
table, shard);
> +    bulkEntry.close();
> +
> +    if (!apply) {
> +      bulkEntry.cleanupFiles(bulkEntry.getUnsortedFiles(), null);
> +    } else {
> +      final IndexAction indexAction = bulkEntry.getIndexAction();
>        if (blockUntilComplete) {
> -        runnable.run();
> +        process(indexAction);
>        } else {
> -        Thread thread = new Thread(runnable);
> +        Thread thread = new Thread(new Runnable() {
> +          @Override
> +          public void run() {
> +            try {
> +              process(indexAction);
> +            } catch (IOException e) {
> +              LOG.error("Shard [{0}/{1}] Id [{2}] Unknown error while trying to finish
the bulk updates.", table,
> +                  shard, bulkId, e);

I think e is misplaced here? The message only has placeholders {0} through {2}, so it looks like e gets passed as an extra format argument rather than as the throwable, and the stack trace would be lost.
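
A quick sketch of what I mean, using a made-up MessageFormat-style varargs logger rather than our actual Log class (class and method names here are illustrative only, not Blur's API):

    import java.text.MessageFormat;

    // Hypothetical stand-in for a varargs logger; not Blur's real Log class.
    public class TrailingThrowableDemo {
      static void error(String pattern, Object... args) {
        // MessageFormat silently ignores arguments that have no matching {n}
        // placeholder, so a trailing exception just disappears.
        System.err.println(MessageFormat.format(pattern, args));
      }

      public static void main(String[] args) {
        Exception e = new RuntimeException("lease expired");
        // Only {0}..{2} are referenced, so the fourth argument (e) is dropped:
        // the output contains no cause and no stack trace.
        error("Shard [{0}/{1}] Id [{2}] Unknown error while trying to finish the bulk updates.",
            "table1", "shard-0", "bulk-42", e);
      }
    }

If that's right, e presumably needs to go through whatever overload takes a Throwable so the cause actually gets logged.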

Thanks,
--tim
