From commits-return-5790-archive-asf-public=cust-asf.ponee.io@tez.apache.org Fri Apr 27 18:33:49 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id CE5BC180608 for ; Fri, 27 Apr 2018 18:33:48 +0200 (CEST) Received: (qmail 83644 invoked by uid 500); 27 Apr 2018 16:33:48 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 83635 invoked by uid 99); 27 Apr 2018 16:33:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 27 Apr 2018 16:33:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B4533EB4ED; Fri, 27 Apr 2018 16:33:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kshukla@apache.org To: commits@tez.apache.org Message-Id: <35cab5cc277d42edb79b1266dcfeff45@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-3887. Tez Shuffle Handler should support Index Cache configuration (Jonathan Eagles via kshukla) Date: Fri, 27 Apr 2018 16:33:47 +0000 (UTC) Repository: tez Updated Branches: refs/heads/master ebc9f4f6d -> 5bf3e2d45 TEZ-3887. Tez Shuffle Handler should support Index Cache configuration (Jonathan Eagles via kshukla) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5bf3e2d4 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5bf3e2d4 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5bf3e2d4 Branch: refs/heads/master Commit: 5bf3e2d45a84bcfa4f90b7f8b03b4ec8875bee57 Parents: ebc9f4f Author: Kuhu Shukla Authored: Fri Apr 27 11:25:01 2018 -0500 Committer: Kuhu Shukla Committed: Fri Apr 27 11:31:36 2018 -0500 ---------------------------------------------------------------------- .../org/apache/tez/auxservices/IndexCache.java | 3 +- .../apache/tez/auxservices/TestIndexCache.java | 335 +++++++++++++++++++ 2 files changed, 337 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/5bf3e2d4/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java index 5a945c4..1a9cfb2 100644 --- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java +++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java @@ -38,10 +38,11 @@ class IndexCache { private final LinkedBlockingQueue queue = new LinkedBlockingQueue(); + public static final String INDEX_CACHE_MB = "tez.shuffle.indexcache.mb"; public IndexCache(Configuration conf) { this.conf = conf; - totalMemoryAllowed = 10 * 1024 * 1024; + totalMemoryAllowed = conf.getInt(INDEX_CACHE_MB, 10) * 1024 * 1024; LOG.info("IndexCache created with max memory = " + totalMemoryAllowed); } http://git-wip-us.apache.org/repos/asf/tez/blob/5bf3e2d4/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestIndexCache.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestIndexCache.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestIndexCache.java new file mode 100644 index 0000000..802fdd6 --- /dev/null +++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestIndexCache.java @@ -0,0 +1,335 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.auxservices; + +import java.io.DataOutputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Random; +import java.util.zip.CRC32; +import java.util.zip.CheckedOutputStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ChecksumException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.tez.runtime.library.common.Constants; +import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.tez.auxservices.IndexCache.INDEX_CACHE_MB; +import static org.junit.Assert.*; + +public class TestIndexCache { + private Configuration conf; + private FileSystem fs; + private Path p; + + @Before + public void setUp() throws IOException { + conf = new Configuration(); + fs = FileSystem.getLocal(conf).getRaw(); + p = new Path(System.getProperty("test.build.data", "/tmp"), + "cache").makeQualified(fs.getUri(), fs.getWorkingDirectory()); + } + + @Test + public void testLRCPolicy() throws Exception { + Random r = new Random(); + long seed = r.nextLong(); + r.setSeed(seed); + System.out.println("seed: " + seed); + fs.delete(p, true); + conf.setInt(INDEX_CACHE_MB, 1); + final int partsPerMap = 1000; + final int bytesPerFile = partsPerMap * 24; + IndexCache cache = new IndexCache(conf); + + // fill cache + int totalsize = bytesPerFile; + for (; totalsize < 1024 * 1024; totalsize += bytesPerFile) { + Path f = new Path(p, Integer.toString(totalsize, 36)); + writeFile(fs, f, totalsize, partsPerMap); + TezIndexRecord rec = cache.getIndexInformation( + Integer.toString(totalsize, 36), r.nextInt(partsPerMap), f, + UserGroupInformation.getCurrentUser().getShortUserName()); + checkRecord(rec, totalsize); + } + + // delete files, ensure cache retains all elem + for (FileStatus stat : fs.listStatus(p)) { + fs.delete(stat.getPath(),true); + } + for (int i = bytesPerFile; i < 1024 * 1024; i += bytesPerFile) { + Path f = new Path(p, Integer.toString(i, 36)); + TezIndexRecord rec = cache.getIndexInformation(Integer.toString(i, 36), + r.nextInt(partsPerMap), f, + UserGroupInformation.getCurrentUser().getShortUserName()); + checkRecord(rec, i); + } + + // push oldest (bytesPerFile) out of cache + Path f = new Path(p, Integer.toString(totalsize, 36)); + writeFile(fs, f, totalsize, partsPerMap); + cache.getIndexInformation(Integer.toString(totalsize, 36), + r.nextInt(partsPerMap), f, + UserGroupInformation.getCurrentUser().getShortUserName()); + fs.delete(f, false); + + // oldest fails to read, or error + boolean fnf = false; + try { + cache.getIndexInformation(Integer.toString(bytesPerFile, 36), + r.nextInt(partsPerMap), new Path(p, Integer.toString(bytesPerFile)), + UserGroupInformation.getCurrentUser().getShortUserName()); + } catch (IOException e) { + if (e.getCause() == null || + !(e.getCause() instanceof FileNotFoundException)) { + throw e; + } + else { + fnf = true; + } + } + if (!fnf) + fail("Failed to push out last entry"); + // should find all the other entries + for (int i = bytesPerFile << 1; i < 1024 * 1024; i += bytesPerFile) { + TezIndexRecord rec = cache.getIndexInformation(Integer.toString(i, 36), + r.nextInt(partsPerMap), new Path(p, Integer.toString(i, 36)), + UserGroupInformation.getCurrentUser().getShortUserName()); + checkRecord(rec, i); + } + TezIndexRecord rec = cache.getIndexInformation(Integer.toString(totalsize, 36), + r.nextInt(partsPerMap), f, + UserGroupInformation.getCurrentUser().getShortUserName()); + + checkRecord(rec, totalsize); + } + + @Test + public void testBadIndex() throws Exception { + final int parts = 30; + fs.delete(p, true); + conf.setInt(INDEX_CACHE_MB, 1); + IndexCache cache = new IndexCache(conf); + + Path f = new Path(p, "badindex"); + FSDataOutputStream out = fs.create(f, false); + CheckedOutputStream iout = new CheckedOutputStream(out, new CRC32()); + DataOutputStream dout = new DataOutputStream(iout); + for (int i = 0; i < parts; ++i) { + for (int j = 0; j < Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8; ++j) { + if (0 == (i % 3)) { + dout.writeLong(i); + } else { + out.writeLong(i); + } + } + } + out.writeLong(iout.getChecksum().getValue()); + dout.close(); + try { + cache.getIndexInformation("badindex", 7, f, + UserGroupInformation.getCurrentUser().getShortUserName()); + fail("Did not detect bad checksum"); + } catch (IOException e) { + if (!(e.getCause() instanceof ChecksumException)) { + throw e; + } + } + } + + @Test + public void testInvalidReduceNumberOrLength() throws Exception { + fs.delete(p, true); + conf.setInt(INDEX_CACHE_MB, 1); + final int partsPerMap = 1000; + final int bytesPerFile = partsPerMap * 24; + IndexCache cache = new IndexCache(conf); + + // fill cache + Path feq = new Path(p, "invalidReduceOrPartsPerMap"); + writeFile(fs, feq, bytesPerFile, partsPerMap); + + // Number of reducers should always be less than partsPerMap as reducer + // numbers start from 0 and there cannot be more reducer than parts + + try { + // Number of reducers equal to partsPerMap + cache.getIndexInformation("reduceEqualPartsPerMap", + partsPerMap, // reduce number == partsPerMap + feq, UserGroupInformation.getCurrentUser().getShortUserName()); + fail("Number of reducers equal to partsPerMap did not fail"); + } catch (Exception e) { + if (!(e instanceof IOException)) { + throw e; + } + } + + try { + // Number of reducers more than partsPerMap + cache.getIndexInformation( + "reduceMorePartsPerMap", + partsPerMap + 1, // reduce number > partsPerMap + feq, UserGroupInformation.getCurrentUser().getShortUserName()); + fail("Number of reducers more than partsPerMap did not fail"); + } catch (Exception e) { + if (!(e instanceof IOException)) { + throw e; + } + } + } + + @Test + public void testRemoveMap() throws Exception { + // This test case use two thread to call getIndexInformation and + // removeMap concurrently, in order to construct race condition. + // This test case may not repeatable. But on my macbook this test + // fails with probability of 100% on code before MAPREDUCE-2541, + // so it is repeatable in practice. + fs.delete(p, true); + conf.setInt(INDEX_CACHE_MB, 10); + // Make a big file so removeMapThread almost surely runs faster than + // getInfoThread + final int partsPerMap = 100000; + final int bytesPerFile = partsPerMap * 24; + final IndexCache cache = new IndexCache(conf); + + final Path big = new Path(p, "bigIndex"); + final String user = + UserGroupInformation.getCurrentUser().getShortUserName(); + writeFile(fs, big, bytesPerFile, partsPerMap); + + // run multiple times + for (int i = 0; i < 20; ++i) { + Thread getInfoThread = new Thread() { + @Override + public void run() { + try { + cache.getIndexInformation("bigIndex", partsPerMap, big, user); + } catch (Exception e) { + // should not be here + } + } + }; + Thread removeMapThread = new Thread() { + @Override + public void run() { + cache.removeMap("bigIndex"); + } + }; + if (i%2==0) { + getInfoThread.start(); + removeMapThread.start(); + } else { + removeMapThread.start(); + getInfoThread.start(); + } + getInfoThread.join(); + removeMapThread.join(); + assertEquals(true, cache.checkTotalMemoryUsed()); + } + } + + @Test + public void testCreateRace() throws Exception { + fs.delete(p, true); + conf.setInt(INDEX_CACHE_MB, 1); + final int partsPerMap = 1000; + final int bytesPerFile = partsPerMap * 24; + final IndexCache cache = new IndexCache(conf); + + final Path racy = new Path(p, "racyIndex"); + final String user = + UserGroupInformation.getCurrentUser().getShortUserName(); + writeFile(fs, racy, bytesPerFile, partsPerMap); + + // run multiple instances + Thread[] getInfoThreads = new Thread[50]; + for (int i = 0; i < 50; i++) { + getInfoThreads[i] = new Thread() { + @Override + public void run() { + try { + cache.getIndexInformation("racyIndex", partsPerMap, racy, user); + cache.removeMap("racyIndex"); + } catch (Exception e) { + // should not be here + } + } + }; + } + + for (int i = 0; i < 50; i++) { + getInfoThreads[i].start(); + } + + final Thread mainTestThread = Thread.currentThread(); + + Thread timeoutThread = new Thread() { + @Override + public void run() { + try { + Thread.sleep(15000); + mainTestThread.interrupt(); + } catch (InterruptedException ie) { + // we are done; + } + } + }; + + for (int i = 0; i < 50; i++) { + try { + getInfoThreads[i].join(); + } catch (InterruptedException ie) { + // we haven't finished in time. Potential deadlock/race. + fail("Unexpectedly long delay during concurrent cache entry creations"); + } + } + // stop the timeoutThread. If we get interrupted before stopping, there + // must be something wrong, although it wasn't a deadlock. No need to + // catch and swallow. + timeoutThread.interrupt(); + } + + private static void checkRecord(TezIndexRecord rec, long fill) { + assertEquals(fill, rec.getStartOffset()); + assertEquals(fill, rec.getRawLength()); + assertEquals(fill, rec.getPartLength()); + } + + private static void writeFile(FileSystem fs, Path f, long fill, int parts) + throws IOException { + FSDataOutputStream out = fs.create(f, false); + CheckedOutputStream iout = new CheckedOutputStream(out, new CRC32()); + DataOutputStream dout = new DataOutputStream(iout); + for (int i = 0; i < parts; ++i) { + for (int j = 0; j < Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8; ++j) { + dout.writeLong(fill); + } + } + out.writeLong(iout.getChecksum().getValue()); + dout.close(); + } +}