From: ctubbsii@apache.org
To: commits@accumulo.apache.org
Date: Mon, 10 Jun 2019 01:18:49 +0000
Subject: [accumulo] 01/02: Fix #1052 Correct distributed cache usage (#1112)
Message-Id: <20190610011848.6532787A83@gitbox.apache.org>

This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit ecd98de9ff9a1a2a46849a6c3f26ba7468cdbac6
Author: Jeffrey L. Zeiberg
AuthorDate: Wed Apr 10 07:46:24 2019 -0400

    Fix #1052 Correct distributed cache usage (#1112)

    * Correct distributed cache usage for Tokenfile and Property and
      RangePartition key cut files.
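The fix relies on Hadoop's URI-fragment symlink convention: a file added to
the distributed cache as "path#name" is exposed to each task as a symlink
called "name" in the task's working directory, so task code can open it by
that stable local name instead of scanning the list returned by
Job.getLocalCacheFiles(). A minimal submit-side sketch of the pattern, with
an illustrative HDFS path and symlink name (neither is taken from this
commit):

    import java.net.URI;
    import org.apache.hadoop.mapreduce.Job;

    public class CacheSymlinkSubmitExample {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        // "#cutfile" asks the MapReduce framework to expose the cached copy
        // of splits.txt as a symlink named "cutfile" in each task's working
        // directory.
        job.addCacheFile(new URI("hdfs:///user/me/splits.txt#cutfile"));
      }
    }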
---
 .../mapreduce/lib/partition/RangePartitioner.java  | 57 +++++++++++++---------
 .../clientImpl/mapreduce/lib/ConfiguratorBase.java | 22 +++++----
 .../mapreduce/lib/DistributedCacheHelper.java      |  1 -
 .../mapreduce/partition/RangePartitioner.java      | 57 +++++++++++++---------
 .../hadoopImpl/mapreduce/lib/ConfiguratorBase.java | 24 ++++++---
 .../accumulo/test/mapreduce/TokenFileIT.java       | 30 +++++++++++-
 6 files changed, 129 insertions(+), 62 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java
index 6b32130..8461ff3 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java
@@ -16,21 +16,22 @@
  */
 package org.apache.accumulo.core.client.mapreduce.lib.partition;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.Scanner;
 import java.util.TreeSet;
 
+import javax.imageio.IIOException;
+
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -90,26 +91,31 @@ public class RangePartitioner extends Partitioner<Text,Writable> implements Configurable {
       justification = "path provided by distributed cache framework, not user input")
   private synchronized Text[] getCutPoints() throws IOException {
     if (cutPointArray == null) {
+      Path path;
       String cutFileName = conf.get(CUTFILE_KEY);
-      Path[] cf = Job.getInstance().getLocalCacheFiles();
-
-      if (cf != null) {
-        for (Path path : cf) {
-          if (path.toUri().getPath()
-              .endsWith(cutFileName.substring(cutFileName.lastIndexOf('/')))) {
-            TreeSet<Text> cutPoints = new TreeSet<>();
-            try (Scanner in = new Scanner(new BufferedReader(
-                new InputStreamReader(new FileInputStream(path.toString()), UTF_8)))) {
-              while (in.hasNextLine())
-                cutPoints.add(new Text(Base64.getDecoder().decode(in.nextLine())));
-            }
-            cutPointArray = cutPoints.toArray(new Text[cutPoints.size()]);
-            break;
-          }
+      File tempFile = new File(CUTFILE_KEY);
+      if (tempFile.exists()) {
+        path = new Path(CUTFILE_KEY);
+      } else {
+        path = new Path(cutFileName);
+      }
+
+      if (path == null)
+        throw new FileNotFoundException("Cut point file not found in distributed cache");
+
+      TreeSet<Text> cutPoints = new TreeSet<>();
+      FileSystem fs = FileSystem.get(conf);
+      FSDataInputStream inputStream = fs.open(path);
+      try (Scanner in = new Scanner(inputStream)) {
+        while (in.hasNextLine()) {
+          cutPoints.add(new Text(Base64.getDecoder().decode(in.nextLine())));
         }
       }
+
+      cutPointArray = cutPoints.toArray(new Text[cutPoints.size()]);
+
       if (cutPointArray == null)
-        throw new FileNotFoundException(cutFileName + " not found in distributed cache");
+        throw new IIOException("Cutpoint array not properly created from file" + path.getName());
     }
     return cutPointArray;
   }
@@ -129,7 +135,14 @@ public class RangePartitioner extends Partitioner<Text,Writable> implements Configurable {
    *          points that represent ranges for partitioning
    */
   public static void setSplitFile(Job job, String file) {
-    URI uri = new Path(file).toUri();
+    URI uri;
+    try {
+      uri = new URI(file + "#" + CUTFILE_KEY);
+    } catch (URISyntaxException e) {
+      throw new IllegalStateException(
+          "Unable to add split file \"" + CUTFILE_KEY + "\" to distributed cache.");
+    }
+
+    job.addCacheFile(uri);
     job.getConfiguration().set(CUTFILE_KEY, uri.getPath());
   }
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBase.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBase.java
index acced30..bdbbdb0 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBase.java
@@ -21,6 +21,7 @@ import static java.util.Objects.requireNonNull;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
+import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -62,6 +63,8 @@ public class ConfiguratorBase {
     IS_CONFIGURED, PRINCIPAL, TOKEN
   }
 
+  public static final String cachedFileName = "tokenfile";
+
   public enum TokenSource {
     FILE, INLINE, JOB;
 
@@ -189,7 +192,7 @@ public class ConfiguratorBase {
     checkArgument(tokenFile != null, "tokenFile is null");
 
     try {
-      DistributedCacheHelper.addCacheFile(new URI(tokenFile), conf);
+      DistributedCacheHelper.addCacheFile(new URI(tokenFile + "#" + cachedFileName), conf);
     } catch (URISyntaxException e) {
       throw new IllegalStateException(
           "Unable to add tokenFile \"" + tokenFile + "\" to distributed cache.");
@@ -284,16 +287,17 @@ public class ConfiguratorBase {
       String tokenFile) {
     FSDataInputStream in = null;
     try {
-      URI[] uris = DistributedCacheHelper.getCacheFiles(conf);
-      Path path = null;
-      for (URI u : uris) {
-        if (u.toString().equals(tokenFile)) {
-          path = new Path(u);
-        }
+      Path path;
+      // See if the "tokenfile" symlink was created and try to open the file it points to by it.
+      File tempFile = new File(ConfiguratorBase.cachedFileName);
+      if (tempFile.exists()) {
+        path = new Path(ConfiguratorBase.cachedFileName);
+      } else {
+        path = new Path(tokenFile);
       }
       if (path == null) {
-        throw new IllegalArgumentException(
-            "Couldn't find password file called \"" + tokenFile + "\" in cache.");
+        throw new IllegalArgumentException("Couldn't find password file called \"" + tokenFile
+            + "\" in the distributed cache or the specified path in the distributed filesystem.");
       }
       FileSystem fs = FileSystem.get(conf);
       in = fs.open(path);
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/DistributedCacheHelper.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/DistributedCacheHelper.java
index ead3c2a..49ddc74 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/DistributedCacheHelper.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/DistributedCacheHelper.java
@@ -41,5 +41,4 @@ public class DistributedCacheHelper {
   public static URI[] getCacheFiles(Configuration conf) throws IOException {
     return org.apache.hadoop.filecache.DistributedCache.getCacheFiles(conf);
   }
-
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/partition/RangePartitioner.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/partition/RangePartitioner.java
index 7304904..7e18f5c 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/partition/RangePartitioner.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/partition/RangePartitioner.java
@@ -16,21 +16,22 @@
  */
 package org.apache.accumulo.hadoop.mapreduce.partition;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.Scanner;
 import java.util.TreeSet;
 
+import javax.imageio.IIOException;
+
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -88,26 +89,31 @@ public class RangePartitioner extends Partitioner<Text,Writable> implements Configurable {
       justification = "path provided by distributed cache framework, not user input")
   private synchronized Text[] getCutPoints() throws IOException {
     if (cutPointArray == null) {
+      Path path;
       String cutFileName = conf.get(CUTFILE_KEY);
-      Path[] cf = Job.getInstance().getLocalCacheFiles();
-
-      if (cf != null) {
-        for (Path path : cf) {
-          if (path.toUri().getPath()
-              .endsWith(cutFileName.substring(cutFileName.lastIndexOf('/')))) {
-            TreeSet<Text> cutPoints = new TreeSet<>();
-            try (Scanner in = new Scanner(new BufferedReader(
-                new InputStreamReader(new FileInputStream(path.toString()), UTF_8)))) {
-              while (in.hasNextLine())
-                cutPoints.add(new Text(Base64.getDecoder().decode(in.nextLine())));
-            }
-            cutPointArray = cutPoints.toArray(new Text[cutPoints.size()]);
-            break;
-          }
+      File tempFile = new File(CUTFILE_KEY);
+      if (tempFile.exists()) {
+        path = new Path(CUTFILE_KEY);
+      } else {
+        path = new Path(cutFileName);
+      }
+
+      if (path == null)
+        throw new FileNotFoundException("Cut point file not found in distributed cache");
+
+      TreeSet<Text> cutPoints = new TreeSet<>();
+      FileSystem fs = FileSystem.get(conf);
+      FSDataInputStream inputStream = fs.open(path);
+      try (Scanner in = new Scanner(inputStream)) {
+        while (in.hasNextLine()) {
+          cutPoints.add(new Text(Base64.getDecoder().decode(in.nextLine())));
         }
       }
+
+      cutPointArray = cutPoints.toArray(new Text[cutPoints.size()]);
+
       if (cutPointArray == null)
-        throw new FileNotFoundException(cutFileName + " not found in distributed cache");
+        throw new IIOException("Cutpoint array not properly created from file" + path.getName());
     }
     return cutPointArray;
   }
@@ -127,7 +133,14 @@ public class RangePartitioner extends Partitioner<Text,Writable> implements Configurable {
    *          points that represent ranges for partitioning
    */
   public static void setSplitFile(Job job, String file) {
-    URI uri = new Path(file).toUri();
+    URI uri;
+    try {
+      uri = new URI(file + "#" + CUTFILE_KEY);
+    } catch (URISyntaxException e) {
+      throw new IllegalStateException(
+          "Unable to add split file \"" + CUTFILE_KEY + "\" to distributed cache.");
+    }
+
+    job.addCacheFile(uri);
     job.getConfiguration().set(CUTFILE_KEY, uri.getPath());
   }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java
index 315ce2a..367046c 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.hadoopImpl.mapreduce.lib;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.StringReader;
 import java.io.StringWriter;
@@ -43,6 +44,8 @@ public class ConfiguratorBase {
     CLIENT_PROPS, CLIENT_PROPS_FILE, IS_CONFIGURED, STORE_JOB_CALLED
   }
 
+  public static final String clientPropsFileName = "propsfile";
+
   /**
    * Configuration keys for general configuration options.
    *
@@ -62,6 +65,7 @@ public class ConfiguratorBase {
    * @return the configuration key
    * @since 1.6.0
    */
+
   protected static String enumToConfKey(Class<?> implementingClass, Enum<?> e) {
     return implementingClass.getSimpleName() + "." + e.getDeclaringClass().getSimpleName() + "."
         + StringUtils.camelize(e.name().toLowerCase());
@@ -83,7 +87,8 @@ public class ConfiguratorBase {
       Properties props, String clientPropsPath) {
     if (clientPropsPath != null) {
       try {
-        DistributedCacheHelper.addCacheFile(new URI(clientPropsPath), conf);
+        DistributedCacheHelper.addCacheFile(new URI(clientPropsPath + "#" + clientPropsFileName),
+            conf);
       } catch (URISyntaxException e) {
         throw new IllegalStateException("Unable to add client properties file \""
             + clientPropsPath + "\" to distributed cache.");
@@ -107,13 +112,18 @@ public class ConfiguratorBase {
         conf.get(enumToConfKey(implementingClass, ClientOpts.CLIENT_PROPS_FILE), "");
     if (!clientPropsFile.isEmpty()) {
       try {
-        URI[] uris = DistributedCacheHelper.getCacheFiles(conf);
-        Path path = null;
-        for (URI u : uris) {
-          if (u.toString().equals(clientPropsFile)) {
-            path = new Path(u);
-          }
+        Path path;
+        // See if the "propsfile" symlink was created and try to open the file it points to by it.
+        File tempFile = new File(ConfiguratorBase.clientPropsFileName);
+        if (tempFile.exists()) {
+          path = new Path(ConfiguratorBase.clientPropsFileName);
+        } else {
+          path = new Path(clientPropsFile);
         }
+
+        if (path == null)
+          throw new IllegalStateException("Could not initialize properties file");
+
         FileSystem fs = FileSystem.get(conf);
         FSDataInputStream inputStream = fs.open(path);
         StringBuilder sb = new StringBuilder();
diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java
index c43b001..25a814a 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java
@@ -24,8 +24,13 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Iterator;
 import java.util.Map.Entry;
+import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
@@ -34,6 +39,7 @@ import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.clientImpl.ClientInfo;
 import org.apache.accumulo.core.clientImpl.Credentials;
+import org.apache.accumulo.core.clientImpl.mapreduce.lib.ConfiguratorBase;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
@@ -84,6 +90,29 @@ public class TokenFileIT extends AccumuloClusterHarness {
         m.put("", "", Integer.toString(count));
         context.write(new Text(), m);
       }
+
+      @Override
+      protected void setup(Context context) throws IOException, InterruptedException {
+        if (context.getCacheFiles() != null && context.getCacheFiles().length > 0) {
+          // At this point in the MapReduce Job you can get the cached files in HDFS if you want
+          URI[] cachedFiles = context.getCacheFiles();
+          // On the line below we access the file by the hdfs fragment name created during caching
+          // in ConfiguratorBase
+          String fileByPsuedonym = "";
+          fileByPsuedonym = getFileContents(ConfiguratorBase.cachedFileName);
+
+          assertTrue(!fileByPsuedonym.isEmpty());
+          assertTrue(cachedFiles.length > 0);
+        }
+        super.setup(context);
+      }
+
+      private String getFileContents(String filename) throws IOException {
+
+        Path filePath = Paths.get(filename);
+        return Files.lines(filePath).collect(Collectors.joining(System.lineSeparator()));
+      }
+
     }
 
     @Override
@@ -139,7 +168,6 @@ public class TokenFileIT extends AccumuloClusterHarness {
         return 1;
       }
     }
-
   }
 
   @Rule
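The task-side half of the same pattern, as applied in the hunks above: prefer
the symlink when the framework created it, and fall back to the original path
otherwise (which is what the tempFile.exists() checks handle; local-mode runs
create no symlink). A sketch mirroring the commit's logic, under the same
illustrative names as the earlier submit-side example:

    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CacheSymlinkReadExample {
      // Open a cached file via its task-local symlink when present
      // (cluster mode); otherwise fall back to the original path
      // (e.g. local mode, where no symlink is created).
      static FSDataInputStream openCached(Configuration conf, String symlink,
          String originalPath) throws IOException {
        Path path = new File(symlink).exists() ? new Path(symlink)
            : new Path(originalPath);
        return FileSystem.get(conf).open(path);
      }
    }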