accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [accumulo] 01/01: Merge branch '1.8'
Date Thu, 30 Nov 2017 20:53:28 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 345070da63a7849f8aac8d71a283836366a2fd8e
Merge: 41dcabb ed313f7
Author: Keith Turner <kturner@apache.org>
AuthorDate: Thu Nov 30 15:50:49 2017 -0500

    Merge branch '1.8'

 .../accumulo/core/client/rfile/RFileScanner.java   |  2 +-
 .../core/client/rfile/RFileSummariesRetriever.java |  6 +++---
 .../file/blockfile/impl/CachableBlockFile.java     | 10 +++++----
 .../accumulo/core/summary/SummaryReader.java       |  6 +++---
 .../accumulo/core/client/rfile/RFileTest.java      | 24 ++++++++++++++++++++++
 .../core/file/rfile/MultiLevelIndexTest.java       |  2 +-
 .../apache/accumulo/core/file/rfile/RFileTest.java |  5 +++--
 7 files changed, 41 insertions(+), 14 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
index e10c073,bc0df82..301dfc5
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
@@@ -306,8 -265,9 +306,8 @@@ class RFileScanner extends ScannerOptio
        List<SortedKeyValueIterator<Key,Value>> readers = new ArrayList<>(sources.length);
        for (int i = 0; i < sources.length; i++) {
          FSDataInputStream inputStream = (FSDataInputStream) sources[i].getInputStream();
-         readers.add(new RFile.Reader(new CachableBlockFile.Reader(inputStream, sources[i].getLength(),
opts.in.getConf(), dataCache, indexCache,
 -
+         readers.add(new RFile.Reader(new CachableBlockFile.Reader("source-" + i, inputStream,
sources[i].getLength(), opts.in.getConf(), dataCache, indexCache,
 -            AccumuloConfiguration.getDefaultConfiguration())));
 +            DefaultConfiguration.getInstance())));
        }
  
        if (getSamplerConfiguration() != null) {
diff --cc core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java
index 1e47f00,0000000..d3a83b0
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java
@@@ -1,123 -1,0 +1,123 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.accumulo.core.client.rfile;
 +
 +import java.io.IOException;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Objects;
 +import java.util.function.Predicate;
 +
 +import org.apache.accumulo.core.client.rfile.RFile.SummaryFSOptions;
 +import org.apache.accumulo.core.client.rfile.RFile.SummaryInputArguments;
 +import org.apache.accumulo.core.client.rfile.RFile.SummaryOptions;
 +import org.apache.accumulo.core.client.rfile.RFileScannerBuilder.InputArgs;
 +import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
 +import org.apache.accumulo.core.client.summary.Summary;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.DefaultConfiguration;
 +import org.apache.accumulo.core.summary.Gatherer;
 +import org.apache.accumulo.core.summary.SummarizerFactory;
 +import org.apache.accumulo.core.summary.SummaryCollection;
 +import org.apache.accumulo.core.summary.SummaryReader;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.io.Text;
 +
 +class RFileSummariesRetriever implements SummaryInputArguments, SummaryFSOptions, SummaryOptions
{
 +
 +  private Predicate<SummarizerConfiguration> summarySelector = sc -> true;
 +  private Text startRow;
 +  private InputArgs in;
 +  private Text endRow;
 +
 +  @Override
 +  public SummaryOptions selectSummaries(Predicate<SummarizerConfiguration> summarySelector)
{
 +    Objects.requireNonNull(summarySelector);
 +    this.summarySelector = summarySelector;
 +    return this;
 +  }
 +
 +  @Override
 +  public SummaryOptions startRow(CharSequence startRow) {
 +    return startRow(new Text(startRow.toString()));
 +  }
 +
 +  @Override
 +  public SummaryOptions startRow(Text startRow) {
 +    Objects.requireNonNull(startRow);
 +    this.startRow = startRow;
 +    return this;
 +  }
 +
 +  @Override
 +  public SummaryOptions endRow(CharSequence endRow) {
 +    return endRow(new Text(endRow.toString()));
 +  }
 +
 +  @Override
 +  public SummaryOptions endRow(Text endRow) {
 +    Objects.requireNonNull(endRow);
 +    this.endRow = endRow;
 +    return this;
 +  }
 +
 +  @Override
 +  public Collection<Summary> read() throws IOException {
 +    SummarizerFactory factory = new SummarizerFactory();
 +    AccumuloConfiguration acuconf = DefaultConfiguration.getInstance();
 +    Configuration conf = in.getFileSystem().getConf();
 +
 +    RFileSource[] sources = in.getSources();
 +    try {
 +      SummaryCollection all = new SummaryCollection();
-       for (RFileSource source : in.getSources()) {
-         SummaryReader fileSummary = SummaryReader.load(conf, acuconf, source.getInputStream(),
source.getLength(), summarySelector, factory);
++      for (int i = 0; i < sources.length; i++) {
++        RFileSource source = sources[i];
++        SummaryReader fileSummary = SummaryReader.load("source-"+i, conf, acuconf, source.getInputStream(),
source.getLength(), summarySelector, factory);
 +        SummaryCollection sc = fileSummary.getSummaries(Collections.singletonList(new Gatherer.RowRange(startRow,
endRow)));
 +        all.merge(sc, factory);
 +      }
- 
 +      return all.getSummaries();
 +    } finally {
 +      for (RFileSource source : sources) {
 +        source.getInputStream().close();
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public SummaryOptions withFileSystem(FileSystem fs) {
 +    Objects.requireNonNull(fs);
 +    this.in.fs = fs;
 +    return this;
 +  }
 +
 +  @Override
 +  public SummaryOptions from(RFileSource... inputs) {
 +    Objects.requireNonNull(inputs);
 +    in = new InputArgs(inputs);
 +    return this;
 +  }
 +
 +  @Override
 +  public SummaryFSOptions from(String... files) {
 +    Objects.requireNonNull(files);
 +    in = new InputArgs(files);
 +    return this;
 +  }
 +}
diff --cc core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
index daa8f22,0000000..fa92fe0
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
@@@ -1,262 -1,0 +1,262 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.accumulo.core.summary;
 +
 +import java.io.DataInputStream;
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.io.PrintStream;
 +import java.io.UncheckedIOException;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.function.Predicate;
 +
 +import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.file.FileSKVIterator;
 +import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 +import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
 +import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 +import org.apache.accumulo.core.file.rfile.RFile.Reader;
 +import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
 +import org.apache.accumulo.core.summary.Gatherer.RowRange;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.fs.Seekable;
 +import org.apache.hadoop.io.WritableUtils;
 +
 +public class SummaryReader {
 +
 +  private static interface BlockReader {
 +    DataInputStream getMetaBlock(String name) throws IOException;
 +  }
 +
 +  private static class CompositeCache implements BlockCache {
 +
 +    private BlockCache summaryCache;
 +    private BlockCache indexCache;
 +
 +    CompositeCache(BlockCache summaryCache, BlockCache indexCache) {
 +      this.summaryCache = summaryCache;
 +      this.indexCache = indexCache;
 +    }
 +
 +    @Override
 +    public CacheEntry cacheBlock(String blockName, byte[] buf) {
 +      return summaryCache.cacheBlock(blockName, buf);
 +    }
 +
 +    @Override
 +    public CacheEntry cacheBlock(String blockName, byte[] buf, boolean inMemory) {
 +      return summaryCache.cacheBlock(blockName, buf, inMemory);
 +    }
 +
 +    @Override
 +    public CacheEntry getBlock(String blockName) {
 +      CacheEntry ce = summaryCache.getBlock(blockName);
 +      if (ce == null) {
 +        // Its possible the index cache may have this info, so check there. This is an opportunistic check.
 +        ce = indexCache.getBlock(blockName);
 +      }
 +      return ce;
 +    }
 +
 +    @Override
 +    public long getMaxSize() {
 +      return summaryCache.getMaxSize();
 +    }
 +
 +    @Override
 +    public Stats getStats() {
 +      return summaryCache.getStats();
 +    }
 +
 +    @Override
 +    public long getMaxHeapSize() {
 +      return summaryCache.getMaxHeapSize();
 +    }
 +  }
 +
 +  private static List<SummarySerializer> load(BlockReader bcReader, Predicate<SummarizerConfiguration>
summarySelector) throws IOException {
 +
 +    try (DataInputStream in = bcReader.getMetaBlock(SummaryWriter.METASTORE_INDEX)) {
 +      List<SummarySerializer> stores = new ArrayList<>();
 +
 +      readHeader(in);
 +      int numSummaries = WritableUtils.readVInt(in);
 +      for (int i = 0; i < numSummaries; i++) {
 +        SummarizerConfiguration conf = readConfig(in);
 +        boolean inline = in.readBoolean();
 +        if (inline) {
 +          if (summarySelector.test(conf)) {
 +            stores.add(SummarySerializer.load(conf, in));
 +          } else {
 +            SummarySerializer.skip(in);
 +          }
 +        } else {
 +          int block = WritableUtils.readVInt(in);
 +          int offset = WritableUtils.readVInt(in);
 +          if (summarySelector.test(conf)) {
 +            try (DataInputStream summaryIn = bcReader.getMetaBlock(SummaryWriter.METASTORE_PREFIX
+ "." + block)) {
 +              long skipped = in.skip(offset);
 +              while (skipped < offset) {
 +                skipped += in.skip(offset - skipped);
 +              }
 +              stores.add(SummarySerializer.load(conf, summaryIn));
 +            } catch (MetaBlockDoesNotExist e) {
 +              // this is unexpected
 +              throw new IOException(e);
 +            }
 +          }
 +        }
 +      }
 +
 +      return stores;
 +    } catch (MetaBlockDoesNotExist e) {
 +      return Collections.emptyList();
 +    }
 +  }
 +
 +  private static SummaryReader load(CachableBlockFile.Reader bcReader, Predicate<SummarizerConfiguration>
summarySelector, SummarizerFactory factory)
 +      throws IOException {
 +    SummaryReader fileSummaries = new SummaryReader();
 +    fileSummaries.summaryStores = load(name -> bcReader.getMetaBlock(name), summarySelector);
 +    fileSummaries.factory = factory;
 +    return fileSummaries;
 +  }
 +
-   public static SummaryReader load(Configuration conf, AccumuloConfiguration aConf, InputStream
inputStream, long length,
++  public static SummaryReader load(String id, Configuration conf, AccumuloConfiguration
aConf, InputStream inputStream, long length,
 +      Predicate<SummarizerConfiguration> summarySelector, SummarizerFactory factory)
throws IOException {
-     org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.Reader bcReader = new
CachableBlockFile.Reader((InputStream & Seekable) inputStream, length,
-         conf, aConf);
++    org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.Reader bcReader = new
CachableBlockFile.Reader(id, (InputStream & Seekable) inputStream,
++        length, conf, aConf);
 +    return load(bcReader, summarySelector, factory);
 +  }
 +
 +  public static SummaryReader load(FileSystem fs, Configuration conf, AccumuloConfiguration
aConf, SummarizerFactory factory, Path file,
 +      Predicate<SummarizerConfiguration> summarySelector, BlockCache summaryCache,
BlockCache indexCache) {
 +    CachableBlockFile.Reader bcReader = null;
 +
 +    try {
 +      // the reason BCFile is used instead of RFile is to avoid reading in the RFile meta block when only summary data is wanted.
 +      CompositeCache compositeCache = new CompositeCache(summaryCache, indexCache);
 +      bcReader = new CachableBlockFile.Reader(fs, file, conf, null, compositeCache, aConf);
 +      return load(bcReader, summarySelector, factory);
 +    } catch (FileNotFoundException fne) {
 +      SummaryReader sr = new SummaryReader();
 +      sr.factory = factory;
 +      sr.summaryStores = Collections.emptyList();
 +      sr.deleted = true;
 +      return sr;
 +    } catch (IOException e) {
 +      try {
 +        if (!fs.exists(file)) {
 +          SummaryReader sr = new SummaryReader();
 +          sr.factory = factory;
 +          sr.summaryStores = Collections.emptyList();
 +          sr.deleted = true;
 +          return sr;
 +        }
 +      } catch (IOException e1) {}
 +      throw new UncheckedIOException(e);
 +    } finally {
 +      if (bcReader != null) {
 +        try {
 +          bcReader.close();
 +        } catch (IOException e) {
 +          throw new UncheckedIOException(e);
 +        }
 +      }
 +    }
 +
 +  }
 +
 +  private static void print(FileSKVIterator fsi, String indent, PrintStream out) throws
IOException {
 +
 +    out.printf("Summary data : \n");
 +
 +    List<SummarySerializer> stores = load(name -> fsi.getMetaStore(name), conf
-> true);
 +    int i = 1;
 +    for (SummarySerializer summaryStore : stores) {
 +      out.printf("%sSummary %d of %d generated by : %s\n", indent, i, stores.size(), summaryStore.getSummarizerConfiguration());
 +      i++;
 +      summaryStore.print(indent, indent, out);
 +    }
 +  }
 +
 +  public static void print(Reader iter, PrintStream out) throws IOException {
 +    print(iter, "   ", out);
 +  }
 +
 +  private static SummarizerConfiguration readConfig(DataInputStream in) throws IOException
{
 +    // read summarizer configuration
 +    String summarizerClazz = in.readUTF();
 +    String configId = in.readUTF();
 +    org.apache.accumulo.core.client.summary.SummarizerConfiguration.Builder scb = SummarizerConfiguration.builder(summarizerClazz).setPropertyId(configId);
 +    int numOpts = WritableUtils.readVInt(in);
 +    for (int i = 0; i < numOpts; i++) {
 +      String k = in.readUTF();
 +      String v = in.readUTF();
 +      scb.addOption(k, v);
 +    }
 +
 +    return scb.build();
 +  }
 +
 +  private static byte readHeader(DataInputStream in) throws IOException {
 +    long magic = in.readLong();
 +    if (magic != SummaryWriter.MAGIC) {
 +      throw new IOException("Bad magic : " + String.format("%x", magic));
 +    }
 +
 +    byte ver = in.readByte();
 +    if (ver != SummaryWriter.VER) {
 +      throw new IOException("Unknown version : " + ver);
 +    }
 +
 +    return ver;
 +  }
 +
 +  private List<SummarySerializer> summaryStores;
 +
 +  private SummarizerFactory factory;
 +
 +  private boolean deleted;
 +
 +  public SummaryCollection getSummaries(List<RowRange> ranges) {
 +
 +    List<SummaryCollection.FileSummary> initial = new ArrayList<>();
 +    if (deleted) {
 +      return new SummaryCollection(initial, true);
 +    }
 +    for (SummarySerializer summaryStore : summaryStores) {
 +      if (summaryStore.exceededMaxSize()) {
 +        initial.add(new SummaryCollection.FileSummary(summaryStore.getSummarizerConfiguration()));
 +      } else {
 +        Map<String,Long> summary = summaryStore.getSummary(ranges, factory);
 +        boolean exceeded = summaryStore.exceedsRange(ranges);
 +        initial.add(new SummaryCollection.FileSummary(summaryStore.getSummarizerConfiguration(),
summary, exceeded));
 +      }
 +    }
 +
 +    return new SummaryCollection(initial);
 +  }
 +}
diff --cc core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
index 98f9983,8748d8c..e662a6b
--- a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
@@@ -774,7 -621,30 +775,30 @@@ public class RFileTest 
  
    private Reader getReader(LocalFileSystem localFs, String testFile) throws IOException
{
      Reader reader = (Reader) FileOperations.getInstance().newReaderBuilder().forFile(testFile).inFileSystem(localFs,
localFs.getConf())
 -        .withTableConfiguration(AccumuloConfiguration.getDefaultConfiguration()).build();
 +        .withTableConfiguration(DefaultConfiguration.getInstance()).build();
      return reader;
    }
+ 
+   @Test
+   public void testMultipleFilesAndCache() throws Exception {
+     SortedMap<Key,Value> testData = createTestData(100, 10, 10);
+     List<String> files = Arrays.asList(createTmpTestFile(), createTmpTestFile(), createTmpTestFile());
+ 
+     LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+ 
+     for (int i = 0; i < files.size(); i++) {
+       try (RFileWriter writer = RFile.newWriter().to(files.get(i)).withFileSystem(localFs).build())
{
+         for (Entry<Key,Value> entry : testData.entrySet()) {
+           if (entry.getKey().hashCode() % files.size() == i) {
+             writer.append(entry.getKey(), entry.getValue());
+           }
+         }
+       }
+     }
+ 
+     Scanner scanner = RFile.newScanner().from(files.toArray(new String[files.size()])).withFileSystem(localFs).withIndexCache(1000000).withDataCache(10000000)
+         .build();
+     Assert.assertEquals(testData, toMap(scanner));
+     scanner.close();
+   }
  }
diff --cc core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index f6b77b2,5967f85..2581f91
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@@ -271,22 -264,11 +271,23 @@@ public class RFileTest 
        in = new FSDataInputStream(bais);
        fileLength = data.length;
  
 -      LruBlockCache indexCache = new LruBlockCache(100000000, 100000);
 -      LruBlockCache dataCache = new LruBlockCache(100000000, 100000);
 +      DefaultConfiguration dc = DefaultConfiguration.getInstance();
 +      ConfigurationCopy cc = new ConfigurationCopy(dc);
 +      cc.set(Property.TSERV_CACHE_MANAGER_IMPL, LruBlockCacheManager.class.getName());
 +      try {
 +        manager = BlockCacheManagerFactory.getInstance(cc);
 +      } catch (Exception e) {
 +        throw new RuntimeException("Error creating BlockCacheManager", e);
 +      }
 +      cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(100000));
 +      cc.set(Property.TSERV_DATACACHE_SIZE, Long.toString(100000000));
 +      cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(100000000));
 +      manager.start(new BlockCacheConfiguration(cc));
 +      LruBlockCache indexCache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX);
 +      LruBlockCache dataCache = (LruBlockCache) manager.getBlockCache(CacheType.DATA);
 +
-       CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, fileLength, conf,
dataCache, indexCache, DefaultConfiguration.getInstance());
++      CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader("source-1", in, fileLength,
conf, dataCache, indexCache, DefaultConfiguration.getInstance());
+ 
 -      CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader("source-1", in, fileLength,
conf, dataCache, indexCache,
 -          AccumuloConfiguration.getDefaultConfiguration());
        reader = new RFile.Reader(_cbr);
        if (cfsi)
          iter = new ColumnFamilySkippingIterator(reader);
@@@ -1645,8 -1624,8 +1646,8 @@@
      byte data[] = baos.toByteArray();
      SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data);
      FSDataInputStream in2 = new FSDataInputStream(bais);
 -    AccumuloConfiguration aconf = AccumuloConfiguration.getDefaultConfiguration();
 +    AccumuloConfiguration aconf = DefaultConfiguration.getInstance();
-     CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in2, data.length, CachedConfiguration.getInstance(),
aconf);
+     CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader("source-1", in2, data.length,
CachedConfiguration.getInstance(), aconf);
      Reader reader = new RFile.Reader(_cbr);
      checkIndex(reader);
  

-- 
To stop receiving notification emails like this one, please contact
"commits@accumulo.apache.org" <commits@accumulo.apache.org>.

Mime
View raw message