accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [accumulo] 01/01: Merge branch '1.9'
Date Tue, 22 May 2018 22:02:46 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 5a0a63151db7fc3c54f8731257499cb08a72af24
Merge: b484841 1180d76
Author: Keith Turner <kturner@apache.org>
AuthorDate: Tue May 22 18:02:29 2018 -0400

    Merge branch '1.9'

 .../apache/accumulo/core/file/FileOperations.java  | 18 ++++++++
 .../file/blockfile/impl/CachableBlockFile.java     | 50 +++++++++++++++++-----
 .../accumulo/core/file/rfile/RFileOperations.java  |  5 ++-
 .../org/apache/accumulo/tserver/FileManager.java   |  8 +++-
 .../tserver/TabletServerResourceManager.java       |  7 ++-
 5 files changed, 72 insertions(+), 16 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
index a6ee3bc,5a26ad2..9c69377
--- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
@@@ -444,7 -438,12 +457,12 @@@ public abstract class FileOperations 
      /**
      * (Optional) set the index cache to be used to optimize reads within the constructed reader.
       */
 -    public SubbuilderType withIndexCache(BlockCache indexCache);
 +    SubbuilderType withIndexCache(BlockCache indexCache);
+ 
+     /**
+      * (Optional) set the file len cache to be used to optimize reads within the constructed reader.
+      */
 -    public SubbuilderType withFileLenCache(Cache<String,Long> fileLenCache);
++    SubbuilderType withFileLenCache(Cache<String,Long> fileLenCache);
    }
  
    /**
diff --cc core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index de3d88d,336ec4d..7c4f97c
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@@ -16,25 -16,29 +16,26 @@@
   */
  package org.apache.accumulo.core.file.blockfile.impl;
  
 -import java.io.ByteArrayInputStream;
 +import java.io.Closeable;
  import java.io.DataInputStream;
 -import java.io.DataOutputStream;
  import java.io.IOException;
  import java.io.InputStream;
 -import java.io.OutputStream;
 -import java.lang.ref.SoftReference;
 -import java.util.concurrent.Callable;
 +import java.io.UncheckedIOException;
 +import java.util.Collections;
 +import java.util.Map;
+ import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.atomic.AtomicReference;
 +import java.util.function.Supplier;
  
  import org.apache.accumulo.core.conf.AccumuloConfiguration;
 -import org.apache.accumulo.core.file.blockfile.ABlockReader;
 -import org.apache.accumulo.core.file.blockfile.ABlockWriter;
 -import org.apache.accumulo.core.file.blockfile.BlockFileReader;
 -import org.apache.accumulo.core.file.blockfile.BlockFileWriter;
  import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 +import org.apache.accumulo.core.file.blockfile.cache.BlockCache.Loader;
  import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
 +import org.apache.accumulo.core.file.blockfile.cache.CacheEntry.Weighbable;
  import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
  import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Reader.BlockReader;
 -import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Writer.BlockAppender;
 -import org.apache.accumulo.core.file.streams.PositionedOutput;
 +import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
  import org.apache.accumulo.core.file.streams.RateLimitedInputStream;
 -import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
  import org.apache.accumulo.core.util.ratelimit.RateLimiter;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
@@@ -43,9 -47,10 +44,10 @@@ import org.apache.hadoop.fs.Seekable
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 +import com.google.common.base.Preconditions;
+ import com.google.common.cache.Cache;
  
  /**
 - *
   * This is a wrapper class for BCFile that includes a cache for independent caches for datablocks
   * and metadatablocks
   */
@@@ -60,24 -147,24 +62,25 @@@ public class CachableBlockFile 
    }
  
    /**
 -   *
 -   *
     * Class wraps the BCFile reader.
 -   *
     */
 -  public static class Reader implements BlockFileReader {
 -
 +  public static class Reader implements Closeable {
      private final RateLimiter readLimiter;
 -    private BCFile.Reader _bc;
 -    private final String fileName;
 -    private BlockCache _dCache = null;
 -    private BlockCache _iCache = null;
 +    // private BCFile.Reader _bc;
 +    private final String cacheId;
 +    private final BlockCache _dCache;
 +    private final BlockCache _iCache;
+     private Cache<String,Long> fileLenCache = null;
 -    private InputStream fin = null;
 -    private FileSystem fs;
 -    private Configuration conf;
 +    private volatile InputStream fin = null;
      private boolean closed = false;
 -    private AccumuloConfiguration accumuloConfiguration = null;
 +    private final Configuration conf;
 +    private final AccumuloConfiguration accumuloConfiguration;
 +
 +    private final IoeSupplier<InputStream> inputSupplier;
 +    private final IoeSupplier<Long> lengthSupplier;
 +    private final AtomicReference<BCFile.Reader> bcfr = new AtomicReference<>();
 +
 +    private static final String ROOT_BLOCK_NAME = "!RootData";
  
      // ACCUMULO-4716 - Define MAX_ARRAY_SIZE smaller than Integer.MAX_VALUE to prevent possible
      // OutOfMemory
@@@ -85,48 -172,23 +88,71 @@@
      // https://stackoverflow.com/a/8381338
      private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
  
 -    private interface BlockLoader {
 -      BlockReader get() throws IOException;
 -
 -      String getInfo();
++    private long getCachedFileLen() throws IOException {
++      try {
++        return fileLenCache.get(cacheId, () -> lengthSupplier.get());
++      } catch (ExecutionException e) {
++        throw new IOException("Failed to get " + cacheId + " len from cache ", e);
++      }
+     }
+ 
 -    private class OffsetBlockLoader implements BlockLoader {
 +    private BCFile.Reader getBCFile(byte[] serializedMetadata) throws IOException {
 +
 +      BCFile.Reader reader = bcfr.get();
 +      if (reader == null) {
 +        RateLimitedInputStream fsIn = new RateLimitedInputStream(
 +            (InputStream & Seekable) inputSupplier.get(), readLimiter);
-         BCFile.Reader tmpReader;
++        BCFile.Reader tmpReader = null;
 +        if (serializedMetadata == null) {
-           tmpReader = new BCFile.Reader(fsIn, lengthSupplier.get(), conf, accumuloConfiguration);
++          if (fileLenCache == null) {
++            tmpReader = new BCFile.Reader(fsIn, lengthSupplier.get(), conf, accumuloConfiguration);
++          } else {
++            long len = getCachedFileLen();
++            try {
++              tmpReader = new BCFile.Reader(fsIn, len, conf, accumuloConfiguration);
++            } catch (Exception e) {
++              log.debug("Failed to open {}, clearing file length cache and retrying", cacheId, e);
++              fileLenCache.invalidate(cacheId);
++            }
++
++            if (tmpReader == null) {
++              len = getCachedFileLen();
++              tmpReader = new BCFile.Reader(fsIn, len, conf, accumuloConfiguration);
++            }
++          }
 +        } else {
 +          tmpReader = new BCFile.Reader(serializedMetadata, fsIn, conf);
 +        }
  
 -      private int blockIndex;
 +        if (!bcfr.compareAndSet(null, tmpReader)) {
 +          fsIn.close();
 +          tmpReader.close();
 +          return bcfr.get();
 +        } else {
 +          fin = fsIn;
 +          return tmpReader;
 +        }
 +      }
  
 -      OffsetBlockLoader(int blockIndex) {
 -        this.blockIndex = blockIndex;
 +      return reader;
 +    }
 +
 +    private BCFile.Reader getBCFile() throws IOException {
 +      if (_iCache != null) {
 +        CacheEntry mce = _iCache.getBlock(cacheId + ROOT_BLOCK_NAME, new BCFileLoader());
 +        if (mce != null) {
 +          return getBCFile(mce.getBuffer());
 +        }
        }
  
 +      return getBCFile(null);
 +    }
 +
 +    private class BCFileLoader implements Loader {
 +
        @Override
 -      public BlockReader get() throws IOException {
 -        return getBCFile(accumuloConfiguration).getDataBlock(blockIndex);
 +      public Map<String,Loader> getDependencies() {
 +        return Collections.emptyMap();
        }
  
        @Override
@@@ -187,122 -242,193 +213,124 @@@
        }
      }
  
 -    public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data,
 -        BlockCache index, AccumuloConfiguration accumuloConfiguration) throws IOException {
 -      this(fs, dataFile, conf, null, data, index, null, accumuloConfiguration);
 -    }
 +    private class MetaBlockLoader extends BaseBlockLoader {
 +      String blockName;
  
 -    public Reader(FileSystem fs, Path dataFile, Configuration conf, Cache<String,Long> fileLenCache,
 -        BlockCache data, BlockCache index, RateLimiter readLimiter,
 -        AccumuloConfiguration accumuloConfiguration) throws IOException {
 +      MetaBlockLoader(String blockName) {
 +        super(true);
 +        this.blockName = blockName;
 +      }
  
 -      /*
 -       * Grab path create input stream grab len create file
 -       */
 +      @Override
 +      BlockReader getBlockReader(int maxSize, BCFile.Reader bcfr) throws IOException {
 +        if (bcfr.getMetaBlockRawSize(blockName) > Math.min(maxSize, MAX_ARRAY_SIZE)) {
 +          return null;
 +        }
 +        return bcfr.getMetaBlock(blockName);
 +      }
  
 -      fileName = dataFile.toString();
 -      this._dCache = data;
 -      this._iCache = index;
 -      this.fileLenCache = fileLenCache;
 -      this.fs = fs;
 -      this.conf = conf;
 -      this.accumuloConfiguration = accumuloConfiguration;
 -      this.readLimiter = readLimiter;
 +      @Override
 +      String getBlockId() {
 +        return "meta-" + blockName;
 +      }
      }
  
 -    public <InputStreamType extends InputStream & Seekable> Reader(String cacheId,
 -        InputStreamType fsin, long len, Configuration conf, BlockCache data, BlockCache index,
 -        AccumuloConfiguration accumuloConfiguration) throws IOException {
 -      this.fileName = cacheId;
 -      this._dCache = data;
 -      this._iCache = index;
 -      this.readLimiter = null;
 -      init(fsin, len, conf, accumuloConfiguration);
 -    }
 +    private abstract class BaseBlockLoader implements Loader {
  
 -    public <InputStreamType extends InputStream & Seekable> Reader(String cacheId,
 -        InputStreamType fsin, long len, Configuration conf,
 -        AccumuloConfiguration accumuloConfiguration) throws IOException {
 -      this.fileName = cacheId;
 -      this.readLimiter = null;
 -      init(fsin, len, conf, accumuloConfiguration);
 -    }
 +      abstract BlockReader getBlockReader(int maxSize, BCFile.Reader bcfr) throws IOException;
  
 -    private <InputStreamT extends InputStream & Seekable> void init(InputStreamT fsin, long len,
 -        Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
 -      this._bc = new BCFile.Reader(this, fsin, len, conf, accumuloConfiguration);
 -    }
 +      abstract String getBlockId();
  
 -    private long getFileLen(final Path path) throws IOException {
 -      try {
 -        return fileLenCache.get(fileName, new Callable<Long>() {
 -          @Override
 -          public Long call() throws Exception {
 -            return fs.getFileStatus(path).getLen();
 -          }
 -        });
 -      } catch (ExecutionException e) {
 -        throw new IOException("Failed to get " + path + " len from cache ", e);
 +      private boolean loadingMetaBlock;
 +
 +      public BaseBlockLoader(boolean loadingMetaBlock) {
 +        super();
 +        this.loadingMetaBlock = loadingMetaBlock;
        }
 -    }
  
 -    private synchronized BCFile.Reader getBCFile(AccumuloConfiguration accumuloConfiguration)
 -        throws IOException {
 -      if (closed)
 -        throw new IllegalStateException("File " + fileName + " is closed");
 +      @Override
 +      public Map<String,Loader> getDependencies() {
 +        if (bcfr.get() == null && loadingMetaBlock) {
 +          String _lookup = cacheId + ROOT_BLOCK_NAME;
 +          return Collections.singletonMap(_lookup, new BCFileLoader());
 +        }
 +        return Collections.emptyMap();
 +      }
  
 -      if (_bc == null) {
 -        // lazily open file if needed
 -        final Path path = new Path(fileName);
 +      @Override
 +      public byte[] load(int maxSize, Map<String,byte[]> dependencies) {
  
 -        RateLimitedInputStream fsIn = new RateLimitedInputStream(fs.open(path), this.readLimiter);
 -        fin = fsIn;
 +        try {
 +          BCFile.Reader reader = bcfr.get();
 +          if (reader == null) {
 +            if (loadingMetaBlock) {
 +              byte[] serializedMetadata = dependencies.get(cacheId + ROOT_BLOCK_NAME);
 +              reader = getBCFile(serializedMetadata);
 +            } else {
 +              reader = getBCFile();
 +            }
 +          }
  
 -        if (fileLenCache != null) {
 -          try {
 -            init(fsIn, getFileLen(path), conf, accumuloConfiguration);
 -          } catch (Exception e) {
 -            log.debug("Failed to open {}, clearing file length cache and retrying", fileName, e);
 -            fileLenCache.invalidate(fileName);
 +          BlockReader _currBlock = getBlockReader(maxSize, reader);
 +          if (_currBlock == null) {
 +            return null;
            }
  
 -          if (_bc == null) {
 -            init(fsIn, getFileLen(path), conf, accumuloConfiguration);
 +          byte b[] = null;
 +          try {
 +            b = new byte[(int) _currBlock.getRawSize()];
 +            _currBlock.readFully(b);
 +          } catch (IOException e) {
 +            log.debug("Error full blockRead for file " + cacheId + " for block " + getBlockId(), e);
 +            throw new UncheckedIOException(e);
 +          } finally {
 +            _currBlock.close();
            }
 -        } else {
 -          init(fsIn, fs.getFileStatus(path).getLen(), conf, accumuloConfiguration);
  
 +          return b;
 +        } catch (IOException e) {
 +          throw new UncheckedIOException(e);
          }
        }
 -
 -      return _bc;
      }
  
 -    public BlockRead getCachedMetaBlock(String blockName) throws IOException {
 -      String _lookup = fileName + "M" + blockName;
 -
 -      if (_iCache != null) {
 -        CacheEntry cacheEntry = _iCache.getBlock(_lookup);
 -
 -        if (cacheEntry != null) {
 -          return new CachedBlockRead(cacheEntry, cacheEntry.getBuffer());
 -        }
 -
 -      }
 -
 -      return null;
 +    private Reader(String cacheId, IoeSupplier<InputStream> inputSupplier,
-         IoeSupplier<Long> lenghtSupplier, BlockCache data, BlockCache index,
-         RateLimiter readLimiter, Configuration conf, AccumuloConfiguration accumuloConfiguration) {
++        IoeSupplier<Long> lenghtSupplier, Cache<String,Long> fileLenCache, BlockCache data,
++        BlockCache index, RateLimiter readLimiter, Configuration conf,
++        AccumuloConfiguration accumuloConfiguration) {
 +      Preconditions.checkArgument(cacheId != null || (data == null && index == null));
 +      this.cacheId = cacheId;
 +      this.inputSupplier = inputSupplier;
 +      this.lengthSupplier = lenghtSupplier;
++      this.fileLenCache = fileLenCache;
 +      this._dCache = data;
 +      this._iCache = index;
 +      this.readLimiter = readLimiter;
 +      this.conf = conf;
 +      this.accumuloConfiguration = accumuloConfiguration;
      }
  
 -    public BlockRead cacheMetaBlock(String blockName, BlockReader _currBlock) throws IOException {
 -      String _lookup = fileName + "M" + blockName;
 -      return cacheBlock(_lookup, _iCache, _currBlock, blockName);
 +    public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data,
 +        BlockCache index, AccumuloConfiguration accumuloConfiguration) throws IOException {
-       this(fs, dataFile, conf, data, index, null, accumuloConfiguration);
++      this(fs, dataFile, conf, null, data, index, null, accumuloConfiguration);
      }
  
-     public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data,
-         BlockCache index, RateLimiter readLimiter, AccumuloConfiguration accumuloConfiguration)
-         throws IOException {
 -    public void cacheMetaBlock(String blockName, byte[] b) {
 -
 -      if (_iCache == null)
 -        return;
 -
 -      String _lookup = fileName + "M" + blockName;
 -      try {
 -        _iCache.cacheBlock(_lookup, b);
 -      } catch (Exception e) {
 -        log.warn("Already cached block: " + _lookup, e);
 -      }
++    public Reader(FileSystem fs, Path dataFile, Configuration conf, Cache<String,Long> fileLenCache,
++        BlockCache data, BlockCache index, RateLimiter readLimiter,
++        AccumuloConfiguration accumuloConfiguration) throws IOException {
 +      this(dataFile.toString(), () -> fs.open(dataFile), () -> fs.getFileStatus(dataFile).getLen(),
-           data, index, readLimiter, conf, accumuloConfiguration);
++          fileLenCache, data, index, readLimiter, conf, accumuloConfiguration);
      }
  
 -    private BlockRead getBlock(String _lookup, BlockCache cache, BlockLoader loader)
 -        throws IOException {
 -
 -      BlockReader _currBlock;
 -
 -      if (cache != null) {
 -        CacheEntry cb = null;
 -        cb = cache.getBlock(_lookup);
 -
 -        if (cb != null) {
 -          return new CachedBlockRead(cb, cb.getBuffer());
 -        }
 -
 -      }
 -      /**
 -       * grab the currBlock at this point the block is still in the data stream
 -       *
 -       */
 -      _currBlock = loader.get();
 -
 -      /**
 -       * If the block is bigger than the cache just return the stream
 -       */
 -      return cacheBlock(_lookup, cache, _currBlock, loader.getInfo());
 -
 +    public <InputStreamType extends InputStream & Seekable> Reader(String cacheId,
 +        InputStreamType fsin, long len, Configuration conf, BlockCache data, BlockCache index,
 +        AccumuloConfiguration accumuloConfiguration) throws IOException {
-       this(cacheId, () -> fsin, () -> len, data, index, null, conf, accumuloConfiguration);
++      this(cacheId, () -> fsin, () -> len, null, data, index, null, conf, accumuloConfiguration);
      }
  
 -    private BlockRead cacheBlock(String _lookup, BlockCache cache, BlockReader _currBlock,
 -        String block) throws IOException {
 -
 -      if ((cache == null)
 -          || (_currBlock.getRawSize() > Math.min(cache.getMaxSize(), MAX_ARRAY_SIZE))) {
 -        return new BlockRead(_currBlock, _currBlock.getRawSize());
 -      } else {
 -
 -        /**
 -         * Try to fully read block for meta data if error try to close file
 -         *
 -         */
 -        byte b[] = null;
 -        try {
 -          b = new byte[(int) _currBlock.getRawSize()];
 -          _currBlock.readFully(b);
 -        } catch (IOException e) {
 -          log.debug("Error full blockRead for file " + fileName + " for block " + block, e);
 -          throw e;
 -        } finally {
 -          _currBlock.close();
 -        }
 -
 -        CacheEntry ce = null;
 -        try {
 -          ce = cache.cacheBlock(_lookup, b);
 -        } catch (Exception e) {
 -          log.warn("Already cached block: " + _lookup, e);
 -        }
 -
 -        if (ce == null)
 -          return new BlockRead(new DataInputStream(new ByteArrayInputStream(b)), b.length);
 -        else
 -          return new CachedBlockRead(ce, ce.getBuffer());
 -
 -      }
 +    public <InputStreamType extends InputStream & Seekable> Reader(InputStreamType fsin, long len,
 +        Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
-       this(null, () -> fsin, () -> len, null, null, null, conf, accumuloConfiguration);
++      this(null, () -> fsin, () -> len, null, null, null, null, conf, accumuloConfiguration);
      }
  
      /**
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
index 0bbf3ee,a067eaa..c788cad
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
@@@ -321,8 -325,8 +325,8 @@@ public class FileManager 
          FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder()
              .forFile(path.toString(), ns, ns.getConf())
              .withTableConfiguration(
 -                context.getServerConfigurationFactory().getTableConfiguration(tablet))
 +                context.getServerConfigurationFactory().getTableConfiguration(tablet.getTableId()))
-             .withBlockCache(dataCache, indexCache).build();
+             .withBlockCache(dataCache, indexCache).withFileLenCache(fileLenCache).build();
          readersReserved.put(reader, file);
        } catch (Exception e) {
  
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 8b541d9,4d1f373..71cebbe
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@@ -253,16 -228,12 +255,19 @@@ public class TabletServerResourceManage
      defaultReadAheadThreadPool = createEs(Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT,
          "metadata tablets read ahead");
  
 +    summaryRetrievalPool = createIdlingEs(Property.TSERV_SUMMARY_RETRIEVAL_THREADS,
 +        "summary file retriever", 60, TimeUnit.SECONDS);
 +    summaryRemotePool = createIdlingEs(Property.TSERV_SUMMARY_REMOTE_THREADS, "summary remote", 60,
 +        TimeUnit.SECONDS);
 +    summaryParitionPool = createIdlingEs(Property.TSERV_SUMMARY_PARTITION_THREADS,
 +        "summary partition", 60, TimeUnit.SECONDS);
 +
      int maxOpenFiles = acuConf.getCount(Property.TSERV_SCAN_MAX_OPENFILES);
  
-     fileManager = new FileManager(tserver, fs, maxOpenFiles, _dCache, _iCache);
+     Cache<String,Long> fileLenCache = CacheBuilder.newBuilder()
+         .maximumSize(Math.min(maxOpenFiles * 1000L, 100_000)).build();
+ 
+     fileManager = new FileManager(tserver, fs, maxOpenFiles, fileLenCache, _dCache, _iCache);
  
      memoryManager = Property.createInstanceFromPropertyName(acuConf, Property.TSERV_MEM_MGMT,
          MemoryManager.class, new LargestFirstMemoryManager());

-- 
To stop receiving notification emails like this one, please contact
kturner@apache.org.

Mime
View raw message