From: ctubbsii@apache.org
To: commits@accumulo.apache.org
Date: Fri, 09 Sep 2016 23:37:38 -0000
Subject: [19/45] accumulo git commit: ACCUMULO-4391 Added appropriate synchronization to allow RFile.Reader deepcopies to be used in separate threads.

ACCUMULO-4391 Added appropriate synchronization to allow RFile.Reader deepcopies to be used in separate threads. Added a test case which forces at least one of the race conditions. The remaining conditions have been tested on systems that demonstrated these issues.
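The race this commit guards against looks roughly like the sketch below. The class is illustrative only, not one of Accumulo's real types: one thread can be mid-read on a shared, pooled input stream while another thread closes it and recycles its decompressor; holding a single monitor for both operations removes that window.

import java.io.IOException;

class PooledStream {
  private volatile boolean closed = false;
  private final Object in = new Object(); // stands in for the shared FSDataInputStream

  int read(byte[] b, int off, int len) throws IOException {
    synchronized (in) { // a read holds the stream's monitor for its full duration
      if (closed) {
        throw new IOException("Stream closed");
      }
      // ... seek and read from the underlying stream here ...
      return len;
    }
  }

  void close() {
    if (!closed) { // cheap unlocked check; 'closed' is volatile
      synchronized (in) { // blocks until any in-flight read finishes, so the
        closed = true;    // pooled decompressor is never recycled mid-read
      }
    }
  }
}

Synchronizing on the stream's own monitor rather than a separate lock object matters here: BoundedRangeFileInputStream, BCFile, and CachableBlockFile (all patched below) synchronize on the same FSDataInputStream instance, so close and read serialize across all three classes.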
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3c8bc120
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3c8bc120
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3c8bc120

Branch: refs/heads/master
Commit: 3c8bc120b1527eb0cba5390da8e57eb947734af8
Parents: dcc5dff
Author: Ivan Bella
Authored: Wed Jul 27 13:11:12 2016 -0400
Committer: Ivan Bella
Committed: Thu Sep 8 12:24:16 2016 -0400

----------------------------------------------------------------------
 .../file/blockfile/impl/CachableBlockFile.java  |   5 +-
 .../accumulo/core/file/rfile/bcfile/BCFile.java |  24 +-
 .../bcfile/BoundedRangeFileInputStream.java     |  34 +-
 .../core/file/rfile/MultiThreadedRFileTest.java | 382 +++++++++++++++++++
 .../accumulo/core/file/rfile/RFileTest.java     |   7 -
 core/src/test/resources/log4j.properties        |   2 +
 6 files changed, 426 insertions(+), 28 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3c8bc120/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index 3b12d07..be464be 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -417,7 +417,10 @@ public class CachableBlockFile {
       _bc.close();
 
     if (fin != null) {
-      fin.close();
+      // synchronize on the FSDataInputStream to ensure thread safety with the BoundedRangeFileInputStream
+      synchronized (fin) {
+        fin.close();
+      }
     }
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3c8bc120/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
index 4767d91..60376d5 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
@@ -615,6 +615,7 @@ public final class BCFile {
     private Decompressor decompressor;
     private final BlockRegion region;
     private final InputStream in;
+    private volatile boolean closed;
 
     public RBlockState(Algorithm compressionAlgo, FSDataInputStream fsin, BlockRegion region, Configuration conf, CryptoModule cryptoModule,
         Version bcFileVersion, CryptoModuleParameters cryptoParams) throws IOException {
@@ -652,6 +653,7 @@ public final class BCFile {
         compressAlgo.returnDecompressor(decompressor);
         throw e;
       }
+      closed = false;
     }
 
     /**
@@ -672,11 +674,23 @@ public final class BCFile {
     }
 
     public void finish() throws IOException {
-      try {
-        in.close();
-      } finally {
-        compressAlgo.returnDecompressor(decompressor);
-        decompressor = null;
+      if (!closed) {
+        synchronized (in) {
+          if (!closed) {
+            try {
+              in.close();
+            } finally {
+              closed = true;
+              if (decompressor != null) {
+                try {
+                  compressAlgo.returnDecompressor(decompressor);
+                } finally {
+                  decompressor = null;
+                }
+              }
+            }
+          }
+        }
       }
     }
   }
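The finish() rewrite above is a double-checked close: the volatile closed flag is tested cheaply outside the lock and re-tested inside it, so the decompressor is returned to the pool exactly once even when several threads race to finish. Stripped to the bare idiom (generic names, not Accumulo's):

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;

class DoubleCheckedClose implements Closeable {
  private final InputStream in;
  private volatile boolean closed = false;

  DoubleCheckedClose(InputStream in) {
    this.in = in;
  }

  @Override
  public void close() throws IOException {
    if (!closed) {          // fast path: skip the lock once already closed
      synchronized (in) {   // same monitor the readers hold
        if (!closed) {      // re-check: another thread may have closed first
          try {
            in.close();
          } finally {
            closed = true;  // pooled resources are released exactly once here
          }
        }
      }
    }
  }
}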
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3c8bc120/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java
index f93bb84..b5ee61b 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java
@@ -31,7 +31,8 @@ import org.apache.hadoop.fs.FSDataInputStream;
  */
 class BoundedRangeFileInputStream extends InputStream {
 
-  private FSDataInputStream in;
+  private volatile boolean closed = false;
+  private final FSDataInputStream in;
   private long pos;
   private long end;
   private long mark;
@@ -62,12 +63,7 @@ class BoundedRangeFileInputStream extends InputStream {
 
   @Override
   public int available() throws IOException {
-    int avail = in.available();
-    if (pos + avail > end) {
-      avail = (int) (end - pos);
-    }
-
-    return avail;
+    return (int) (end - pos);
   }
 
   @Override
@@ -93,15 +89,18 @@ class BoundedRangeFileInputStream extends InputStream {
     if (n == 0)
       return -1;
     Integer ret = 0;
-    final FSDataInputStream inLocal = in;
-    synchronized (inLocal) {
-      inLocal.seek(pos);
+    synchronized (in) {
+      // ensure we are not closed; a close would be followed by someone else reusing the decompressor
+      if (closed) {
+        throw new IOException("Stream closed");
+      }
+      in.seek(pos);
       try {
         ret = AccessController.doPrivileged(new PrivilegedExceptionAction<Integer>() {
           @Override
           public Integer run() throws IOException {
             int ret = 0;
-            ret = inLocal.read(b, off, n);
+            ret = in.read(b, off, n);
             return ret;
           }
         });
@@ -146,9 +145,14 @@ class BoundedRangeFileInputStream extends InputStream {
 
   @Override
   public void close() {
-    // Invalidate the state of the stream.
-    in = null;
-    pos = end;
-    mark = -1;
+    // Synchronize on the FSDataInputStream to ensure we block while a read is in progress:
+    // once this close completes, the underlying decompression stream may be returned to
+    // the pool and reused, which is a problem if a reader is still using it.
+    if (!closed) {
+      synchronized (in) {
+        // Invalidate the state of the stream.
+        closed = true;
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3c8bc120/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
new file mode 100644
index 0000000..498551f
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.rfile.RFile.Reader;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.NamingThreadFactory;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class MultiThreadedRFileTest {
+
+  private static final Logger LOG = Logger.getLogger(MultiThreadedRFileTest.class);
+  private static final Collection<ByteSequence> EMPTY_COL_FAMS = new ArrayList<ByteSequence>();
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+  private static void checkIndex(Reader reader) throws IOException {
+    FileSKVIterator indexIter = reader.getIndex();
+
+    if (indexIter.hasTop()) {
+      Key lastKey = new Key(indexIter.getTopKey());
+
+      if (reader.getFirstKey().compareTo(lastKey) > 0)
+        throw new RuntimeException("First key out of order " + reader.getFirstKey() + " " + lastKey);
+
+      indexIter.next();
+
+      while (indexIter.hasTop()) {
+        if (lastKey.compareTo(indexIter.getTopKey()) > 0)
+          throw new RuntimeException("Index out of order " + lastKey + " " + indexIter.getTopKey());
+
+        lastKey = new Key(indexIter.getTopKey());
+        indexIter.next();
+      }
+
+      if (!reader.getLastKey().equals(lastKey)) {
+        throw new RuntimeException("Last key out of order " + reader.getLastKey() + " " + lastKey);
+      }
+    }
+  }
+
+  public static class TestRFile {
+
+    private Configuration conf = CachedConfiguration.getInstance();
+    public RFile.Writer writer;
+    private FSDataOutputStream dos;
+    private FSDataInputStream in;
+    private AccumuloConfiguration accumuloConfiguration;
+    public Reader reader;
+    public SortedKeyValueIterator<Key,Value> iter;
+    public File rfile = null;
+    public boolean deepCopy = false;
+
+    public TestRFile(AccumuloConfiguration accumuloConfiguration) {
+      this.accumuloConfiguration = accumuloConfiguration;
+      if (this.accumuloConfiguration == null)
+        this.accumuloConfiguration = AccumuloConfiguration.getDefaultConfiguration();
+    }
+
+    public void close() throws IOException {
+      if (rfile != null) {
+        FileSystem fs = FileSystem.newInstance(conf);
+        Path path = new Path("file://" + rfile.toString());
+        fs.delete(path, false);
+      }
+    }
+
+    public TestRFile deepCopy() throws IOException {
+      TestRFile copy = new TestRFile(accumuloConfiguration);
+      // does not copy any writer resources. This would be for read only.
+      copy.reader = (Reader) reader.deepCopy(null);
+      copy.rfile = rfile;
+      copy.iter = new ColumnFamilySkippingIterator(copy.reader);
+      copy.deepCopy = true;
+
+      checkIndex(copy.reader);
+      return copy;
+    }
+
+    public void openWriter(boolean startDLG) throws IOException {
+      if (deepCopy) {
+        throw new IOException("Cannot open writer on a deep copy");
+      }
+      if (rfile == null) {
+        rfile = File.createTempFile("TestRFile", ".rf");
+      }
+      FileSystem fs = FileSystem.newInstance(conf);
+      Path path = new Path("file://" + rfile.toString());
+      dos = fs.create(path, true);
+      CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(dos, "gz", conf, accumuloConfiguration);
+      writer = new RFile.Writer(_cbw, 1000, 1000);
+
+      if (startDLG)
+        writer.startDefaultLocalityGroup();
+    }
+
+    public void openWriter() throws IOException {
+      openWriter(true);
+    }
+
+    public void closeWriter() throws IOException {
+      if (deepCopy) {
+        throw new IOException("Cannot close writer on a deep copy");
+      }
+      dos.flush();
+      writer.close();
+      dos.flush();
+      dos.close();
+    }
+
+    public void openReader() throws IOException {
+      FileSystem fs = FileSystem.newInstance(conf);
+      Path path = new Path("file://" + rfile.toString());
+
+      // no block caches here: the caches previously masked the multithreaded issues
+      CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(fs, path, conf, null, null, AccumuloConfiguration.getDefaultConfiguration());
+      reader = new RFile.Reader(_cbr);
+      iter = new ColumnFamilySkippingIterator(reader);
+
+      checkIndex(reader);
+    }
+
+    public void closeReader() throws IOException {
+      reader.close();
+    }
+
+    public void seek(Key nk) throws IOException {
+      iter.seek(new Range(nk, null), EMPTY_COL_FAMS, false);
+    }
+  }
+
+  static Key nk(String row, String cf, String cq, String cv, long ts) {
+    return new Key(row.getBytes(), cf.getBytes(), cq.getBytes(), cv.getBytes(), ts);
+  }
+
+  static Value nv(String val) {
+    return new Value(val.getBytes());
+  }
+
+  public AccumuloConfiguration conf = null;
+
+  @Test
+  public void testMultipleReaders() throws IOException {
+    final List<Throwable> threadExceptions = Collections.synchronizedList(new ArrayList<Throwable>());
+    Map<String,MutableInt> messages = new HashMap<String,MutableInt>();
+    Map<String,String> stackTrace = new HashMap<String,String>();
+
+    final TestRFile trfBase = new TestRFile(conf);
+
+    writeData(trfBase);
+
+    trfBase.openReader();
+
+    try {
+
+      validate(trfBase);
+
+      final TestRFile trfBaseCopy = trfBase.deepCopy();
+
+      validate(trfBaseCopy);
+
+      // now start up multiple RFile deepcopies
+      int maxThreads = 10;
+      String name = "MultiThreadedRFileTestThread";
+      ThreadPoolExecutor pool = new ThreadPoolExecutor(maxThreads + 1, maxThreads + 1, 5 * 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
+          new NamingThreadFactory(name));
+      pool.allowCoreThreadTimeOut(true);
+      try {
+        Runnable runnable = new Runnable() {
+          @Override
+          public void run() {
+            try {
+              TestRFile trf = trfBase;
+              synchronized (trfBaseCopy) {
+                trf = trfBaseCopy.deepCopy();
+              }
+              validate(trf);
+            } catch (Throwable t) {
+              threadExceptions.add(t);
+            }
+          }
+        };
+        for (int i = 0; i < maxThreads; i++) {
+          pool.submit(runnable);
+        }
+      } finally {
+        pool.shutdown();
+        try {
+          pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+
+      for (Throwable t : threadExceptions) {
+        String msg = t.getClass() + " : " + t.getMessage();
+        if (!messages.containsKey(msg)) {
+          messages.put(msg, new MutableInt(1));
+        } else {
+          messages.get(msg).increment();
+        }
+        StringWriter string = new StringWriter();
+        PrintWriter writer = new PrintWriter(string);
+        t.printStackTrace(writer);
+        writer.flush();
+        stackTrace.put(msg, string.getBuffer().toString());
+      }
+    } finally {
+      trfBase.closeReader();
+      trfBase.close();
+    }
+
+    for (String message : messages.keySet()) {
+      LOG.error(messages.get(message) + ": " + message);
+      LOG.error(stackTrace.get(message));
+    }
+
+    assertTrue(threadExceptions.isEmpty());
+  }
+
+  private void validate(TestRFile trf) throws IOException {
+    Random random = new Random();
+    for (int iteration = 0; iteration < 10; iteration++) {
+      int part = random.nextInt(4);
+
+      Range range = new Range(getKey(part, 0, 0), true, getKey(part, 4, 2048), true);
+      trf.iter.seek(range, EMPTY_COL_FAMS, false);
+
+      Key last = null;
+      for (int locality = 0; locality < 4; locality++) {
+        for (int i = 0; i < 2048; i++) {
+          Key key = getKey(part, locality, i);
+          Value value = getValue(i);
+          assertTrue("No record found for row " + part + " locality " + locality + " index " + i, trf.iter.hasTop());
+          assertEquals("Invalid key found for row " + part + " locality " + locality + " index " + i, key, trf.iter.getTopKey());
+          assertEquals("Invalid value found for row " + part + " locality " + locality + " index " + i, value, trf.iter.getTopValue());
+          last = trf.iter.getTopKey();
+          trf.iter.next();
+        }
+      }
+      if (trf.iter.hasTop()) {
+        assertFalse("Found " + trf.iter.getTopKey() + " after " + last + " in " + range, trf.iter.hasTop());
+      }
+
+      range = new Range(getKey(4, 4, 0), true, null, true);
+      trf.iter.seek(range, EMPTY_COL_FAMS, false);
+      if (trf.iter.hasTop()) {
+        assertFalse("Found " + trf.iter.getTopKey() + " in " + range, trf.iter.hasTop());
+      }
+    }
+
+    Range range = new Range((Key) null, null);
+    trf.iter.seek(range, EMPTY_COL_FAMS, false);
+
+    Key last = null;
+    for (int part = 0; part < 4; part++) {
+      for (int locality = 0; locality < 4; locality++) {
+        for (int i = 0; i < 2048; i++) {
+          Key key = getKey(part, locality, i);
+          Value value = getValue(i);
+          assertTrue("No record found for row " + part + " locality " + locality + " index " + i, trf.iter.hasTop());
+          assertEquals("Invalid key found for row " + part + " locality " + locality + " index " + i, key, trf.iter.getTopKey());
+          assertEquals("Invalid value found for row " + part + " locality " + locality + " index " + i, value, trf.iter.getTopValue());
+          last = trf.iter.getTopKey();
+          trf.iter.next();
+        }
+      }
+    }
+
+    if (trf.iter.hasTop()) {
+      assertFalse("Found " + trf.iter.getTopKey() + " after " + last + " in " + range, trf.iter.hasTop());
+    }
+  }
+
+  private void writeData(TestRFile trfBase) throws IOException {
+    trfBase.openWriter(false);
+
+    try {
+      for (int locality = 1; locality < 4; locality++) {
+        trfBase.writer.startNewLocalityGroup("locality" + locality, Collections.singleton((ByteSequence) (new ArrayByteSequence(getCf(locality)))));
+        for (int part = 0; part < 4; part++) {
+          for (int i = 0; i < 2048; i++) {
+            trfBase.writer.append(getKey(part, locality, i), getValue(i));
+          }
+        }
+      }
+
+      trfBase.writer.startDefaultLocalityGroup();
+      for (int part = 0; part < 4; part++) {
+        for (int i = 0; i < 2048; i++) {
+          trfBase.writer.append(getKey(part, 0, i), getValue(i));
+        }
+      }
+    } finally {
+      trfBase.closeWriter();
+    }
+  }
+
+  private Key getKey(int part, int locality, int index) {
+    String row = "r000" + part;
+    String cf = getCf(locality);
+    String cq = "cq" + pad(index);
+
+    return nk(row, cf, cq, "", 1);
+  }
+
+  private String pad(int val) {
+    String valStr = String.valueOf(val);
+    switch (valStr.length()) {
+      case 1:
+        return "000" + valStr;
+      case 2:
+        return "00" + valStr;
+      case 3:
+        return "0" + valStr;
+      default:
+        return valStr;
+    }
+  }
+
+  private Value getValue(int index) {
+    return nv("" + index);
+  }
+
+  private String getCf(int locality) {
+    return "cf" + locality;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3c8bc120/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index 6a29610..c345065 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -67,8 +67,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.Text;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -82,11 +80,6 @@ public class RFileTest {
   @Rule
   public TemporaryFolder tempFolder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
 
-  static {
-    Logger.getLogger(org.apache.hadoop.io.compress.CodecPool.class).setLevel(Level.OFF);
-    Logger.getLogger(org.apache.hadoop.util.NativeCodeLoader.class).setLevel(Level.OFF);
-  }
-
   static class SeekableByteArrayInputStream extends ByteArrayInputStream implements Seekable, PositionedReadable {
 
     public SeekableByteArrayInputStream(byte[] buf) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3c8bc120/core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
index 9f968f8..40adebf 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -25,4 +25,6 @@ log4j.logger.org.apache.commons.vfs2.impl.DefaultFileSystemManager=WARN
 log4j.logger.org.apache.hadoop.mapred=ERROR
 log4j.logger.org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter=ERROR
 log4j.logger.org.apache.hadoop.util.ProcessTree=ERROR
+log4j.logger.org.apache.hadoop.io.compress.CodecPool=FATAL
+log4j.logger.org.apache.hadoop.util.NativeCodeLoader=FATAL
 log4j.logger.org.apache.accumulo.core.util.format=FATAL
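Taken together, the usage pattern this commit enables is exactly what the new test exercises: one RFile.Reader, one deep copy per thread, each thread seeking and iterating independently. A condensed sketch of that pattern follows; ParallelRFileScan and scanInParallel are hypothetical names for illustration, not part of this commit, and 'reader' is assumed to be opened the way TestRFile.openReader() does above.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.rfile.RFile.Reader;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;

public class ParallelRFileScan {

  public static void scanInParallel(final Reader reader, int nThreads) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(nThreads);
    for (int i = 0; i < nThreads; i++) {
      pool.submit(new Runnable() {
        @Override
        public void run() {
          // one deep copy per thread; all copies share the underlying
          // FSDataInputStream, which the added synchronization now protects
          SortedKeyValueIterator<Key,Value> copy = reader.deepCopy(null);
          // ... seek and iterate with 'copy', as validate() does above ...
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
  }
}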