hive-commits mailing list archives

From: ser...@apache.org
Subject: [1/3] hive git commit: HIVE-13120 : propagate doAs when generating ORC splits (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Date: Tue, 05 Apr 2016 18:02:14 GMT
Repository: hive
Updated Branches:
  refs/heads/branch-2.0 63f53069c -> a209ced3c


HIVE-13120 : propagate doAs when generating ORC splits (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b82a2ce9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b82a2ce9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b82a2ce9

Branch: refs/heads/branch-2.0
Commit: b82a2ce94aec0a0a6a7b567270aee8ec914c179d
Parents: 63f5306
Author: Sergey Shelukhin <sershe@apache.org>
Authored: Thu Feb 25 15:27:36 2016 -0800
Committer: Sergey Shelukhin <sershe@apache.org>
Committed: Tue Apr 5 10:52:42 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   | 117 ++++++++--
 .../hive/ql/io/orc/TestInputOutputFormat.java   | 225 ++++++++++++++-----
 2 files changed, 261 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
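
The patch applies one pattern throughout: capture the caller's UserGroupInformation on the thread that requests splits, pass it into each Callable handed to the shared static split-generation thread pool, and re-enter that identity with doAs() before touching the file system. A minimal standalone sketch of that pattern follows; SecureWorker and callInternal are illustrative names, not part of this patch:

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;

import org.apache.hadoop.security.UserGroupInformation;

// Illustrative worker: the UGI is captured on the submitting thread and
// restored inside the pool thread, so HDFS sees the original caller.
class SecureWorker implements Callable<Long> {
  private final UserGroupInformation ugi;

  SecureWorker(UserGroupInformation ugi) {
    this.ugi = ugi;
  }

  @Override
  public Long call() throws IOException {
    if (ugi == null) {
      return callInternal(); // nothing to restore; run as the pool thread
    }
    try {
      // doAs runs the action under the captured identity's access-control context.
      return ugi.doAs(new PrivilegedExceptionAction<Long>() {
        @Override
        public Long run() throws Exception {
          return callInternal();
        }
      });
    } catch (InterruptedException e) {
      throw new IOException(e); // same translation the patch uses
    }
  }

  private Long callInternal() throws IOException {
    // ... file-system work that must be authorized as the caller ...
    return 0L;
  }
}

// Submitting side, mirroring the getCurrentUser() call added in the diff below:
//   UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
//   pool.submit(new SecureWorker(ugi));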


http://git-wip-us.apache.org/repos/asf/hive/blob/b82a2ce9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 359cbf7..b1e582d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -35,6 +36,7 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -100,6 +102,7 @@ import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.orc.OrcProto;
 
@@ -492,7 +495,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private FooterCache footerCache;
     private static LocalCache localCache;
     private static MetastoreCache metaCache;
-    private static ExecutorService threadPool = null;
+    static ExecutorService threadPool = null;
     private final int numBuckets;
     private final int splitStrategyBatchMs;
     private final long maxSize;
@@ -500,6 +503,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private final int minSplits;
     private final boolean footerInSplits;
     private final boolean cacheStripeDetails;
+    private final boolean forceThreadpool;
     private final AtomicInteger cacheHitCounter = new AtomicInteger(0);
     private final AtomicInteger numFilesCounter = new AtomicInteger(0);
     private final ValidTxnList transactionList;
@@ -512,6 +516,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
     Context(Configuration conf, final int minSplits) {
       this.conf = conf;
+      this.forceThreadpool = HiveConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST);
       this.sarg = ConvertAstToSearchArg.createFromConf(conf);
       minSize = HiveConf.getLongVar(conf, ConfVars.MAPREDMINSPLITSIZE, DEFAULT_MIN_SPLIT_SIZE);
       maxSize = HiveConf.getLongVar(conf, ConfVars.MAPREDMAXSPLITSIZE, DEFAULT_MAX_SPLIT_SIZE);
@@ -570,6 +575,21 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
                               Long.MAX_VALUE + ":");
       transactionList = new ValidReadTxnList(value);
     }
+
+    @VisibleForTesting
+    static int getCurrentThreadPoolSize() {
+      synchronized (Context.class) {
+        return (threadPool instanceof ThreadPoolExecutor)
+            ? ((ThreadPoolExecutor)threadPool).getPoolSize() : ((threadPool == null) ? 0 : -1);
+      }
+    }
+
+    @VisibleForTesting
+    public static void resetThreadPool() {
+      synchronized (Context.class) {
+        threadPool = null;
+      }
+    }
   }
 
   /**
@@ -659,10 +679,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     List<DeltaMetaData> deltas;
     boolean[] covered;
     private List<Future<List<OrcSplit>>> splitFuturesRef;
+    private final UserGroupInformation ugi;
 
     public ETLSplitStrategy(Context context, FileSystem fs, Path dir,
        List<HdfsFileStatusWithId> children, boolean isOriginal, List<DeltaMetaData> deltas,
-        boolean[] covered) {
+        boolean[] covered, UserGroupInformation ugi) {
       assert !children.isEmpty();
       this.context = context;
       this.dirs = Lists.newArrayList(new ETLDir(dir, fs, children.size()));
@@ -670,6 +691,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       this.isOriginal = isOriginal;
       this.deltas = deltas;
       this.covered = covered;
+      this.ugi = ugi;
     }
 
     @Override
@@ -756,26 +778,42 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
     public Future<Void> generateSplitWork(
        Context context, List<Future<List<OrcSplit>>> splitFutures) throws IOException {
-      if (context.cacheStripeDetails && context.footerCache.isBlocking()) {
+      if ((context.cacheStripeDetails && context.footerCache.isBlocking())
+          || context.forceThreadpool) {
         this.splitFuturesRef = splitFutures;
         return Context.threadPool.submit(this);
       } else {
-        runGetSplitsSync(splitFutures);
+        runGetSplitsSync(splitFutures, null);
         return null;
       }
     }
 
     @Override
     public Void call() throws IOException {
-      runGetSplitsSync(splitFuturesRef);
-      return null;
+      if (ugi == null) {
+        runGetSplitsSync(splitFuturesRef, null);
+        return null;
+      }
+      try {
+        return ugi.doAs(new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            runGetSplitsSync(splitFuturesRef, ugi);
+            return null;
+          }
+        });
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
     }
 
-    private void runGetSplitsSync(List<Future<List<OrcSplit>>> splitFutures) throws IOException {
+    private void runGetSplitsSync(List<Future<List<OrcSplit>>> splitFutures,
+        UserGroupInformation ugi) throws IOException {
       List<SplitInfo> splits = getSplits();
       List<Future<List<OrcSplit>>> localList = new ArrayList<>(splits.size());
+      UserGroupInformation tpUgi = ugi == null ? UserGroupInformation.getCurrentUser() : ugi;
       for (SplitInfo splitInfo : splits) {
-        localList.add(Context.threadPool.submit(new SplitGenerator(splitInfo)));
+        localList.add(Context.threadPool.submit(new SplitGenerator(splitInfo, tpUgi)));
       }
       synchronized (splitFutures) {
         splitFutures.addAll(localList);
@@ -877,16 +915,35 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private final FileSystem fs;
     private final Path dir;
     private final boolean useFileIds;
+    private final UserGroupInformation ugi;
 
-    FileGenerator(Context context, FileSystem fs, Path dir, boolean useFileIds) {
+    FileGenerator(Context context, FileSystem fs, Path dir, boolean useFileIds,
+        UserGroupInformation ugi) {
       this.context = context;
       this.fs = fs;
       this.dir = dir;
       this.useFileIds = useFileIds;
+      this.ugi = ugi;
     }
 
     @Override
     public AcidDirInfo call() throws IOException {
+      if (ugi == null) {
+        return callInternal();
+      }
+      try {
+        return ugi.doAs(new PrivilegedExceptionAction<AcidDirInfo>() {
+          @Override
+          public AcidDirInfo run() throws Exception {
+            return callInternal();
+          }
+        });
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+
+    private AcidDirInfo callInternal() throws IOException {
       AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir,
           context.conf, context.transactionList, useFileIds);
       Path base = dirInfo.getBaseDirectory();
@@ -939,8 +996,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private OrcFile.WriterVersion writerVersion;
     private long projColsUncompressedSize;
     private final List<OrcSplit> deltaSplits;
+    private final UserGroupInformation ugi;
 
-    public SplitGenerator(SplitInfo splitInfo) throws IOException {
+    public SplitGenerator(SplitInfo splitInfo, UserGroupInformation ugi) throws IOException {
+      this.ugi = ugi;
       this.context = splitInfo.context;
       this.fs = splitInfo.fs;
       this.fileWithId = splitInfo.fileWithId;
@@ -1063,6 +1122,22 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
      */
     @Override
     public List<OrcSplit> call() throws IOException {
+      if (ugi == null) {
+        return callInternal();
+      }
+      try {
+        return ugi.doAs(new PrivilegedExceptionAction<List<OrcSplit>>() {
+          @Override
+          public List<OrcSplit> run() throws Exception {
+            return callInternal();
+          }
+        });
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+
+    private List<OrcSplit> callInternal() throws IOException {
       populateAndCacheStripeDetails();
       List<OrcSplit> splits = Lists.newArrayList();
 
@@ -1218,13 +1293,14 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     List<Future<AcidDirInfo>> pathFutures = Lists.newArrayList();
     List<Future<Void>> strategyFutures = Lists.newArrayList();
     final List<Future<List<OrcSplit>>> splitFutures = Lists.newArrayList();
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
 
     // multi-threaded file statuses and split strategy
     Path[] paths = getInputPaths(conf);
     CompletionService<AcidDirInfo> ecs = new ExecutorCompletionService<>(Context.threadPool);
     for (Path dir : paths) {
       FileSystem fs = dir.getFileSystem(conf);
-      FileGenerator fileGenerator = new FileGenerator(context, fs, dir, useFileIds);
+      FileGenerator fileGenerator = new FileGenerator(context, fs, dir, useFileIds, ugi);
       pathFutures.add(ecs.submit(fileGenerator));
     }
 
@@ -1256,7 +1332,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         // We have received a new directory information, make a split strategy.
         --resultsLeft;
         SplitStrategy<?> splitStrategy = determineSplitStrategy(combinedCtx, context,
-            adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles);
+            adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, ugi);
         if (splitStrategy == null) continue; // Combined.
 
         if (isDebugEnabled) {
@@ -1325,12 +1401,13 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
   private static SplitStrategy<?> combineOrCreateETLStrategy(CombinedCtx combinedCtx,
       Context context, FileSystem fs, Path dir, List<HdfsFileStatusWithId> files,
-      List<DeltaMetaData> deltas, boolean[] covered, boolean isOriginal) {
+      List<DeltaMetaData> deltas, boolean[] covered, boolean isOriginal,
+      UserGroupInformation ugi) {
     if (!deltas.isEmpty() || combinedCtx == null) {
-      return new ETLSplitStrategy(context, fs, dir, files, isOriginal, deltas, covered);
+      return new ETLSplitStrategy(context, fs, dir, files, isOriginal, deltas, covered, ugi);
     } else if (combinedCtx.combined == null) {
       combinedCtx.combined = new ETLSplitStrategy(
-          context, fs, dir, files, isOriginal, deltas, covered);
+          context, fs, dir, files, isOriginal, deltas, covered, ugi);
       combinedCtx.combineStartUs = System.nanoTime();
       return null;
     } else {
@@ -1339,11 +1416,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       switch (r) {
       case YES: return null;
       case NO_AND_CONTINUE:
-        return new ETLSplitStrategy(context, fs, dir, files, isOriginal, deltas, covered);
+        return new ETLSplitStrategy(context, fs, dir, files, isOriginal, deltas, covered, ugi);
       case NO_AND_SWAP: {
         ETLSplitStrategy oldBase = combinedCtx.combined;
         combinedCtx.combined = new ETLSplitStrategy(
-            context, fs, dir, files, isOriginal, deltas, covered);
+            context, fs, dir, files, isOriginal, deltas, covered, ugi);
         combinedCtx.combineStartUs = System.nanoTime();
         return oldBase;
       }
@@ -1681,7 +1758,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   @VisibleForTesting
   static SplitStrategy<?> determineSplitStrategy(CombinedCtx combinedCtx, Context context,
       FileSystem fs, Path dir, AcidUtils.Directory dirInfo,
-      List<HdfsFileStatusWithId> baseOrOriginalFiles) {
+      List<HdfsFileStatusWithId> baseOrOriginalFiles, UserGroupInformation ugi) {
     Path base = dirInfo.getBaseDirectory();
     List<HdfsFileStatusWithId> original = dirInfo.getOriginalFiles();
     List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
@@ -1714,12 +1791,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         case ETL:
           // ETL strategy requested through config
           return combineOrCreateETLStrategy(combinedCtx, context, fs,
-            dir, baseOrOriginalFiles, deltas, covered, isOriginal);
+            dir, baseOrOriginalFiles, deltas, covered, isOriginal, ugi);
         default:
           // HYBRID strategy
           if (avgFileSize > context.maxSize || totalFiles <= context.minSplits) {
             return combineOrCreateETLStrategy(combinedCtx, context, fs,
-                dir, baseOrOriginalFiles, deltas, covered, isOriginal);
+                dir, baseOrOriginalFiles, deltas, covered, isOriginal, ugi);
           } else {
             return new BISplitStrategy(
                 context, fs, dir, baseOrOriginalFiles, isOriginal, deltas, covered);
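
The test changes below exercise the same pattern from the other direction: a MockFileSystem denies every operation to one designated user, so split generation run under that user's doAs must fail, proving the pool threads really switched identity. Because the failure surfaces through several wrapping layers, testDoAs walks the exception's cause chain rather than matching the top-level type. That idiom, distilled into a standalone helper (causedBy is an illustrative name, not part of this patch):

static boolean causedBy(Throwable t, Class<? extends Throwable> expected) {
  // Exceptions thrown on pool threads arrive wrapped (IOException over
  // InterruptedException, execution wrappers, ...), so scan the whole chain.
  for (Throwable cause = t; cause != null; cause = cause.getCause()) {
    if (expected.isInstance(cause)) {
      return true;
    }
  }
  return false;
}

// Usage against the mock, as in testDoAs below:
//   if (!causedBy(ex, MockFileSystem.MockAccessDenied.class)) throw ex;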

http://git-wip-us.apache.org/repos/asf/hive/blob/b82a2ce9/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index f81f5bb8..593e335 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -17,13 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.io.orc;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -31,6 +25,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
 import java.sql.Date;
 import java.sql.Timestamp;
 import java.text.SimpleDateFormat;
@@ -56,6 +51,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -107,6 +103,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
 import org.apache.orc.OrcProto;
 
@@ -114,12 +111,15 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Output;
 
 @SuppressWarnings({ "deprecation", "unchecked", "rawtypes" })
 public class TestInputOutputFormat {
+  private static final Logger LOG = LoggerFactory.getLogger(TestInputOutputFormat.class);
 
   public static String toKryo(SearchArgument sarg) {
     Output out = new Output(4 * 1024, 10 * 1024 * 1024);
@@ -503,7 +503,7 @@ public class TestInputOutputFormat {
           final OrcInputFormat.Context context = new OrcInputFormat.Context(
               conf, n);
           OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(
-              context, fs, new MockPath(fs, "mock:/a/b"), false);
+              context, fs, new MockPath(fs, "mock:/a/b"), false, null);
           final SplitStrategy splitStrategy = createSplitStrategy(context, gen);
           assertTrue(
               String.format(
@@ -527,7 +527,7 @@ public class TestInputOutputFormat {
         new MockFile("mock:/a/b/part-04", 1000, new byte[0]));
     OrcInputFormat.FileGenerator gen =
       new OrcInputFormat.FileGenerator(context, fs,
-          new MockPath(fs, "mock:/a/b"), false);
+          new MockPath(fs, "mock:/a/b"), false, null);
     OrcInputFormat.SplitStrategy splitStrategy = createSplitStrategy(context, gen);
     assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
 
@@ -540,7 +540,7 @@ public class TestInputOutputFormat {
         new MockFile("mock:/a/b/.part-03", 1000, new byte[1000]),
         new MockFile("mock:/a/b/part-04", 1000, new byte[1000]));
     gen = new OrcInputFormat.FileGenerator(context, fs,
-            new MockPath(fs, "mock:/a/b"), false);
+            new MockPath(fs, "mock:/a/b"), false, null);
     splitStrategy = createSplitStrategy(context, gen);
     assertEquals(true, splitStrategy instanceof OrcInputFormat.ETLSplitStrategy);
   }
@@ -614,19 +614,20 @@ public class TestInputOutputFormat {
      MockFileSystem fs, String path, OrcInputFormat.CombinedCtx combineCtx) throws IOException {
     OrcInputFormat.AcidDirInfo adi = createAdi(context, fs, path);
     return OrcInputFormat.determineSplitStrategy(
-        combineCtx, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles);
+        combineCtx, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, null);
   }
 
   public OrcInputFormat.AcidDirInfo createAdi(
      OrcInputFormat.Context context, MockFileSystem fs, String path) throws IOException {
-    return new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, path), false).call();
+    return new OrcInputFormat.FileGenerator(
+        context, fs, new MockPath(fs, path), false, null).call();
   }
 
   private OrcInputFormat.SplitStrategy createSplitStrategy(
      OrcInputFormat.Context context, OrcInputFormat.FileGenerator gen) throws IOException {
     OrcInputFormat.AcidDirInfo adi = gen.call();
     return OrcInputFormat.determineSplitStrategy(
-        null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles);
+        null, context, adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, null);
   }
 
   public static class MockBlock {
@@ -788,6 +789,9 @@ public class TestInputOutputFormat {
   public static class MockFileSystem extends FileSystem {
     final List<MockFile> files = new ArrayList<MockFile>();
     Path workingDir = new Path("/");
+    // statics for when the mock fs is created via FileSystem.get
+    private static String blockedUgi = null;
+    private final static List<MockFile> globalFiles = new ArrayList<MockFile>();
 
     public MockFileSystem() {
       // empty
@@ -803,6 +807,10 @@ public class TestInputOutputFormat {
       this.files.addAll(Arrays.asList(files));
     }
 
+    public static void setBlockedUgi(String s) {
+      blockedUgi = s;
+    }
+
     void clear() {
       files.clear();
     }
@@ -816,14 +824,36 @@ public class TestInputOutputFormat {
       }
     }
 
+    @SuppressWarnings("serial")
+    public static class MockAccessDenied extends IOException {
+    }
+
     @Override
     public FSDataInputStream open(Path path, int i) throws IOException {
-      for(MockFile file: files) {
+      checkAccess();
+      MockFile file = findFile(path);
+      if (file != null) return new FSDataInputStream(new MockInputStream(file));
+      throw new IOException("File not found: " + path);
+    }
+
+    private MockFile findFile(Path path) {
+      for (MockFile file: files) {
         if (file.path.equals(path)) {
-          return new FSDataInputStream(new MockInputStream(file));
+          return file;
         }
       }
-      throw new IOException("File not found: " + path);
+      for (MockFile file: globalFiles) {
+        if (file.path.equals(path)) {
+          return file;
+        }
+      }
+      return null;
+    }
+
+    private void checkAccess() throws IOException {
+      if (blockedUgi == null) return;
+      if (!blockedUgi.equals(UserGroupInformation.getCurrentUser().getShortUserName())) return;
+      throw new MockAccessDenied();
     }
 
     @Override
@@ -832,13 +862,8 @@ public class TestInputOutputFormat {
                                      short replication, long blockSize,
                                      Progressable progressable
                                      ) throws IOException {
-      MockFile file = null;
-      for(MockFile currentFile: files) {
-        if (currentFile.path.equals(path)) {
-          file = currentFile;
-          break;
-        }
-      }
+      checkAccess();
+      MockFile file = findFile(path);
       if (file == null) {
         file = new MockFile(path.toString(), (int) blockSize, new byte[0]);
         files.add(file);
@@ -850,37 +875,55 @@ public class TestInputOutputFormat {
     public FSDataOutputStream append(Path path, int bufferSize,
                                      Progressable progressable
                                      ) throws IOException {
+      checkAccess();
       return create(path, FsPermission.getDefault(), true, bufferSize,
           (short) 3, 256 * 1024, progressable);
     }
 
     @Override
     public boolean rename(Path path, Path path2) throws IOException {
+      checkAccess();
       return false;
     }
 
     @Override
     public boolean delete(Path path) throws IOException {
+      checkAccess();
       return false;
     }
 
     @Override
     public boolean delete(Path path, boolean b) throws IOException {
+      checkAccess();
       return false;
     }
 
     @Override
     public FileStatus[] listStatus(Path path) throws IOException {
+      checkAccess();
       path = path.makeQualified(this);
       List<FileStatus> result = new ArrayList<FileStatus>();
       String pathname = path.toString();
       String pathnameAsDir = pathname + "/";
       Set<String> dirs = new TreeSet<String>();
-      for(MockFile file: files) {
+      MockFile file = findFile(path);
+      if (file != null) {
+        return new FileStatus[]{createStatus(file)};
+      }
+      findMatchingFiles(files, pathnameAsDir, dirs, result);
+      findMatchingFiles(globalFiles, pathnameAsDir, dirs, result);
+      // for each directory add it once
+      for(String dir: dirs) {
+        result.add(createDirectory(new MockPath(this, pathnameAsDir + dir)));
+      }
+      return result.toArray(new FileStatus[result.size()]);
+    }
+
+    private void findMatchingFiles(
+        List<MockFile> files, String pathnameAsDir, Set<String> dirs, List<FileStatus> result) {
+      for (MockFile file: files) {
         String filename = file.path.toString();
-        if (pathname.equals(filename)) {
-          return new FileStatus[]{createStatus(file)};
-        } else if (filename.startsWith(pathnameAsDir)) {
+        if (filename.startsWith(pathnameAsDir)) {
           String tail = filename.substring(pathnameAsDir.length());
           int nextSlash = tail.indexOf('/');
           if (nextSlash > 0) {
@@ -890,11 +933,6 @@ public class TestInputOutputFormat {
           }
         }
       }
-      // for each directory add it once
-      for(String dir: dirs) {
-        result.add(createDirectory(new MockPath(this, pathnameAsDir + dir)));
-      }
-      return result.toArray(new FileStatus[result.size()]);
     }
 
     @Override
@@ -925,12 +963,18 @@ public class TestInputOutputFormat {
 
     @Override
     public FileStatus getFileStatus(Path path) throws IOException {
+      checkAccess();
       path = path.makeQualified(this);
       String pathnameAsDir = path.toString() + "/";
-      for(MockFile file: files) {
-        if (file.path.equals(path)) {
-          return createStatus(file);
-        } else if (file.path.toString().startsWith(pathnameAsDir)) {
+      MockFile file = findFile(path);
+      if (file != null) return createStatus(file);
+      for (MockFile dir : files) {
+        if (dir.path.toString().startsWith(pathnameAsDir)) {
+          return createDirectory(path);
+        }
+      }
+      for (MockFile dir : globalFiles) {
+        if (dir.path.toString().startsWith(pathnameAsDir)) {
           return createDirectory(path);
         }
       }
@@ -939,23 +983,23 @@ public class TestInputOutputFormat {
 
     @Override
     public BlockLocation[] getFileBlockLocations(FileStatus stat,
-                                                 long start, long len) {
+                                                 long start, long len) throws IOException {
+      checkAccess();
       List<BlockLocation> result = new ArrayList<BlockLocation>();
-      for(MockFile file: files) {
-        if (file.path.equals(stat.getPath())) {
-          for(MockBlock block: file.blocks) {
-            if (OrcInputFormat.SplitGenerator.getOverlap(block.offset,
-                block.length, start, len) > 0) {
-              String[] topology = new String[block.hosts.length];
-              for(int i=0; i < topology.length; ++i) {
-                topology[i] = "/rack/ " + block.hosts[i];
-              }
-              result.add(new BlockLocation(block.hosts, block.hosts,
-                  topology, block.offset, block.length));
+      MockFile file = findFile(stat.getPath());
+      if (file != null) {
+        for(MockBlock block: file.blocks) {
+          if (OrcInputFormat.SplitGenerator.getOverlap(block.offset,
+              block.length, start, len) > 0) {
+            String[] topology = new String[block.hosts.length];
+            for(int i=0; i < topology.length; ++i) {
+              topology[i] = "/rack/ " + block.hosts[i];
             }
+            result.add(new BlockLocation(block.hosts, block.hosts,
+                topology, block.offset, block.length));
           }
-          return result.toArray(new BlockLocation[result.size()]);
         }
+        return result.toArray(new BlockLocation[result.size()]);
       }
       return new BlockLocation[0];
     }
@@ -973,6 +1017,14 @@ public class TestInputOutputFormat {
       buffer.append("]}");
       return buffer.toString();
     }
+
+    public static void addGlobalFile(MockFile mockFile) {
+      globalFiles.add(mockFile);
+    }
+
+    public static void clearGlobalFiles() {
+      globalFiles.clear();
+    }
   }
 
   static void fill(DataOutputBuffer out, long length) throws IOException {
@@ -1053,7 +1105,7 @@ public class TestInputOutputFormat {
     OrcInputFormat.SplitGenerator splitter =
         new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
            AcidUtils.createOriginalObj(null, fs.getFileStatus(new Path("/a/file"))), null, true,
-            new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null));
+            new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null), null);
     OrcSplit result = splitter.createSplit(0, 200, null);
     assertEquals(0, result.getStart());
     assertEquals(200, result.getLength());
@@ -1094,7 +1146,7 @@ public class TestInputOutputFormat {
     OrcInputFormat.SplitGenerator splitter =
         new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
            AcidUtils.createOriginalObj(null, fs.getFileStatus(new Path("/a/file"))), null, true,
-            new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null));
+            new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null), null);
     List<OrcSplit> results = splitter.call();
     OrcSplit result = results.get(0);
     assertEquals(3, result.getStart());
@@ -1117,7 +1169,7 @@ public class TestInputOutputFormat {
     context = new OrcInputFormat.Context(conf);
     splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
       AcidUtils.createOriginalObj(null, fs.getFileStatus(new Path("/a/file"))), null, true,
-        new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null));
+        new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null), null);
     results = splitter.call();
     for(int i=0; i < stripeSizes.length; ++i) {
       assertEquals("checking stripe " + i + " size",
@@ -1145,7 +1197,7 @@ public class TestInputOutputFormat {
     OrcInputFormat.SplitGenerator splitter =
         new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
             fs.getFileStatus(new Path("/a/file")), null, true,
-            new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null));
+            new ArrayList<AcidInputFormat.DeltaMetaData>(), true, null, null), null);
     List<OrcSplit> results = splitter.call();
     OrcSplit result = results.get(0);
     assertEquals(3, results.size());
@@ -1168,7 +1220,7 @@ public class TestInputOutputFormat {
     splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
         fs.getFileStatus(new Path("/a/file")), null, true,
         new ArrayList<AcidInputFormat.DeltaMetaData>(),
-        true, null, null));
+        true, null, null), null);
     results = splitter.call();
     assertEquals(5, results.size());
     for (int i = 0; i < stripeSizes.length; ++i) {
@@ -1188,7 +1240,7 @@ public class TestInputOutputFormat {
     splitter = new OrcInputFormat.SplitGenerator(new OrcInputFormat.SplitInfo(context, fs,
         fs.getFileStatus(new Path("/a/file")), null, true,
         new ArrayList<AcidInputFormat.DeltaMetaData>(),
-        true, null, null));
+        true, null, null), null);
     results = splitter.call();
     assertEquals(1, results.size());
     result = results.get(0);
@@ -2040,12 +2092,7 @@ public class TestInputOutputFormat {
   @Test
   public void testSplitEliminationNullStats() throws Exception {
     Properties properties = new Properties();
-    StructObjectInspector inspector;
-    synchronized (TestOrcFile.class) {
-      inspector = (StructObjectInspector)
-          ObjectInspectorFactory.getReflectionObjectInspector(SimpleRow.class,
-              ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
-    }
+    StructObjectInspector inspector = createSoi();
     SerDe serde = new OrcSerde();
     OutputFormat<?, ?> outFormat = new OrcOutputFormat();
     conf.setInt("mapred.max.split.size", 50);
@@ -2078,4 +2125,60 @@ public class TestInputOutputFormat {
     assertEquals(0, splits.length);
   }
 
+  @Test
+  public void testDoAs() throws Exception {
+    conf.setInt(ConfVars.HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS.varname, 1);
+    conf.set(ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "ETL");
+    conf.setBoolean(ConfVars.HIVE_IN_TEST.varname, true);
+    conf.setClass("fs.mock.impl", MockFileSystem.class, FileSystem.class);
+    String badUser = UserGroupInformation.getCurrentUser().getShortUserName() + "-foo";
+    MockFileSystem.setBlockedUgi(badUser);
+    MockFileSystem.clearGlobalFiles();
+    OrcInputFormat.Context.resetThreadPool(); // We need the size above to take effect.
+    try {
+      // OrcInputFormat will get a mock fs from FileSystem.get; add global files.
+      MockFileSystem.addGlobalFile(new MockFile("mock:/ugi/1/file", 10000,
+          createMockOrcFile(197, 300, 600), new MockBlock("host1-1", "host1-2", "host1-3")));
+      MockFileSystem.addGlobalFile(new MockFile("mock:/ugi/2/file", 10000,
+          createMockOrcFile(197, 300, 600), new MockBlock("host1-1", "host1-2", "host1-3")));
+      FileInputFormat.setInputPaths(conf, "mock:/ugi/1");
+      UserGroupInformation ugi = UserGroupInformation.createUserForTesting(badUser, new String[0]);
+      assertEquals(0, OrcInputFormat.Context.getCurrentThreadPoolSize());
+      try {
+        ugi.doAs(new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            OrcInputFormat.generateSplitsInfo(conf, -1);
+            return null;
+          }
+        });
+        fail("Didn't throw");
+      } catch (Exception ex) {
+        Throwable cause = ex;
+        boolean found = false;
+        while (cause != null) {
+          if (cause instanceof MockFileSystem.MockAccessDenied) {
+            found = true; // Expected.
+            break;
+          }
+          cause = cause.getCause();
+        }
+        if (!found) throw ex; // Unexpected.
+      }
+      assertEquals(1, OrcInputFormat.Context.getCurrentThreadPoolSize());
+      FileInputFormat.setInputPaths(conf, "mock:/ugi/2");
+      List<OrcSplit> splits = OrcInputFormat.generateSplitsInfo(conf, -1);
+      assertEquals(1, splits.size());
+    } finally {
+      MockFileSystem.clearGlobalFiles();
+    }
+  }
+
+
+  private StructObjectInspector createSoi() {
+    synchronized (TestOrcFile.class) {
+      return (StructObjectInspector)ObjectInspectorFactory.getReflectionObjectInspector(
+          SimpleRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+  }
 }

