accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [accumulo] 02/02: Improve use of Hadoop's DistributedCache (#1188)
Date Mon, 10 Jun 2019 01:18:50 GMT
This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 68f8f5e5ae0c0451cbb530c86a251853e7cf6b39
Author: Christopher Tubbs <ctubbsii@apache.org>
AuthorDate: Sun Jun 9 21:17:23 2019 -0400

    Improve use of Hadoop's DistributedCache (#1188)
    
    Update DistributedCacheHelper to both add a cached file to a MapReduce
    job and to retrieve an InputStream from the DistributedCache (or HDFS,
    if the item was not successfully cached).
    
    Ensure uniqueness in the local cached file names and that the files are
    read from the correct filesystem.
    
    Add some tests to TokenFileIT
    
    Specify the charset explicitly when java.util.Scanner is used (most
    occurrences were related to code using the DistributedCache, but also
    fixed two trivial occurrences in shell-related code).
---
 .../mapreduce/lib/partition/RangePartitioner.java  | 55 ++++---------
 .../clientImpl/mapreduce/lib/ConfiguratorBase.java | 91 ++++++++++------------
 .../mapreduce/lib/DistributedCacheHelper.java      | 60 ++++++++++++--
 .../mapreduce/partition/RangePartitioner.java      | 55 ++++---------
 .../hadoopImpl/mapreduce/lib/ConfiguratorBase.java | 45 ++++-------
 .../org/apache/accumulo/shell/ShellOptionsJC.java  |  4 +-
 .../accumulo/shell/PasswordConverterTest.java      |  3 +-
 .../apache/accumulo/test/mapred/TokenFileIT.java   | 15 +++-
 .../accumulo/test/mapreduce/TokenFileIT.java       | 53 ++++++-------
 9 files changed, 179 insertions(+), 202 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java
index 8461ff3..1e11178 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java
@@ -16,30 +16,23 @@
  */
 package org.apache.accumulo.core.client.mapreduce.lib.partition;
 
-import java.io.File;
-import java.io.FileNotFoundException;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
+import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.Scanner;
 import java.util.TreeSet;
 
-import javax.imageio.IIOException;
-
+import org.apache.accumulo.core.clientImpl.mapreduce.lib.DistributedCacheHelper;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Partitioner;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-
 /**
  * Hadoop partitioner that uses ranges, and optionally sub-bins based on hashing.
  *
@@ -70,8 +63,9 @@ public class RangePartitioner extends Partitioner<Text,Writable> implements Conf
 
     // both conditions work with numSubBins == 1, but this check is to avoid
     // hashing, when we don't need to, for speed
-    if (numSubBins < 2)
+    if (numSubBins < 2) {
       return index;
+    }
     return (key.toString().hashCode() & Integer.MAX_VALUE) % numSubBins + index * numSubBins;
   }
 
@@ -87,26 +81,14 @@ public class RangePartitioner extends Partitioner<Text,Writable> implements Conf
 
   private Text[] cutPointArray = null;
 
-  @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN",
-      justification = "path provided by distributed cache framework, not user input")
   private synchronized Text[] getCutPoints() throws IOException {
     if (cutPointArray == null) {
-      Path path;
       String cutFileName = conf.get(CUTFILE_KEY);
-      File tempFile = new File(CUTFILE_KEY);
-      if (tempFile.exists()) {
-        path = new Path(CUTFILE_KEY);
-      } else {
-        path = new Path(cutFileName);
-      }
-
-      if (path == null)
-        throw new FileNotFoundException("Cut point file not found in distributed cache");
-
       TreeSet<Text> cutPoints = new TreeSet<>();
-      FileSystem fs = FileSystem.get(conf);
-      FSDataInputStream inputStream = fs.open(path);
-      try (Scanner in = new Scanner(inputStream)) {
+      try (
+          InputStream inputStream =
+              DistributedCacheHelper.openCachedFile(cutFileName, CUTFILE_KEY, conf);
+          Scanner in = new Scanner(inputStream, UTF_8.name())) {
         while (in.hasNextLine()) {
           cutPoints.add(new Text(Base64.getDecoder().decode(in.nextLine())));
         }
@@ -114,8 +96,9 @@ public class RangePartitioner extends Partitioner<Text,Writable> implements Conf
 
       cutPointArray = cutPoints.toArray(new Text[cutPoints.size()]);
 
-      if (cutPointArray == null)
-        throw new IIOException("Cutpoint array not properly created from file" + path.getName());
+      if (cutPointArray == null) {
+        throw new IOException("Cutpoint array not properly created from file" + cutFileName);
+      }
     }
     return cutPointArray;
   }
@@ -135,16 +118,8 @@ public class RangePartitioner extends Partitioner<Text,Writable> implements Conf
    * points that represent ranges for partitioning
    */
   public static void setSplitFile(Job job, String file) {
-    URI uri;
-    try {
-      uri = new URI(file + "#" + CUTFILE_KEY);
-    } catch (URISyntaxException e) {
-      throw new IllegalStateException(
-          "Unable to add split file \"" + CUTFILE_KEY + "\" to distributed cache.");
-    }
-
-    job.addCacheFile(uri);
-    job.getConfiguration().set(CUTFILE_KEY, uri.getPath());
+    DistributedCacheHelper.addCacheFile(job, file, CUTFILE_KEY);
+    job.getConfiguration().set(CUTFILE_KEY, file);
   }
 
   /**
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBase.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBase.java
index bdbbdb0..82ab4f9 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBase.java
@@ -17,15 +17,15 @@
 package org.apache.accumulo.core.clientImpl.mapreduce.lib;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Objects.requireNonNull;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
-import java.io.File;
 import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
+import java.io.InputStream;
 import java.util.Base64;
+import java.util.Scanner;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -37,9 +37,6 @@ import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.clientImpl.Credentials;
 import org.apache.accumulo.core.clientImpl.DelegationTokenImpl;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -63,8 +60,6 @@ public class ConfiguratorBase {
     IS_CONFIGURED, PRINCIPAL, TOKEN
   }
 
-  public static final String cachedFileName = "tokenfile";
-
   public enum TokenSource {
     FILE, INLINE, JOB;
 
@@ -144,9 +139,10 @@ public class ConfiguratorBase {
    */
   public static void setConnectorInfo(Class<?> implementingClass, Configuration conf,
       String principal, AuthenticationToken token) {
-    if (isConnectorInfoSet(implementingClass, conf))
+    if (isConnectorInfoSet(implementingClass, conf)) {
       throw new IllegalStateException("Connector info for " + implementingClass.getSimpleName()
           + " can only be set once per job");
+    }
     checkArgument(principal != null, "principal is null");
     checkArgument(token != null, "token is null");
     conf.setBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), true);
@@ -164,6 +160,10 @@ public class ConfiguratorBase {
     }
   }
 
+  private static String cachedTokenFileName(Class<?> implementingClass) {
+    return implementingClass.getSimpleName() + ".tokenfile";
+  }
+
   /**
    * Sets the connector information needed to communicate with Accumulo in this job.
    *
@@ -184,19 +184,15 @@ public class ConfiguratorBase {
    */
   public static void setConnectorInfo(Class<?> implementingClass, Configuration conf,
       String principal, String tokenFile) {
-    if (isConnectorInfoSet(implementingClass, conf))
+    if (isConnectorInfoSet(implementingClass, conf)) {
       throw new IllegalStateException("Connector info for " + implementingClass.getSimpleName()
           + " can only be set once per job");
+    }
 
     checkArgument(principal != null, "principal is null");
     checkArgument(tokenFile != null, "tokenFile is null");
 
-    try {
-      DistributedCacheHelper.addCacheFile(new URI(tokenFile + "#" + cachedFileName), conf);
-    } catch (URISyntaxException e) {
-      throw new IllegalStateException(
-          "Unable to add tokenFile \"" + tokenFile + "\" to distributed cache.");
-    }
+    DistributedCacheHelper.addCacheFile(tokenFile, cachedTokenFileName(implementingClass), conf);
 
     conf.setBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), true);
     conf.set(enumToConfKey(implementingClass, ConnectorInfo.PRINCIPAL), principal);
@@ -250,16 +246,19 @@ public class ConfiguratorBase {
   public static AuthenticationToken getAuthenticationToken(Class<?> implementingClass,
       Configuration conf) {
     String token = conf.get(enumToConfKey(implementingClass, ConnectorInfo.TOKEN));
-    if (token == null || token.isEmpty())
+    if (token == null || token.isEmpty()) {
       return null;
+    }
     if (token.startsWith(TokenSource.INLINE.prefix())) {
       String[] args = token.substring(TokenSource.INLINE.prefix().length()).split(":", 2);
-      if (args.length == 2)
+      if (args.length == 2) {
         return AuthenticationTokenSerializer.deserialize(args[0],
             Base64.getDecoder().decode(args[1]));
+      }
     } else if (token.startsWith(TokenSource.FILE.prefix())) {
       String tokenFileName = token.substring(TokenSource.FILE.prefix().length());
-      return getTokenFromFile(conf, getPrincipal(implementingClass, conf), tokenFileName);
+      return getTokenFromFile(implementingClass, conf, getPrincipal(implementingClass, conf),
+          tokenFileName);
     } else if (token.startsWith(TokenSource.JOB.prefix())) {
       String[] args = token.substring(TokenSource.JOB.prefix().length()).split(":", 2);
       if (args.length == 2) {
@@ -283,38 +282,26 @@ public class ConfiguratorBase {
    * @since 1.6.0
    * @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken)
    */
-  public static AuthenticationToken getTokenFromFile(Configuration conf, String principal,
-      String tokenFile) {
-    FSDataInputStream in = null;
-    try {
-      Path path;
-      // See if the "tokenfile" symlink was created and try to open the file it points to by it.
-      File tempFile = new File(ConfiguratorBase.cachedFileName);
-      if (tempFile.exists()) {
-        path = new Path(ConfiguratorBase.cachedFileName);
-      } else {
-        path = new Path(tokenFile);
-      }
-      if (path == null) {
-        throw new IllegalArgumentException("Couldn't find password file called \"" + tokenFile
-            + "\" in the distributed cache or the specified path in the distributed filesystem.");
-      }
-      FileSystem fs = FileSystem.get(conf);
-      in = fs.open(path);
-    } catch (IOException e) {
-      throw new IllegalArgumentException(
-          "Couldn't open password file called \"" + tokenFile + "\".");
-    }
-    try (java.util.Scanner fileScanner = new java.util.Scanner(in)) {
-      while (fileScanner.hasNextLine()) {
-        Credentials creds = Credentials.deserialize(fileScanner.nextLine());
-        if (principal.equals(creds.getPrincipal())) {
-          return creds.getToken();
+  public static AuthenticationToken getTokenFromFile(Class<?> implementingClass, Configuration conf,
+      String principal, String tokenFile) {
+
+    try (InputStream inputStream = DistributedCacheHelper.openCachedFile(tokenFile,
+        cachedTokenFileName(implementingClass), conf)) {
+
+      try (Scanner fileScanner = new Scanner(inputStream, UTF_8.name())) {
+        while (fileScanner.hasNextLine()) {
+          Credentials creds = Credentials.deserialize(fileScanner.nextLine());
+          if (principal.equals(creds.getPrincipal())) {
+            return creds.getToken();
+          }
         }
+        throw new IllegalArgumentException("No token found for " + principal);
       }
-      throw new IllegalArgumentException(
-          "Couldn't find token for user \"" + principal + "\" in file \"" + tokenFile + "\"");
+
+    } catch (IOException e) {
+      throw new IllegalStateException("Error closing token file stream", e);
     }
+
   }
 
   /**
@@ -331,10 +318,11 @@ public class ConfiguratorBase {
   public static void setZooKeeperInstance(Class<?> implementingClass, Configuration conf,
       org.apache.accumulo.core.client.ClientConfiguration clientConfig) {
     String key = enumToConfKey(implementingClass, InstanceOpts.TYPE);
-    if (!conf.get(key, "").isEmpty())
+    if (!conf.get(key, "").isEmpty()) {
       throw new IllegalStateException(
           "Instance info can only be set once per job; it has already been configured with "
               + conf.get(key));
+    }
     conf.set(key, "ZooKeeperInstance");
     if (clientConfig != null) {
       conf.set(enumToConfKey(implementingClass, InstanceOpts.CLIENT_CONFIG),
@@ -358,11 +346,12 @@ public class ConfiguratorBase {
     if ("ZooKeeperInstance".equals(instanceType)) {
       return new org.apache.accumulo.core.client.ZooKeeperInstance(
           getClientConfiguration(implementingClass, conf));
-    } else if (instanceType.isEmpty())
+    } else if (instanceType.isEmpty()) {
       throw new IllegalStateException(
           "Instance has not been configured for " + implementingClass.getSimpleName());
-    else
+    } else {
       throw new IllegalStateException("Unrecognized instance type " + instanceType);
+    }
   }
 
   /**
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/DistributedCacheHelper.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/DistributedCacheHelper.java
index 49ddc74..94d1cae 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/DistributedCacheHelper.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/DistributedCacheHelper.java
@@ -16,10 +16,22 @@
  */
 package org.apache.accumulo.core.clientImpl.mapreduce.lib;
 
+import static java.util.Objects.requireNonNull;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
+import java.net.URISyntaxException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 
 /**
  * @since 1.6.0
@@ -30,15 +42,47 @@ public class DistributedCacheHelper {
    * @since 1.6.0
    */
   @SuppressWarnings("deprecation")
-  public static void addCacheFile(URI uri, Configuration conf) {
-    org.apache.hadoop.filecache.DistributedCache.addCacheFile(uri, conf);
+  public static void addCacheFile(String path, String fragment, Configuration conf) {
+    org.apache.hadoop.filecache.DistributedCache.addCacheFile(getUri(path, fragment), conf);
   }
 
-  /**
-   * @since 1.6.0
-   */
-  @SuppressWarnings("deprecation")
-  public static URI[] getCacheFiles(Configuration conf) throws IOException {
-    return org.apache.hadoop.filecache.DistributedCache.getCacheFiles(conf);
+  public static void addCacheFile(Job job, String path, String fragment) {
+    job.addCacheFile(getUri(path, fragment));
   }
+
+  @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN",
+      justification = "arbitrary path provided by user, through MapReduce APIs")
+  public static InputStream openCachedFile(String path, String fragment, Configuration conf) {
+    // try to get it from the local copy provided by the distributed cache
+    File tempFile = new File(fragment);
+    if (tempFile.exists()) {
+      try {
+        return new FileInputStream(tempFile);
+      } catch (FileNotFoundException e) {
+        throw new AssertionError("FileNotFoundException after verifying file exists", e);
+      }
+    } else {
+
+      // try to get token directly from HDFS path, without using the distributed cache
+      try {
+        return FileSystem.get(conf).open(new Path(path));
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Unable to read file at DFS Path " + path, e);
+      }
+    }
+  }
+
+  private static URI getUri(String path, String fragment) {
+    String uriString = requireNonNull(path) + "#" + requireNonNull(fragment);
+    if (path.contains("#")) {
+      throw new IllegalArgumentException("Path to cache cannot contain a URI fragment");
+    }
+
+    try {
+      return new URI(uriString);
+    } catch (URISyntaxException e) {
+      throw new IllegalArgumentException("Invalid URI for item to cache", e);
+    }
+  }
+
 }
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/partition/RangePartitioner.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/partition/RangePartitioner.java
index 7e18f5c..f0ec055 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/partition/RangePartitioner.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/partition/RangePartitioner.java
@@ -16,30 +16,23 @@
  */
 package org.apache.accumulo.hadoop.mapreduce.partition;
 
-import java.io.File;
-import java.io.FileNotFoundException;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
+import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.Scanner;
 import java.util.TreeSet;
 
-import javax.imageio.IIOException;
-
+import org.apache.accumulo.core.clientImpl.mapreduce.lib.DistributedCacheHelper;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Partitioner;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-
 /**
  * Hadoop partitioner that uses ranges, and optionally sub-bins based on hashing.
  *
@@ -68,8 +61,9 @@ public class RangePartitioner extends Partitioner<Text,Writable> implements Conf
 
     // both conditions work with numSubBins == 1, but this check is to avoid
     // hashing, when we don't need to, for speed
-    if (numSubBins < 2)
+    if (numSubBins < 2) {
       return index;
+    }
     return (key.toString().hashCode() & Integer.MAX_VALUE) % numSubBins + index * numSubBins;
   }
 
@@ -85,26 +79,14 @@ public class RangePartitioner extends Partitioner<Text,Writable> implements Conf
 
   private Text[] cutPointArray = null;
 
-  @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN",
-      justification = "path provided by distributed cache framework, not user input")
   private synchronized Text[] getCutPoints() throws IOException {
     if (cutPointArray == null) {
-      Path path;
       String cutFileName = conf.get(CUTFILE_KEY);
-      File tempFile = new File(CUTFILE_KEY);
-      if (tempFile.exists()) {
-        path = new Path(CUTFILE_KEY);
-      } else {
-        path = new Path(cutFileName);
-      }
-
-      if (path == null)
-        throw new FileNotFoundException("Cut point file not found in distributed cache");
-
       TreeSet<Text> cutPoints = new TreeSet<>();
-      FileSystem fs = FileSystem.get(conf);
-      FSDataInputStream inputStream = fs.open(path);
-      try (Scanner in = new Scanner(inputStream)) {
+      try (
+          InputStream inputStream =
+              DistributedCacheHelper.openCachedFile(cutFileName, CUTFILE_KEY, conf);
+          Scanner in = new Scanner(inputStream, UTF_8.name())) {
         while (in.hasNextLine()) {
           cutPoints.add(new Text(Base64.getDecoder().decode(in.nextLine())));
         }
@@ -112,8 +94,9 @@ public class RangePartitioner extends Partitioner<Text,Writable> implements Conf
 
       cutPointArray = cutPoints.toArray(new Text[cutPoints.size()]);
 
-      if (cutPointArray == null)
-        throw new IIOException("Cutpoint array not properly created from file" + path.getName());
+      if (cutPointArray == null) {
+        throw new IOException("Cutpoint array not properly created from file" + cutFileName);
+      }
     }
     return cutPointArray;
   }
@@ -133,16 +116,8 @@ public class RangePartitioner extends Partitioner<Text,Writable> implements Conf
    * points that represent ranges for partitioning
    */
   public static void setSplitFile(Job job, String file) {
-    URI uri;
-    try {
-      uri = new URI(file + "#" + CUTFILE_KEY);
-    } catch (URISyntaxException e) {
-      throw new IllegalStateException(
-          "Unable to add split file \"" + CUTFILE_KEY + "\" to distributed cache.");
-    }
-
-    job.addCacheFile(uri);
-    job.getConfiguration().set(CUTFILE_KEY, uri.getPath());
+    DistributedCacheHelper.addCacheFile(job, file, CUTFILE_KEY);
+    job.getConfiguration().set(CUTFILE_KEY, file);
   }
 
   /**
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java
index 367046c..ad6ad58 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java
@@ -16,12 +16,12 @@
  */
 package org.apache.accumulo.hadoopImpl.mapreduce.lib;
 
-import java.io.File;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.StringReader;
 import java.io.StringWriter;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.Properties;
 import java.util.Scanner;
 
@@ -30,9 +30,6 @@ import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.clientImpl.mapreduce.lib.DistributedCacheHelper;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -44,8 +41,6 @@ public class ConfiguratorBase {
     CLIENT_PROPS, CLIENT_PROPS_FILE, IS_CONFIGURED, STORE_JOB_CALLED
   }
 
-  public static final String clientPropsFileName = "propsfile";
-
   /**
    * Configuration keys for general configuration options.
    *
@@ -83,16 +78,15 @@ public class ConfiguratorBase {
         + StringUtils.camelize(e.name().toLowerCase());
   }
 
+  private static String cachedClientPropsFileName(Class<?> implementingClass) {
+    return implementingClass.getSimpleName() + ".propsfile";
+  }
+
   public static void setClientProperties(Class<?> implementingClass, Configuration conf,
       Properties props, String clientPropsPath) {
     if (clientPropsPath != null) {
-      try {
-        DistributedCacheHelper.addCacheFile(new URI(clientPropsPath + "#" + clientPropsFileName),
-            conf);
-      } catch (URISyntaxException e) {
-        throw new IllegalStateException("Unable to add client properties file \"" + clientPropsPath
-            + "\" to distributed cache.");
-      }
+      DistributedCacheHelper.addCacheFile(clientPropsPath,
+          cachedClientPropsFileName(implementingClass), conf);
       conf.set(enumToConfKey(implementingClass, ClientOpts.CLIENT_PROPS_FILE), clientPropsPath);
     } else {
       StringWriter writer = new StringWriter();
@@ -111,31 +105,18 @@ public class ConfiguratorBase {
     String clientPropsFile =
         conf.get(enumToConfKey(implementingClass, ClientOpts.CLIENT_PROPS_FILE), "");
     if (!clientPropsFile.isEmpty()) {
-      try {
-        Path path;
-        // See if the "propsfile" symlink was created and try to open the file it points to by it.
-        File tempFile = new File(ConfiguratorBase.clientPropsFileName);
-        if (tempFile.exists()) {
-          path = new Path(ConfiguratorBase.clientPropsFileName);
-        } else {
-          path = new Path(clientPropsFile);
-        }
-
-        if (path == null)
-          throw new IllegalStateException("Could not initialize properties file");
+      try (InputStream inputStream = DistributedCacheHelper.openCachedFile(clientPropsFile,
+          cachedClientPropsFileName(implementingClass), conf)) {
 
-        FileSystem fs = FileSystem.get(conf);
-        FSDataInputStream inputStream = fs.open(path);
         StringBuilder sb = new StringBuilder();
-        try (Scanner scanner = new Scanner(inputStream)) {
+        try (Scanner scanner = new Scanner(inputStream, UTF_8.name())) {
           while (scanner.hasNextLine()) {
             sb.append(scanner.nextLine() + "\n");
           }
         }
         propString = sb.toString();
       } catch (IOException e) {
-        throw new IllegalStateException(
-            "Failed to read client properties from distributed cache: " + clientPropsFile);
+        throw new IllegalStateException("Error closing client properties file stream", e);
       }
     } else {
       propString = conf.get(enumToConfKey(implementingClass, ClientOpts.CLIENT_PROPS), "");
diff --git a/shell/src/main/java/org/apache/accumulo/shell/ShellOptionsJC.java b/shell/src/main/java/org/apache/accumulo/shell/ShellOptionsJC.java
index 323b14b..e428c51 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/ShellOptionsJC.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/ShellOptionsJC.java
@@ -16,6 +16,8 @@
  */
 package org.apache.accumulo.shell;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.util.ArrayList;
@@ -60,7 +62,7 @@ public class ShellOptionsJC {
         String process(String value) {
           Scanner scanner = null;
           try {
-            scanner = new Scanner(new File(value));
+            scanner = new Scanner(new File(value), UTF_8.name());
             return scanner.nextLine();
           } catch (FileNotFoundException e) {
             throw new ParameterException(e);
diff --git a/shell/src/test/java/org/apache/accumulo/shell/PasswordConverterTest.java b/shell/src/test/java/org/apache/accumulo/shell/PasswordConverterTest.java
index 0dbc183..58f065d 100644
--- a/shell/src/test/java/org/apache/accumulo/shell/PasswordConverterTest.java
+++ b/shell/src/test/java/org/apache/accumulo/shell/PasswordConverterTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.shell;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertEquals;
 
 import java.io.File;
@@ -92,7 +93,7 @@ public class PasswordConverterTest {
   @Test
   public void testFile() throws FileNotFoundException {
     argv[1] = "file:pom.xml";
-    Scanner scan = new Scanner(new File("pom.xml"));
+    Scanner scan = new Scanner(new File("pom.xml"), UTF_8.name());
     String expected = scan.nextLine();
     scan.close();
     new JCommander(password).parse(argv);
diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/TokenFileIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/TokenFileIT.java
index 2609260..06901a9 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapred/TokenFileIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapred/TokenFileIT.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
@@ -70,6 +71,16 @@ public class TokenFileIT extends AccumuloClusterHarness {
 
       @Override
       public void map(Key k, Value v, OutputCollector<Text,Mutation> output, Reporter reporter) {
+        // verify cached token file is available locally
+        for (Class<?> formatClass : Arrays.asList(
+            org.apache.accumulo.core.client.mapred.AccumuloInputFormat.class,
+            org.apache.accumulo.core.client.mapred.AccumuloOutputFormat.class)) {
+          String formatName = formatClass.getSimpleName();
+          File file = new File(formatName + ".tokenfile");
+          assertTrue(file.exists());
+          assertTrue(file.canRead());
+        }
+
         finalOutput = output;
         try {
           if (key != null)
@@ -181,7 +192,9 @@ public class TokenFileIT extends AccumuloClusterHarness {
           new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
       assertEquals(0, ToolRunner.run(conf, new MRTokenFileTester(),
           new String[] {tf.getAbsolutePath(), table1, table2}));
-
+      if (e1 != null) {
+        e1.printStackTrace();
+      }
       assertNull(e1);
 
       try (Scanner scanner = c.createScanner(table2, new Authorizations())) {
diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java
index 25a814a..6fd9300 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java
@@ -25,12 +25,9 @@ import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.net.URI;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Map.Entry;
-import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
@@ -39,7 +36,6 @@ import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.clientImpl.ClientInfo;
 import org.apache.accumulo.core.clientImpl.Credentials;
-import org.apache.accumulo.core.clientImpl.mapreduce.lib.ConfiguratorBase;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
@@ -73,8 +69,28 @@ public class TokenFileIT extends AccumuloClusterHarness {
       @Override
       protected void map(Key k, Value v, Context context) {
         try {
-          if (key != null)
+          // verify cached token file is available locally
+          URI[] cachedFiles;
+          try {
+            cachedFiles = context.getCacheFiles();
+          } catch (IOException e) {
+            throw new AssertionError("IOException getting cache files", e);
+          }
+          assertEquals(2, cachedFiles.length); // one for each in/out format
+          for (Class<?> formatClass : Arrays.asList(
+              org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat.class,
+              org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat.class)) {
+            String formatName = formatClass.getSimpleName();
+            assertTrue(Arrays.stream(cachedFiles)
+                .anyMatch(uri -> uri.toString().endsWith(formatName + ".tokenfile")));
+            File file = new File(formatName + ".tokenfile");
+            assertTrue(file.exists());
+            assertTrue(file.canRead());
+          }
+
+          if (key != null) {
             assertEquals(key.getRow().toString(), new String(v.get()));
+          }
           assertEquals(k.getRow(), new Text(String.format("%09x", count + 1)));
           assertEquals(new String(v.get()), String.format("%09x", count));
         } catch (AssertionError e) {
@@ -91,28 +107,6 @@ public class TokenFileIT extends AccumuloClusterHarness {
         context.write(new Text(), m);
       }
 
-      @Override
-      protected void setup(Context context) throws IOException, InterruptedException {
-        if (context.getCacheFiles() != null && context.getCacheFiles().length > 0) {
-          // At this point in the MapReduce Job you can get the cached files in HDFS if you want
-          URI[] cachedFiles = context.getCacheFiles();
-          // On the line below we access the file by the hdfs fragment name created during caching
-          // in ConfiguratorBase
-          String fileByPsuedonym = "";
-          fileByPsuedonym = getFileContents(ConfiguratorBase.cachedFileName);
-
-          assertTrue(!fileByPsuedonym.isEmpty());
-          assertTrue(cachedFiles.length > 0);
-        }
-        super.setup(context);
-      }
-
-      private String getFileContents(String filename) throws IOException {
-
-        Path filePath = Paths.get(filename);
-        return Files.lines(filePath).collect(Collectors.joining(System.lineSeparator()));
-      }
-
     }
 
     @Override
@@ -204,6 +198,9 @@ public class TokenFileIT extends AccumuloClusterHarness {
           new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
       assertEquals(0, ToolRunner.run(conf, new MRTokenFileTester(),
           new String[] {tf.getAbsolutePath(), table1, table2}));
+      if (e1 != null) {
+        e1.printStackTrace();
+      }
       assertNull(e1);
 
       try (Scanner scanner = c.createScanner(table2, new Authorizations())) {


Mime
View raw message