Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 69585200B4D for ; Fri, 8 Jul 2016 21:31:31 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 68014160A5A; Fri, 8 Jul 2016 19:31:31 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 20622160A77 for ; Fri, 8 Jul 2016 21:31:29 +0200 (CEST) Received: (qmail 37221 invoked by uid 500); 8 Jul 2016 19:31:29 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 37140 invoked by uid 99); 8 Jul 2016 19:31:29 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Jul 2016 19:31:29 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D73BCDFFF8; Fri, 8 Jul 2016 19:31:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kturner@apache.org To: commits@accumulo.apache.org Date: Fri, 08 Jul 2016 19:31:28 -0000 Message-Id: <65d1675b702e4b2d9c304f675220cf57@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] accumulo git commit: ACCUMULO-4153: Remove synchronization from Compression archived-at: Fri, 08 Jul 2016 19:31:31 -0000 Repository: accumulo Updated Branches: refs/heads/1.7 dffb1c5a9 -> 4d26943e5 ACCUMULO-4153: Remove synchronization from Compression Update the getCodec method to no longer be synchronized using static initializer in enum Update so that we use a codec cache if we are not using the default buffer size for each specific codec. LZO does not need this change. Update to improve comments and other readability concerns. Update tests to check all codecs. Add checks for failures in executor. Instead of more unit tests with Assume checks, we'll simply use a map and loop in existing unit tests to check all codecs. A failure in one will cause a failure Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8d7f04cd Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8d7f04cd Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8d7f04cd Branch: refs/heads/1.7 Commit: 8d7f04cd96d435692726bf2e3dcdb025570e01e8 Parents: 5b971d6 Author: phrocker Authored: Wed Feb 24 15:40:08 2016 -0500 Committer: phrocker Committed: Tue Jun 7 11:08:52 2016 -0400 ---------------------------------------------------------------------- .../core/file/rfile/bcfile/Compression.java | 306 +++++++++++++++---- .../core/file/rfile/bcfile/CompressionTest.java | 250 +++++++++++++++ 2 files changed, 497 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/8d7f04cd/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java index 9defa1c..3b82462 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java @@ -23,6 +23,9 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -36,6 +39,11 @@ import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.hadoop.util.ReflectionUtils; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.Maps; + /** * Compression related stuff. */ @@ -78,41 +86,90 @@ public final class Compression { public static final String COMPRESSION_NONE = "none"; /** - * Compression algorithms. + * Compression algorithms. There is a static initializer, below the values defined in the enumeration, that calls the initializer of all defined codecs within + * the Algorithm enum. This promotes a model of the following call graph of initialization by the static initializer, followed by calls to getCodec() and + * createCompressionStream/DecompressionStream. In some cases, the compression and decompression call methods will include a different buffer size for the + * stream. Note that if the compressed buffer size requested in these calls is zero, we will not set the buffer size for that algorithm. Instead, we will use + * the default within the codec. + * + * The buffer size is configured in the Codec by way of a Hadoop Configuration reference. One approach may be to use the same Configuration object, but when + * calls are made to createCompressionStream and DecompressionStream, with non default buffer sizes, the configuration object must be changed. In this case, + * concurrent calls to createCompressionStream and DecompressionStream would mutate the configuration object beneath each other, requiring synchronization to + * avoid undesirable activity via co-modification. To avoid synchronization entirely, we will create Codecs with their own Configuration object and cache them + * for re-use. A default codec will be statically created, as mentioned above to ensure we always have a codec available at loader initialization. + * + * There is a Guava cache defined within Algorithm that allows us to cache Codecs for re-use. Since they will have their own configuration object and thus do + * not need to be mutable, there is no concern for using them concurrently; however, the Guava cache exists to ensure a maximal size of the cache and + * efficient and concurrent read/write access to the cache itself. + * + * To provide Algorithm specific details and to describe what is in code: + * + * LZO will always have the default LZO codec because the buffer size is never overridden within it. + * + * GZ will use the default GZ codec for the compression stream, but can potentially use a different codec instance for the decompression stream if the + * requested buffer size does not match the default GZ buffer size of 32k. + * + * Snappy will use the default Snappy codec with the default buffer size of 64k for the compression stream, but will use a cached codec if the buffer size + * differs from the default. */ public static enum Algorithm { + LZO(COMPRESSION_LZO) { - private transient boolean checked = false; + /** + * determines if we've checked the codec status. ensures we don't recreate the defualt codec + */ + private transient AtomicBoolean checked = new AtomicBoolean(false); private static final String defaultClazz = "org.apache.hadoop.io.compress.LzoCodec"; private transient CompressionCodec codec = null; + /** + * Configuration option for LZO buffer size + */ + private static final String BUFFER_SIZE_OPT = "io.compression.codec.lzo.buffersize"; + + /** + * Default buffer size + */ + private static final int DEFAULT_BUFFER_SIZE = 64 * 1024; + @Override - public synchronized boolean isSupported() { - if (!checked) { - checked = true; - String extClazz = (conf.get(CONF_LZO_CLASS) == null ? System.getProperty(CONF_LZO_CLASS) : null); - String clazz = (extClazz != null) ? extClazz : defaultClazz; - try { - LOG.info("Trying to load Lzo codec class: " + clazz); - codec = (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), conf); - } catch (ClassNotFoundException e) { - // that is okay - } - } + public boolean isSupported() { return codec != null; } + public void initializeDefaultCodec() { + if (!checked.get()) { + checked.set(true); + codec = createNewCodec(DEFAULT_BUFFER_SIZE); + } + } + @Override - CompressionCodec getCodec() throws IOException { - if (!isSupported()) { - throw new IOException("LZO codec class not specified. Did you forget to set property " + CONF_LZO_CLASS + "?"); + CompressionCodec createNewCodec(int bufferSize) { + String extClazz = (conf.get(CONF_LZO_CLASS) == null ? System.getProperty(CONF_LZO_CLASS) : null); + String clazz = (extClazz != null) ? extClazz : defaultClazz; + try { + LOG.info("Trying to load Lzo codec class: " + clazz); + Configuration myConf = new Configuration(conf); + // only use the buffersize if > 0, otherwise we'll use + // the default defined within the codec + if (bufferSize > 0) + myConf.setInt(BUFFER_SIZE_OPT, bufferSize); + codec = (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf); + return codec; + } catch (ClassNotFoundException e) { + // that is okay } + return null; + } + @Override + CompressionCodec getCodec() throws IOException { return codec; } @Override - public synchronized InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor, int downStreamBufferSize) throws IOException { + public InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor, int downStreamBufferSize) throws IOException { if (!isSupported()) { throw new IOException("LZO codec class not specified. Did you forget to set property " + CONF_LZO_CLASS + "?"); } @@ -122,14 +179,13 @@ public final class Compression { } else { bis1 = downStream; } - conf.setInt("io.compression.codec.lzo.buffersize", 64 * 1024); CompressionInputStream cis = codec.createInputStream(bis1, decompressor); BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE); return bis2; } @Override - public synchronized OutputStream createCompressionStream(OutputStream downStream, Compressor compressor, int downStreamBufferSize) throws IOException { + public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor, int downStreamBufferSize) throws IOException { if (!isSupported()) { throw new IOException("LZO codec class not specified. Did you forget to set property " + CONF_LZO_CLASS + "?"); } @@ -139,46 +195,82 @@ public final class Compression { } else { bos1 = downStream; } - conf.setInt("io.compression.codec.lzo.buffersize", 64 * 1024); CompressionOutputStream cos = codec.createOutputStream(bos1, compressor); BufferedOutputStream bos2 = new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE); return bos2; } + }, GZ(COMPRESSION_GZ) { - private transient DefaultCodec codec; - @Override - synchronized CompressionCodec getCodec() { - if (codec == null) { - codec = new DefaultCodec(); - codec.setConf(conf); - } + private transient DefaultCodec codec = null; + + /** + * Configuration option for gz buffer size + */ + private static final String BUFFER_SIZE_OPT = "io.file.buffer.size"; + + /** + * Default buffer size + */ + private static final int DEFAULT_BUFFER_SIZE = 32 * 1024; + @Override + CompressionCodec getCodec() { return codec; } @Override - public synchronized InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor, int downStreamBufferSize) throws IOException { + public void initializeDefaultCodec() { + codec = (DefaultCodec) createNewCodec(DEFAULT_BUFFER_SIZE); + } + + /** + * Create a new GZ codec + * + * @param bufferSize + * buffer size to for GZ + * @return created codec + */ + protected CompressionCodec createNewCodec(final int bufferSize) { + DefaultCodec myCodec = new DefaultCodec(); + Configuration myConf = new Configuration(conf); + // only use the buffersize if > 0, otherwise we'll use + // the default defined within the codec + if (bufferSize > 0) + myConf.setInt(BUFFER_SIZE_OPT, bufferSize); + myCodec.setConf(myConf); + return myCodec; + } + + @Override + public InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor, int downStreamBufferSize) throws IOException { // Set the internal buffer size to read from down stream. - if (downStreamBufferSize > 0) { - codec.getConf().setInt("io.file.buffer.size", downStreamBufferSize); + CompressionCodec decomCodec = codec; + // if we're not using the default, let's pull from the loading cache + if (DEFAULT_BUFFER_SIZE != downStreamBufferSize) { + Entry sizeOpt = Maps.immutableEntry(GZ, downStreamBufferSize); + try { + decomCodec = codecCache.get(sizeOpt); + } catch (ExecutionException e) { + throw new IOException(e); + } } - CompressionInputStream cis = codec.createInputStream(downStream, decompressor); + CompressionInputStream cis = decomCodec.createInputStream(downStream, decompressor); BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE); return bis2; } @Override - public synchronized OutputStream createCompressionStream(OutputStream downStream, Compressor compressor, int downStreamBufferSize) throws IOException { + public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor, int downStreamBufferSize) throws IOException { OutputStream bos1 = null; if (downStreamBufferSize > 0) { bos1 = new BufferedOutputStream(downStream, downStreamBufferSize); } else { bos1 = downStream; } - codec.getConf().setInt("io.file.buffer.size", 32 * 1024); + // always uses the default buffer size CompressionOutputStream cos = codec.createOutputStream(bos1, compressor); BufferedOutputStream bos2 = new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE); return bos2; @@ -197,15 +289,23 @@ public final class Compression { } @Override - public synchronized InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor, int downStreamBufferSize) throws IOException { + public InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor, int downStreamBufferSize) throws IOException { if (downStreamBufferSize > 0) { return new BufferedInputStream(downStream, downStreamBufferSize); } return downStream; } + public void initializeDefaultCodec() { + + } + + protected CompressionCodec createNewCodec(final int bufferSize) { + return null; + } + @Override - public synchronized OutputStream createCompressionStream(OutputStream downStream, Compressor compressor, int downStreamBufferSize) throws IOException { + public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor, int downStreamBufferSize) throws IOException { if (downStreamBufferSize > 0) { return new BufferedOutputStream(downStream, downStreamBufferSize); } @@ -222,18 +322,65 @@ public final class Compression { SNAPPY(COMPRESSION_SNAPPY) { // Use base type to avoid compile-time dependencies. private transient CompressionCodec snappyCodec = null; - private transient boolean checked = false; + /** + * determines if we've checked the codec status. ensures we don't recreate the defualt codec + */ + private transient AtomicBoolean checked = new AtomicBoolean(false); private static final String defaultClazz = "org.apache.hadoop.io.compress.SnappyCodec"; + /** + * Buffer size option + */ + private static final String BUFFER_SIZE_OPT = "io.compression.codec.snappy.buffersize"; + + /** + * Default buffer size value + */ + private static final int DEFAULT_BUFFER_SIZE = 64 * 1024; + public CompressionCodec getCodec() throws IOException { - if (!isSupported()) { - throw new IOException("SNAPPY codec class not specified. Did you forget to set property " + CONF_SNAPPY_CLASS + "?"); - } return snappyCodec; } @Override - public synchronized OutputStream createCompressionStream(OutputStream downStream, Compressor compressor, int downStreamBufferSize) throws IOException { + public void initializeDefaultCodec() { + if (!checked.get()) { + checked.set(true); + snappyCodec = createNewCodec(DEFAULT_BUFFER_SIZE); + } + } + + /** + * Creates a new snappy codec. + * + * @param bufferSize + * incoming buffer size + * @return new codec or null, depending on if installed + */ + protected CompressionCodec createNewCodec(final int bufferSize) { + + String extClazz = (conf.get(CONF_SNAPPY_CLASS) == null ? System.getProperty(CONF_SNAPPY_CLASS) : null); + String clazz = (extClazz != null) ? extClazz : defaultClazz; + try { + LOG.info("Trying to load snappy codec class: " + clazz); + + Configuration myConf = new Configuration(conf); + // only use the buffersize if > 0, otherwise we'll use + // the default defined within the codec + if (bufferSize > 0) + myConf.setInt(BUFFER_SIZE_OPT, bufferSize); + + return (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf); + + } catch (ClassNotFoundException e) { + // that is okay + } + + return null; + } + + @Override + public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor, int downStreamBufferSize) throws IOException { if (!isSupported()) { throw new IOException("SNAPPY codec class not specified. Did you forget to set property " + CONF_SNAPPY_CLASS + "?"); @@ -244,44 +391,71 @@ public final class Compression { } else { bos1 = downStream; } - conf.setInt("io.compression.codec.snappy.buffersize", 64 * 1024); + // use the default codec CompressionOutputStream cos = snappyCodec.createOutputStream(bos1, compressor); BufferedOutputStream bos2 = new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE); return bos2; } @Override - public synchronized InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor, int downStreamBufferSize) throws IOException { + public InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor, int downStreamBufferSize) throws IOException { if (!isSupported()) { throw new IOException("SNAPPY codec class not specified. Did you forget to set property " + CONF_SNAPPY_CLASS + "?"); } - if (downStreamBufferSize > 0) { - conf.setInt("io.file.buffer.size", downStreamBufferSize); + + CompressionCodec decomCodec = snappyCodec; + // if we're not using the same buffer size, we'll pull the codec from the loading cache + if (DEFAULT_BUFFER_SIZE != downStreamBufferSize) { + Entry sizeOpt = Maps.immutableEntry(SNAPPY, downStreamBufferSize); + try { + decomCodec = codecCache.get(sizeOpt); + } catch (ExecutionException e) { + throw new IOException(e); + } } - CompressionInputStream cis = snappyCodec.createInputStream(downStream, decompressor); + + CompressionInputStream cis = decomCodec.createInputStream(downStream, decompressor); BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE); return bis2; } @Override - public synchronized boolean isSupported() { - if (!checked) { - checked = true; - String extClazz = (conf.get(CONF_SNAPPY_CLASS) == null ? System.getProperty(CONF_SNAPPY_CLASS) : null); - String clazz = (extClazz != null) ? extClazz : defaultClazz; - try { - LOG.info("Trying to load snappy codec class: " + clazz); - snappyCodec = (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), conf); - } catch (ClassNotFoundException e) { - // that is okay - } - } + public boolean isSupported() { + return snappyCodec != null; } }; + + /** + * The model defined by the static block, below, creates a singleton for each defined codec in the Algorithm enumeration. By creating the codecs, each call + * to isSupported shall return true/false depending on if the codec singleton is defined. The static initializer, below, will ensure this occurs when the + * Enumeration is loaded. Furthermore, calls to getCodec will return the singleton, whether it is null or not. + * + * Calls to createCompressionStream and createDecompressionStream may return a different codec than getCodec, if the incoming downStreamBufferSize is + * different than the default. In such a case, we will place the resulting codec into the codecCache, defined below, to ensure we have cache codecs. + * + * Since codecs are immutable, there is no concern about concurrent access to the CompressionCodec objects within the guava cache. + */ + static { + conf = new Configuration(); + for (final Algorithm al : Algorithm.values()) { + al.initializeDefaultCodec(); + } + } + + /** + * Guava cache to have a limited factory pattern defined in the Algorithm enum. + */ + private static LoadingCache,CompressionCodec> codecCache = CacheBuilder.newBuilder().maximumSize(25) + .build(new CacheLoader,CompressionCodec>() { + public CompressionCodec load(Entry key) { + return key.getKey().createNewCodec(key.getValue()); + } + }); + // We require that all compression related settings are configured // statically in the Configuration object. - protected static final Configuration conf = new Configuration(); + protected static final Configuration conf; private final String compressName; // data input buffer size to absorb small reads from application. private static final int DATA_IBUF_SIZE = 1 * 1024; @@ -296,6 +470,20 @@ public final class Compression { abstract CompressionCodec getCodec() throws IOException; + /** + * function to create the default codec object. + */ + abstract void initializeDefaultCodec(); + + /** + * Shared function to create new codec objects. It is expected that if buffersize is invalid, a codec will be created with the default buffer size + * + * @param bufferSize + * configured buffer size. + * @return new codec + */ + abstract CompressionCodec createNewCodec(int bufferSize); + public abstract InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor, int downStreamBufferSize) throws IOException; public abstract OutputStream createCompressionStream(OutputStream downStream, Compressor compressor, int downStreamBufferSize) throws IOException; http://git-wip-us.apache.org/repos/asf/accumulo/blob/8d7f04cd/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java new file mode 100644 index 0000000..9615564 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.accumulo.core.file.rfile.bcfile; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.util.ReflectionUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +public class CompressionTest { + + HashMap isSupported = Maps.newHashMap(); + + @Before + public void testSupport() { + // we can safely assert that GZ exists by virtue of it being the DefaultCodec + isSupported.put(Compression.Algorithm.GZ, true); + + Configuration myConf = new Configuration(); + + String extClazz = System.getProperty(Compression.Algorithm.CONF_LZO_CLASS); + String clazz = (extClazz != null) ? extClazz : "org.apache.hadoop.io.compress.LzoCodec"; + try { + CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf); + + Assert.assertNotNull(codec); + isSupported.put(Compression.Algorithm.LZO, true); + + } catch (ClassNotFoundException e) { + // that is okay + } + + extClazz = System.getProperty(Compression.Algorithm.CONF_SNAPPY_CLASS); + clazz = (extClazz != null) ? extClazz : "org.apache.hadoop.io.compress.SnappyCodec"; + try { + CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf); + + Assert.assertNotNull(codec); + + isSupported.put(Compression.Algorithm.SNAPPY, true); + + } catch (ClassNotFoundException e) { + // that is okay + } + + } + + @Test + public void testSingle() throws IOException { + + for (final Algorithm al : Algorithm.values()) { + if (isSupported.get(al) != null && isSupported.get(al) == true) { + + // first call to issupported should be true + Assert.assertTrue(al + " is not supported, but should be", al.isSupported()); + + Assert.assertNotNull(al + " should have a non-null codec", al.getCodec()); + + Assert.assertNotNull(al + " should have a non-null codec", al.getCodec()); + } + } + } + + @Test + public void testSingleNoSideEffect() throws IOException { + + for (final Algorithm al : Algorithm.values()) { + if (isSupported.get(al) != null && isSupported.get(al) == true) { + + Assert.assertTrue(al + " is not supported, but should be", al.isSupported()); + + Assert.assertNotNull(al + " should have a non-null codec", al.getCodec()); + + // assert that additional calls to create will not create + // additional codecs + + Assert.assertNotEquals(al + " should have created a new codec, but did not", System.identityHashCode(al.getCodec()), al.createNewCodec(88 * 1024)); + } + } + } + + @Test(timeout = 60 * 1000) + public void testManyStartNotNull() throws IOException, InterruptedException, ExecutionException { + + for (final Algorithm al : Algorithm.values()) { + if (isSupported.get(al) != null && isSupported.get(al) == true) { + + // first call to issupported should be true + Assert.assertTrue(al + " is not supported, but should be", al.isSupported()); + + final CompressionCodec codec = al.getCodec(); + + Assert.assertNotNull(al + " should not be null", codec); + + ExecutorService service = Executors.newFixedThreadPool(10); + + ArrayList> results = Lists.newArrayList(); + + for (int i = 0; i < 30; i++) { + results.add(service.submit(new Callable() + + { + + @Override + public Boolean call() throws Exception { + Assert.assertNotNull(al + " should not be null", al.getCodec()); + return true; + } + + })); + } + + service.shutdown(); + + Assert.assertNotNull(al + " should not be null", codec); + + while (!service.awaitTermination(1, TimeUnit.SECONDS)) { + // wait + } + + for (Future result : results) { + Assert.assertTrue(al + " resulted in a failed call to getcodec within the thread pool", result.get()); + } + } + } + + } + + // don't start until we have created the codec + @Test(timeout = 60 * 1000) + public void testManyDontStartUntilThread() throws IOException, InterruptedException, ExecutionException { + + for (final Algorithm al : Algorithm.values()) { + if (isSupported.get(al) != null && isSupported.get(al) == true) { + + // first call to issupported should be true + Assert.assertTrue(al + " is not supported, but should be", al.isSupported()); + + ExecutorService service = Executors.newFixedThreadPool(10); + + ArrayList> results = Lists.newArrayList(); + + for (int i = 0; i < 30; i++) { + + results.add(service.submit(new Callable() { + + @Override + public Boolean call() throws Exception { + Assert.assertNotNull(al + " should have a non-null codec", al.getCodec()); + return true; + } + + })); + } + + service.shutdown(); + + while (!service.awaitTermination(1, TimeUnit.SECONDS)) { + // wait + } + + for (Future result : results) { + Assert.assertTrue(al + " resulted in a failed call to getcodec within the thread pool", result.get()); + } + } + } + + } + + @Test(timeout = 60 * 1000) + public void testThereCanBeOnlyOne() throws IOException, InterruptedException, ExecutionException { + + for (final Algorithm al : Algorithm.values()) { + if (isSupported.get(al) != null && isSupported.get(al) == true) { + + // first call to issupported should be true + Assert.assertTrue(al + " is not supported, but should be", al.isSupported()); + + ExecutorService service = Executors.newFixedThreadPool(20); + + ArrayList> list = Lists.newArrayList(); + + ArrayList> results = Lists.newArrayList(); + + // keep track of the system's identity hashcodes. + final HashSet testSet = Sets.newHashSet(); + + for (int i = 0; i < 40; i++) { + list.add(new Callable() { + + @Override + public Boolean call() throws Exception { + CompressionCodec codec = al.getCodec(); + Assert.assertNotNull(al + " resulted in a non-null codec", codec); + // add the identity hashcode to the set. + testSet.add(System.identityHashCode(codec)); + return true; + } + }); + } + + results.addAll(service.invokeAll(list)); + // ensure that we + Assert.assertEquals(al + " created too many codecs", 1, testSet.size()); + service.shutdown(); + + while (!service.awaitTermination(1, TimeUnit.SECONDS)) { + // wait + } + + for (Future result : results) { + Assert.assertTrue(al + " resulted in a failed call to getcodec within the thread pool", result.get()); + } + } + } + } + +}