incubator-blur-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aaron McCurry <amccu...@gmail.com>
Subject Re: [1/3] git commit: Fixing issue with bulk mutate where writers may become idle and later they fail because of lease exception issues.
Date Wed, 11 Feb 2015 17:45:32 GMT
Yes, I think you are correct.

On Wed, Feb 11, 2015 at 9:43 AM, Tim Williams <williamstw@gmail.com> wrote:

> On Wed, Feb 11, 2015 at 9:19 AM,  <amccurry@apache.org> wrote:
> > Repository: incubator-blur
> > Updated Branches:
> >   refs/heads/master 1f61fff25 -> 45c9d4cdb
> >
> >
> > Fixing issue with bulk mutate where writers may become idle and later
> they fail because of lease exception issues.
> >
> >
> > Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
> > Commit:
> http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/9714a1de
> > Tree:
> http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/9714a1de
> > Diff:
> http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/9714a1de
> >
> > Branch: refs/heads/master
> > Commit: 9714a1ded7788a008640a0ec6c0bfda585960c96
> > Parents: 1f61fff
> > Author: Aaron McCurry <amccurry@gmail.com>
> > Authored: Wed Feb 11 08:50:49 2015 -0500
> > Committer: Aaron McCurry <amccurry@gmail.com>
> > Committed: Wed Feb 11 08:50:49 2015 -0500
> >
> > ----------------------------------------------------------------------
> >  .../indexserver/DistributedIndexServer.java     |   8 +-
> >  .../manager/indexserver/LocalIndexServer.java   |   4 +-
> >  .../apache/blur/manager/writer/BlurIndex.java   |   2 +-
> >  .../blur/manager/writer/BlurIndexReadOnly.java  |   2 +-
> >  .../manager/writer/BlurIndexSimpleWriter.java   | 461
> ++++++++++++-------
> >  .../org/apache/blur/server/TableContext.java    |   9 +-
> >  .../blur/thrift/ThriftBlurShardServer.java      |  12 +-
> >  .../blur/command/ShardCommandManagerTest.java   |   2 +-
> >  .../writer/BlurIndexSimpleWriterTest.java       |  14 +-
> >  .../blur/manager/writer/IndexImporterTest.java  |   2 +-
> >  .../apache/blur/thrift/BlurClusterTestBase.java |   2 +-
> >  11 files changed, 326 insertions(+), 192 deletions(-)
> > ----------------------------------------------------------------------
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9714a1de/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
> > ----------------------------------------------------------------------
> > diff --git
> a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
> b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
> > index 79a5b3c..324090f 100644
> > ---
> a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
> > +++
> b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
> > @@ -116,15 +116,17 @@ public class DistributedIndexServer extends
> AbstractDistributedIndexServer {
> >    private final int _minimumNumberOfNodes;
> >    private final Timer _hdfsKeyValueTimer;
> >    private final Timer _indexImporterTimer;
> > +  private final Timer _indexBulkTimer;
> >
> >    public DistributedIndexServer(Configuration configuration, ZooKeeper
> zookeeper, ClusterStatus clusterStatus,
> >        BlurFilterCache filterCache, BlockCacheDirectoryFactory
> blockCacheDirectoryFactory,
> >        DistributedLayoutFactory distributedLayoutFactory, String
> cluster, String nodeName, long safeModeDelay,
> >        int shardOpenerThreadCount, int maxMergeThreads, int
> internalSearchThreads,
> > -      int minimumNumberOfNodesBeforeExitingSafeMode, Timer
> hdfsKeyValueTimer, Timer indexImporterTimer, long smallMergeThreshold)
> > -      throws KeeperException, InterruptedException {
> > +      int minimumNumberOfNodesBeforeExitingSafeMode, Timer
> hdfsKeyValueTimer, Timer indexImporterTimer,
> > +      long smallMergeThreshold, Timer indexBulkTimer) throws
> KeeperException, InterruptedException {
> >      super(clusterStatus, configuration, nodeName, cluster);
> >      _indexImporterTimer = indexImporterTimer;
> > +    _indexBulkTimer = indexBulkTimer;
> >      _hdfsKeyValueTimer = hdfsKeyValueTimer;
> >      _minimumNumberOfNodes = minimumNumberOfNodesBeforeExitingSafeMode;
> >      _running.set(true);
> > @@ -520,7 +522,7 @@ public class DistributedIndexServer extends
> AbstractDistributedIndexServer {
> >      }
> >
> >      BlurIndex index = tableContext.newInstanceBlurIndex(shardContext,
> directory, _mergeScheduler, _searchExecutor,
> > -        _indexCloser, _indexImporterTimer);
> > +        _indexCloser, _indexImporterTimer, _indexBulkTimer);
> >
> >      if (_clusterStatus.isReadOnly(true, _cluster, table)) {
> >        index = new BlurIndexReadOnly(index);
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9714a1de/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
> > ----------------------------------------------------------------------
> > diff --git
> a/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
> b/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
> > index 54a06f6..a8cbb23 100644
> > ---
> a/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
> > +++
> b/blur-core/src/main/java/org/apache/blur/manager/indexserver/LocalIndexServer.java
> > @@ -68,6 +68,7 @@ public class LocalIndexServer extends
> AbstractIndexServer {
> >    private final boolean _ramDir;
> >    private final BlurIndexCloser _indexCloser;
> >    private final Timer _timer;
> > +  private final Timer _bulkTimer;
> >
> >    public LocalIndexServer(TableDescriptor tableDescriptor) throws
> IOException {
> >      this(tableDescriptor, false);
> > @@ -75,6 +76,7 @@ public class LocalIndexServer extends
> AbstractIndexServer {
> >
> >    public LocalIndexServer(TableDescriptor tableDescriptor, boolean
> ramDir) throws IOException {
> >      _timer = new Timer("Index Importer", true);
> > +    _bulkTimer = new Timer("Bulk Indexing", true);
> >      _closer = Closer.create();
> >      _tableContext = TableContext.create(tableDescriptor);
> >      _mergeScheduler = _closer.register(new SharedMergeScheduler(3, 128
> * 1000 * 1000));
> > @@ -166,7 +168,7 @@ public class LocalIndexServer extends
> AbstractIndexServer {
> >    private BlurIndex openIndex(String table, String shard, Directory
> dir) throws CorruptIndexException, IOException {
> >      ShardContext shardContext = ShardContext.create(_tableContext,
> shard);
> >      BlurIndexSimpleWriter index = new
> BlurIndexSimpleWriter(shardContext, dir, _mergeScheduler, _searchExecutor,
> > -        _indexCloser, _timer);
> > +        _indexCloser, _timer, _bulkTimer);
> >      return index;
> >    }
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9714a1de/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
> > ----------------------------------------------------------------------
> > diff --git
> a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
> b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
> > index 192b6fa..dd92fc7 100644
> > ---
> a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
> > +++
> b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndex.java
> > @@ -41,7 +41,7 @@ public abstract class BlurIndex {
> >    protected ShardContext _shardContext;
> >
> >    public BlurIndex(ShardContext shardContext, Directory directory,
> SharedMergeScheduler mergeScheduler,
> > -      ExecutorService searchExecutor, BlurIndexCloser indexCloser,
> Timer indexImporterTimer) throws IOException {
> > +      ExecutorService searchExecutor, BlurIndexCloser indexCloser,
> Timer indexImporterTimer, Timer bulkIndexingTimer) throws IOException {
> >      _shardContext = shardContext;
> >    }
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9714a1de/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
> > ----------------------------------------------------------------------
> > diff --git
> a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
> b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
> > index 9f1ce8a..4145a3d 100644
> > ---
> a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
> > +++
> b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexReadOnly.java
> > @@ -28,7 +28,7 @@ public class BlurIndexReadOnly extends BlurIndex {
> >    private final BlurIndex _blurIndex;
> >
> >    public BlurIndexReadOnly(BlurIndex blurIndex) throws IOException {
> > -    super(null, null, null, null, null, null);
> > +    super(null, null, null, null, null, null, null);
> >      _blurIndex = blurIndex;
> >    }
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9714a1de/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
> > ----------------------------------------------------------------------
> > diff --git
> a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
> b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
> > index 0e8d6c5..608adba 100644
> > ---
> a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
> > +++
> b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
> > @@ -21,6 +21,7 @@ import static
> org.apache.blur.utils.BlurConstants.ACL_DISCOVER;
> >  import static org.apache.blur.utils.BlurConstants.ACL_READ;
> >  import static
> org.apache.blur.utils.BlurConstants.BLUR_SHARD_QUEUE_MAX_INMEMORY_LENGTH;
> >
> > +import java.io.Closeable;
> >  import java.io.IOException;
> >  import java.lang.reflect.Method;
> >  import java.util.ArrayList;
> > @@ -31,6 +32,7 @@ import java.util.List;
> >  import java.util.Map;
> >  import java.util.Set;
> >  import java.util.Timer;
> > +import java.util.TimerTask;
> >  import java.util.concurrent.ArrayBlockingQueue;
> >  import java.util.concurrent.BlockingQueue;
> >  import java.util.concurrent.ConcurrentHashMap;
> > @@ -71,6 +73,7 @@ import org.apache.hadoop.conf.Configuration;
> >  import org.apache.hadoop.fs.FileStatus;
> >  import org.apache.hadoop.fs.FileSystem;
> >  import org.apache.hadoop.fs.Path;
> > +import org.apache.hadoop.fs.PathFilter;
> >  import org.apache.hadoop.io.IOUtils;
> >  import org.apache.hadoop.io.SequenceFile;
> >  import org.apache.hadoop.io.SequenceFile.CompressionType;
> > @@ -130,17 +133,21 @@ public class BlurIndexSimpleWriter extends
> BlurIndex {
> >    private final AccessControlFactory _accessControlFactory;
> >    private final Set<String> _discoverableFields;
> >    private final Splitter _commaSplitter;
> > +  private final Timer _bulkIndexingTimer;
> > +  private final TimerTask _watchForIdleBulkWriters;
> >
> >    private Thread _optimizeThread;
> >    private Thread _writerOpener;
> >    private IndexImporter _indexImporter;
> >
> >    public BlurIndexSimpleWriter(ShardContext shardContext, Directory
> directory, SharedMergeScheduler mergeScheduler,
> > -      final ExecutorService searchExecutor, BlurIndexCloser
> indexCloser, Timer indexImporterTimer) throws IOException {
> > -    super(shardContext, directory, mergeScheduler, searchExecutor,
> indexCloser, indexImporterTimer);
> > +      final ExecutorService searchExecutor, BlurIndexCloser
> indexCloser, Timer indexImporterTimer,
> > +      Timer bulkIndexingTimer) throws IOException {
> > +    super(shardContext, directory, mergeScheduler, searchExecutor,
> indexCloser, indexImporterTimer, bulkIndexingTimer);
> >      _commaSplitter = Splitter.on(',');
> >      _bulkWriters = new ConcurrentHashMap<String,
> BlurIndexSimpleWriter.BulkEntry>();
> >      _indexImporterTimer = indexImporterTimer;
> > +    _bulkIndexingTimer = bulkIndexingTimer;
> >      _searchThreadPool = searchExecutor;
> >      _shardContext = shardContext;
> >      _tableContext = _shardContext.getTableContext();
> > @@ -189,6 +196,28 @@ public class BlurIndexSimpleWriter extends
> BlurIndex {
> >      _indexReader.set(wrap(DirectoryReader.open(_directory)));
> >
> >      openWriter();
> > +    _watchForIdleBulkWriters = new TimerTask() {
> > +      @Override
> > +      public void run() {
> > +        for (BulkEntry bulkEntry : _bulkWriters.values()) {
> > +          bulkEntry._lock.lock();
> > +          try {
> > +            if (!bulkEntry.isClosed() && bulkEntry.isIdle()) {
> > +              LOG.info("Bulk Entry [{0}] has become idle and now
> closing.", bulkEntry);
> > +              try {
> > +                bulkEntry.close();
> > +              } catch (IOException e) {
> > +                LOG.error("Unkown error while trying to close bulk
> writer when it became idle.", e);
> > +              }
> > +            }
> > +          } finally {
> > +            bulkEntry._lock.unlock();
> > +          }
> > +        }
> > +      }
> > +    };
> > +    long delay = TimeUnit.SECONDS.toMillis(30);
> > +    _bulkIndexingTimer.schedule(_watchForIdleBulkWriters, delay, delay);
> >    }
> >
> >    private synchronized void openWriter() {
> > @@ -355,7 +384,17 @@ public class BlurIndexSimpleWriter extends
> BlurIndex {
> >    @Override
> >    public void close() throws IOException {
> >      _isClosed.set(true);
> > -    IOUtils.cleanup(LOG, _indexImporter, _mutationQueueProcessor,
> _writer.get(), _indexReader.get());
> > +    IOUtils.cleanup(LOG, makeCloseable(_watchForIdleBulkWriters),
> _indexImporter, _mutationQueueProcessor,
> > +        _writer.get(), _indexReader.get());
> > +  }
> > +
> > +  private Closeable makeCloseable(final TimerTask timerTask) {
> > +    return new Closeable() {
> > +      @Override
> > +      public void close() throws IOException {
> > +        timerTask.cancel();
> > +      }
> > +    };
> >    }
> >
> >    @Override
> > @@ -507,25 +546,38 @@ public class BlurIndexSimpleWriter extends
> BlurIndex {
> >    }
> >
> >    static class BulkEntry {
> > -    final SequenceFile.Writer _writer;
> > -    final Path _path;
> >
> > -    BulkEntry(Writer writer, Path path) {
> > -      _writer = writer;
> > -      _path = path;
> > +    private final long _idleTime = TimeUnit.SECONDS.toNanos(30);
> > +    private final Path _parentPath;
> > +    private final String _bulkId;
> > +    private final TableContext _tableContext;
> > +    private final ShardContext _shardContext;
> > +    private final Configuration _configuration;
> > +    private final FileSystem _fileSystem;
> > +    private final String _table;
> > +    private final String _shard;
> > +    private final Lock _lock = new ReentrantReadWriteLock().writeLock();
> > +
> > +    private volatile SequenceFile.Writer _writer;
> > +    private volatile long _lastWrite;
> > +    private volatile int _count = 0;
> > +
> > +    public BulkEntry(String bulkId, Path parentPath, ShardContext
> shardContext) throws IOException {
> > +      _bulkId = bulkId;
> > +      _parentPath = parentPath;
> > +      _shardContext = shardContext;
> > +      _tableContext = shardContext.getTableContext();
> > +      _configuration = _tableContext.getConfiguration();
> > +      _fileSystem = _parentPath.getFileSystem(_configuration);
> > +      _shard = _shardContext.getShard();
> > +      _table = _tableContext.getTable();
> >      }
> > -  }
> >
> > -  public BulkEntry startBulkMutate(String bulkId) throws IOException {
> > -    BulkEntry bulkEntry = _bulkWriters.get(bulkId);
> > -    if (bulkEntry == null) {
> > -      Path tablePath = _tableContext.getTablePath();
> > -      Path bulk = new Path(tablePath, "bulk");
> > -      Path bulkInstance = new Path(bulk, bulkId);
> > -      Path path = new Path(bulkInstance, _shardContext.getShard() +
> ".notsorted.seq");
> > -      Configuration configuration = _tableContext.getConfiguration();
> > -      FileSystem fileSystem = path.getFileSystem(configuration);
> > +    public boolean isClosed() {
> > +      return _writer == null;
> > +    }
> >
> > +    private Writer openSeqWriter() throws IOException {
> >        Progressable progress = new Progressable() {
> >          @Override
> >          public void progress() {
> > @@ -535,7 +587,7 @@ public class BlurIndexSimpleWriter extends BlurIndex
> {
> >        final CompressionCodec codec;
> >        final CompressionType type;
> >
> > -      if (isSnappyCodecLoaded(configuration)) {
> > +      if (isSnappyCodecLoaded(_configuration)) {
> >          codec = new SnappyCodec();
> >          type = CompressionType.BLOCK;
> >        } else {
> > @@ -543,180 +595,211 @@ public class BlurIndexSimpleWriter extends
> BlurIndex {
> >          type = CompressionType.NONE;
> >        }
> >
> > -      Writer writer = SequenceFile.createWriter(fileSystem,
> configuration, path, Text.class, RowMutationWritable.class,
> > -          type, codec, progress);
> > +      Path path = new Path(_parentPath, _shard + "." + _count +
> ".unsorted.seq");
> >
> > -      bulkEntry = new BulkEntry(writer, path);
> > -      _bulkWriters.put(bulkId, bulkEntry);
> > -    } else {
> > -      LOG.info("Bulk [{0}] mutate already started on shard [{1}] in
> table [{2}].", bulkId, _shardContext.getShard(),
> > -          _tableContext.getTable());
> > +      _count++;
> > +
> > +      return SequenceFile.createWriter(_fileSystem, _configuration,
> path, Text.class, RowMutationWritable.class, type,
> > +          codec, progress);
> >      }
> > -    return bulkEntry;
> > -  }
> >
> > -  private boolean isSnappyCodecLoaded(Configuration configuration) {
> > -    try {
> > -      Method methodHadoop1 =
> SnappyCodec.class.getMethod("isNativeSnappyLoaded", new Class[] {
> Configuration.class });
> > -      Boolean loaded = (Boolean) methodHadoop1.invoke(null, new
> Object[] { configuration });
> > -      if (loaded != null && loaded) {
> > -        LOG.info("Using SnappyCodec");
> > -        return true;
> > -      } else {
> > -        LOG.info("Not using SnappyCodec");
> > -        return false;
> > -      }
> > -    } catch (NoSuchMethodException e) {
> > -      Method methodHadoop2;
> > +    public void close() throws IOException {
> > +      _lock.lock();
> >        try {
> > -        methodHadoop2 =
> SnappyCodec.class.getMethod("isNativeCodeLoaded", new Class[] {});
> > -      } catch (NoSuchMethodException ex) {
> > -        LOG.info("Can not determine if SnappyCodec is loaded.");
> > -        return false;
> > -      } catch (SecurityException ex) {
> > -        LOG.error("Not allowed.", ex);
> > -        return false;
> > +        if (_writer != null) {
> > +          _writer.close();
> > +          _writer = null;
> > +        }
> > +      } finally {
> > +        _lock.unlock();
> >        }
> > -      Boolean loaded;
> > +    }
> > +
> > +    public void append(Text key, RowMutationWritable
> rowMutationWritable) throws IOException {
> > +      _lock.lock();
> >        try {
> > -        loaded = (Boolean) methodHadoop2.invoke(null);
> > -        if (loaded != null && loaded) {
> > -          LOG.info("Using SnappyCodec");
> > -          return true;
> > -        } else {
> > -          LOG.info("Not using SnappyCodec");
> > -          return false;
> > -        }
> > -      } catch (Exception ex) {
> > -        LOG.info("Unknown error while trying to determine if
> SnappyCodec is loaded.", ex);
> > -        return false;
> > +        getWriter().append(key, rowMutationWritable);
> > +        _lastWrite = System.nanoTime();
> > +      } finally {
> > +        _lock.unlock();
> > +      }
> > +    }
> > +
> > +    private SequenceFile.Writer getWriter() throws IOException {
> > +      if (_writer == null) {
> > +        _writer = openSeqWriter();
> > +        _lastWrite = System.nanoTime();
> > +      }
> > +      return _writer;
> > +    }
> > +
> > +    public boolean isIdle() {
> > +      if (_lastWrite + _idleTime < System.nanoTime()) {
> > +        return true;
> >        }
> > -    } catch (SecurityException e) {
> > -      LOG.error("Not allowed.", e);
> > -      return false;
> > -    } catch (Exception e) {
> > -      LOG.info("Unknown error while trying to determine if SnappyCodec
> is loaded.", e);
> >        return false;
> >      }
> > -  }
> >
> > -  @Override
> > -  public void finishBulkMutate(final String bulkId, boolean apply,
> boolean blockUntilComplete) throws IOException {
> > -    final String table = _tableContext.getTable();
> > -    final String shard = _shardContext.getShard();
> > +    public List<Path> getUnsortedFiles() throws IOException {
> > +      FileStatus[] listStatus = _fileSystem.listStatus(_parentPath, new
> PathFilter() {
> > +        @Override
> > +        public boolean accept(Path path) {
> > +          return path.getName().matches(_shard +
> "\\.[0-9].*\\.unsorted\\.seq");
> > +        }
> > +      });
> >
> > -    final BulkEntry bulkEntry = _bulkWriters.get(bulkId);
> > -    if (bulkEntry == null) {
> > -      LOG.info("Shard [{2}/{3}] Id [{0}] Nothing to apply.", bulkId,
> apply, table, shard);
> > -      return;
> > +      List<Path> unsortedPaths = new ArrayList<Path>();
> > +      for (FileStatus fileStatus : listStatus) {
> > +        unsortedPaths.add(fileStatus.getPath());
> > +      }
> > +      return unsortedPaths;
> >      }
> > -    LOG.info("Shard [{2}/{3}] Id [{0}] Finishing bulk mutate apply
> [{1}]", bulkId, apply, table, shard);
> > -    bulkEntry._writer.close();
> >
> > -    Configuration configuration = _tableContext.getConfiguration();
> > -    final Path path = bulkEntry._path;
> > -    final FileSystem fileSystem = path.getFileSystem(configuration);
> > +    public void cleanupFiles(List<Path> unsortedPaths, Path sorted)
> throws IOException {
> > +      for (Path p : unsortedPaths) {
> > +        _fileSystem.delete(p, false);
> > +      }
> > +      if (sorted != null) {
> > +        _fileSystem.delete(sorted, false);
> > +      }
> > +      removeParentIfLastFile(_fileSystem, _parentPath);
> > +    }
> > +
> > +    public IndexAction getIndexAction() throws IOException {
> > +      return new IndexAction() {
> > +        private Path _sorted;
> > +        private List<Path> _unsortedPaths;
> >
> > -    if (!apply) {
> > -      fileSystem.delete(path, false);
> > -      Path parent = path.getParent();
> > -      removeParentIfLastFile(fileSystem, parent);
> > -    } else {
> > -      Runnable runnable = new Runnable() {
> >          @Override
> > -        public void run() {
> > -          try {
> > -            process(new IndexAction() {
> > -              private Path _sorted;
> > -
> > -              @Override
> > -              public void performMutate(IndexSearcherCloseable
> searcher, IndexWriter writer) throws IOException {
> > -                Configuration configuration =
> _tableContext.getConfiguration();
> > -
> > -                SequenceFile.Sorter sorter = new Sorter(fileSystem,
> Text.class, RowMutationWritable.class,
> > -                    configuration);
> > -
> > -                _sorted = new Path(path.getParent(), shard +
> ".sorted.seq");
> > -
> > -                LOG.info("Shard [{2}/{3}] Id [{4}] Sorting mutates path
> [{0}] sorted path [{1}]", path, _sorted, table,
> > -                    shard, bulkId);
> > -                sorter.sort(path, _sorted);
> > -
> > -                LOG.info("Shard [{1}/{2}] Id [{3}] Applying mutates
> sorted path [{0}]", _sorted, table, shard, bulkId);
> > -                Reader reader = new SequenceFile.Reader(fileSystem,
> _sorted, configuration);
> > -
> > -                Text key = new Text();
> > -                RowMutationWritable value = new RowMutationWritable();
> > -
> > -                Text last = null;
> > -                List<RowMutation> list = new ArrayList<RowMutation>();
> > -                while (reader.next(key, value)) {
> > -                  if (!key.equals(last)) {
> > -                    flushMutates(searcher, writer, list);
> > -                    last = new Text(key);
> > -                    list.clear();
> > -                  }
> > -                  list.add(value.getRowMutation().deepCopy());
> > -                }
> > -                flushMutates(searcher, writer, list);
> > -                reader.close();
> > -                LOG.info("Shard [{0}/{1}] Id [{2}] Finished applying
> mutates starting commit.", table, shard, bulkId);
> > -              }
> > +        public void performMutate(IndexSearcherCloseable searcher,
> IndexWriter writer) throws IOException {
> > +          Configuration configuration =
> _tableContext.getConfiguration();
> >
> > -              private void flushMutates(IndexSearcherCloseable
> searcher, IndexWriter writer, List<RowMutation> list)
> > -                  throws IOException {
> > -                if (!list.isEmpty()) {
> > -                  List<RowMutation> reduceMutates;
> > -                  try {
> > -                    reduceMutates = MutatableAction.reduceMutates(list);
> > -                  } catch (BlurException e) {
> > -                    throw new IOException(e);
> > -                  }
> > -                  for (RowMutation mutation : reduceMutates) {
> > -                    MutatableAction mutatableAction = new
> MutatableAction(_shardContext);
> > -                    mutatableAction.mutate(mutation);
> > -                    mutatableAction.performMutate(searcher, writer);
> > -                  }
> > -                }
> > -              }
> > +          SequenceFile.Sorter sorter = new Sorter(_fileSystem,
> Text.class, RowMutationWritable.class, configuration);
> >
> > -              private void cleanupFiles() throws IOException {
> > -                fileSystem.delete(path, false);
> > -                fileSystem.delete(_sorted, false);
> > -                Path parent = path.getParent();
> > -                removeParentIfLastFile(fileSystem, parent);
> > -              }
> > +          _unsortedPaths = getUnsortedFiles();
> >
> > -              @Override
> > -              public void doPreRollback(IndexWriter writer) throws
> IOException {
> > +          _sorted = new Path(_parentPath, _shard + ".sorted.seq");
> >
> > -              }
> > +          LOG.info("Shard [{2}/{3}] Id [{4}] Sorting mutates paths
> [{0}] sorted path [{1}]", _unsortedPaths, _sorted,
> > +              _table, _shard, _bulkId);
> > +          sorter.sort(_unsortedPaths.toArray(new
> Path[_unsortedPaths.size()]), _sorted, true);
> >
> > -              @Override
> > -              public void doPreCommit(IndexSearcherCloseable
> indexSearcher, IndexWriter writer) throws IOException {
> > +          LOG.info("Shard [{1}/{2}] Id [{3}] Applying mutates sorted
> path [{0}]", _sorted, _table, _shard, _bulkId);
> > +          Reader reader = new SequenceFile.Reader(_fileSystem, _sorted,
> configuration);
> >
> > -              }
> > +          Text key = new Text();
> > +          RowMutationWritable value = new RowMutationWritable();
> >
> > -              @Override
> > -              public void doPostRollback(IndexWriter writer) throws
> IOException {
> > -                cleanupFiles();
> > -              }
> > +          Text last = null;
> > +          List<RowMutation> list = new ArrayList<RowMutation>();
> > +          while (reader.next(key, value)) {
> > +            if (!key.equals(last)) {
> > +              flushMutates(searcher, writer, list);
> > +              last = new Text(key);
> > +              list.clear();
> > +            }
> > +            list.add(value.getRowMutation().deepCopy());
> > +          }
> > +          flushMutates(searcher, writer, list);
> > +          reader.close();
> > +          LOG.info("Shard [{0}/{1}] Id [{2}] Finished applying mutates
> starting commit.", _table, _shard, _bulkId);
> > +        }
> >
> > -              @Override
> > -              public void doPostCommit(IndexWriter writer) throws
> IOException {
> > -                cleanupFiles();
> > -              }
> > -            });
> > -          } catch (IOException e) {
> > -            LOG.error("Shard [{0}/{1}] Id [{2}] Unknown error while
> trying to finish the bulk updates.", table, shard,
> > -                bulkId, e);
> > +        private void flushMutates(IndexSearcherCloseable searcher,
> IndexWriter writer, List<RowMutation> list)
> > +            throws IOException {
> > +          if (!list.isEmpty()) {
> > +            List<RowMutation> reduceMutates;
> > +            try {
> > +              reduceMutates = MutatableAction.reduceMutates(list);
> > +            } catch (BlurException e) {
> > +              throw new IOException(e);
> > +            }
> > +            for (RowMutation mutation : reduceMutates) {
> > +              MutatableAction mutatableAction = new
> MutatableAction(_shardContext);
> > +              mutatableAction.mutate(mutation);
> > +              mutatableAction.performMutate(searcher, writer);
> > +            }
> >            }
> >          }
> > +
> > +        @Override
> > +        public void doPreRollback(IndexWriter writer) throws
> IOException {
> > +
> > +        }
> > +
> > +        @Override
> > +        public void doPreCommit(IndexSearcherCloseable indexSearcher,
> IndexWriter writer) throws IOException {
> > +
> > +        }
> > +
> > +        @Override
> > +        public void doPostRollback(IndexWriter writer) throws
> IOException {
> > +          cleanupFiles(_unsortedPaths, _sorted);
> > +        }
> > +
> > +        @Override
> > +        public void doPostCommit(IndexWriter writer) throws IOException
> {
> > +          cleanupFiles(_unsortedPaths, _sorted);
> > +        }
> >        };
> > +    }
> > +
> > +    @Override
> > +    public String toString() {
> > +      return "BulkEntry [_bulkId=" + _bulkId + ", _table=" + _table +
> ", _shard=" + _shard + ", _idleTime=" + _idleTime
> > +          + ", _lastWrite=" + _lastWrite + ", _count=" + _count + "]";
> > +    }
> > +
> > +  }
> > +
> > +  public synchronized BulkEntry startBulkMutate(String bulkId) throws
> IOException {
> > +    BulkEntry bulkEntry = _bulkWriters.get(bulkId);
> > +    if (bulkEntry == null) {
> > +      Path tablePath = _tableContext.getTablePath();
> > +      Path bulk = new Path(tablePath, "bulk");
> > +      Path bulkInstance = new Path(bulk, bulkId);
> > +      Path path = new Path(bulkInstance, _shardContext.getShard() +
> ".notsorted.seq");
> > +
> > +      bulkEntry = new BulkEntry(bulkId, path, _shardContext);
> > +      _bulkWriters.put(bulkId, bulkEntry);
> > +    } else {
> > +      LOG.info("Bulk [{0}] mutate already started on shard [{1}] in
> table [{2}].", bulkId, _shardContext.getShard(),
> > +          _tableContext.getTable());
> > +    }
> > +    return bulkEntry;
> > +  }
> > +
> > +  @Override
> > +  public void finishBulkMutate(final String bulkId, boolean apply,
> boolean blockUntilComplete) throws IOException {
> > +    final String table = _tableContext.getTable();
> > +    final String shard = _shardContext.getShard();
> > +
> > +    final BulkEntry bulkEntry = _bulkWriters.get(bulkId);
> > +    if (bulkEntry == null) {
> > +      LOG.info("Shard [{2}/{3}] Id [{0}] Nothing to apply.", bulkId,
> apply, table, shard);
> > +      return;
> > +    }
> > +    LOG.info("Shard [{2}/{3}] Id [{0}] Finishing bulk mutate apply
> [{1}]", bulkId, apply, table, shard);
> > +    bulkEntry.close();
> > +
> > +    if (!apply) {
> > +      bulkEntry.cleanupFiles(bulkEntry.getUnsortedFiles(), null);
> > +    } else {
> > +      final IndexAction indexAction = bulkEntry.getIndexAction();
> >        if (blockUntilComplete) {
> > -        runnable.run();
> > +        process(indexAction);
> >        } else {
> > -        Thread thread = new Thread(runnable);
> > +        Thread thread = new Thread(new Runnable() {
> > +          @Override
> > +          public void run() {
> > +            try {
> > +              process(indexAction);
> > +            } catch (IOException e) {
> > +              LOG.error("Shard [{0}/{1}] Id [{2}] Unknown error while
> trying to finish the bulk updates.", table,
> > +                  shard, bulkId, e);
>
> I think `e` is misplaced here?
>
> Thanks,
> --tim
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message