incubator-bigtop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject svn commit: r1205607 [2/3] - in /incubator/bigtop/branches/hadoop-0.23: ./ bigtop-packages/src/common/hive/ bigtop-packages/src/common/sqoop/ bigtop-packages/src/rpm/hive/SPECS/ bigtop-packages/src/rpm/mahout/SPECS/ bigtop-packages/src/rpm/sqoop/SPECS/
Date Wed, 23 Nov 2011 21:26:19 GMT

Modified: incubator/bigtop/branches/hadoop-0.23/bigtop-packages/src/common/hive/patch
URL: http://svn.apache.org/viewvc/incubator/bigtop/branches/hadoop-0.23/bigtop-packages/src/common/hive/patch?rev=1205607&r1=1205606&r2=1205607&view=diff
==============================================================================
--- incubator/bigtop/branches/hadoop-0.23/bigtop-packages/src/common/hive/patch (original)
+++ incubator/bigtop/branches/hadoop-0.23/bigtop-packages/src/common/hive/patch Wed Nov 23 21:26:18 2011
@@ -1,6 +1,350 @@
+Index: shims/src/0.20/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
+===================================================================
+--- shims/src/0.20/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java	(revision 1203794)
++++ shims/src/0.20/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java	(working copy)
+@@ -1,61 +0,0 @@
+-/**
+- * Licensed to the Apache Software Foundation (ASF) under one
+- * or more contributor license agreements.  See the NOTICE file
+- * distributed with this work for additional information
+- * regarding copyright ownership.  The ASF licenses this file
+- * to you under the Apache License, Version 2.0 (the
+- * "License"); you may not use this file except in compliance
+- * with the License.  You may obtain a copy of the License at
+- *
+- *     http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-
+-package org.apache.hadoop.fs;
+-
+-import java.io.*;
+-import java.net.URI;
+-import java.net.URISyntaxException;
+-
+-import org.apache.hadoop.conf.Configuration;
+-import org.apache.hadoop.fs.permission.FsPermission;
+-import org.apache.hadoop.util.Progressable;
+-
+-/****************************************************************
+- * A Proxy for LocalFileSystem
+- *
+- * Serves uri's corresponding to 'pfile:///' namespace with using
+- * a LocalFileSystem 
+- *****************************************************************/
+-
+-public class ProxyLocalFileSystem extends FilterFileSystem {
+-
+-  protected LocalFileSystem localFs;
+-
+-  public ProxyLocalFileSystem() {
+-    localFs = new LocalFileSystem();
+-  }
+-
+-  public ProxyLocalFileSystem(FileSystem fs) {
+-    throw new RuntimeException ("Unsupported Constructor");
+-  }
+-
+-  @Override
+-  public void initialize(URI name, Configuration conf) throws IOException {
+-    // create a proxy for the local filesystem
+-    // the scheme/authority serving as the proxy is derived
+-    // from the supplied URI
+-
+-    String scheme = name.getScheme();
+-    String authority = name.getAuthority() != null ? name.getAuthority() : "";
+-    String proxyUriString = name + "://" + authority + "/";
+-    fs = new ProxyFileSystem(localFs, URI.create(proxyUriString));
+-
+-    fs.initialize(name, conf);
+-  }
+-}
+Index: shims/src/0.20/java/org/apache/hadoop/fs/ProxyFileSystem.java
+===================================================================
+--- shims/src/0.20/java/org/apache/hadoop/fs/ProxyFileSystem.java	(revision 1203794)
++++ shims/src/0.20/java/org/apache/hadoop/fs/ProxyFileSystem.java	(working copy)
+@@ -1,273 +0,0 @@
+-/**
+- * Licensed to the Apache Software Foundation (ASF) under one
+- * or more contributor license agreements.  See the NOTICE file
+- * distributed with this work for additional information
+- * regarding copyright ownership.  The ASF licenses this file
+- * to you under the Apache License, Version 2.0 (the
+- * "License"); you may not use this file except in compliance
+- * with the License.  You may obtain a copy of the License at
+- *
+- *     http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-
+-package org.apache.hadoop.fs;
+-
+-import java.io.IOException;
+-import java.net.URI;
+-import java.net.URISyntaxException;
+-
+-import org.apache.hadoop.conf.Configuration;
+-import org.apache.hadoop.fs.permission.FsPermission;
+-import org.apache.hadoop.util.Progressable;
+-
+-/****************************************************************
+- * A FileSystem that can serve a given scheme/authority using some
+- * other file system. In that sense, it serves as a proxy for the
+- * real/underlying file system
+- *****************************************************************/
+-
+-public class ProxyFileSystem extends FilterFileSystem {
+-
+-  protected String myScheme;
+-  protected String myAuthority;
+-  protected URI myUri;
+-
+-  protected String realScheme;
+-  protected String realAuthority;
+-  protected URI realUri;
+-
+-  
+-
+-  private Path swizzleParamPath(Path p) {
+-    return new Path (realScheme, realAuthority, p.toUri().getPath());
+-  }
+-
+-  private Path swizzleReturnPath(Path p) {
+-    return new Path (myScheme, myAuthority, p.toUri().getPath());
+-  }
+-
+-  private FileStatus swizzleFileStatus(FileStatus orig, boolean isParam) {
+-    FileStatus ret =
+-      new FileStatus(orig.getLen(), orig.isDir(), orig.getReplication(),
+-                     orig.getBlockSize(), orig.getModificationTime(),
+-                     orig.getAccessTime(), orig.getPermission(),
+-                     orig.getOwner(), orig.getGroup(),
+-                     isParam ? swizzleParamPath(orig.getPath()) :
+-                     swizzleReturnPath(orig.getPath()));
+-    return ret;
+-  }
+-
+-  public ProxyFileSystem() {
+-    throw new RuntimeException ("Unsupported constructor");
+-  }
+-  
+-  public ProxyFileSystem(FileSystem fs) {
+-    throw new RuntimeException ("Unsupported constructor");
+-  }
+-
+-  /**
+-   * Create a proxy file system for fs.
+-   * 
+-   * @param fs FileSystem to create proxy for
+-   * @param myUri URI to use as proxy. Only the scheme and authority from
+-   *              this are used right now
+-   */
+-  public ProxyFileSystem(FileSystem fs, URI myUri) {
+-    super(fs);
+-
+-    URI realUri = fs.getUri();
+-    this.realScheme = realUri.getScheme();
+-    this.realAuthority=realUri.getAuthority();
+-    this.realUri = realUri;
+-
+-    this.myScheme = myUri.getScheme();
+-    this.myAuthority=myUri.getAuthority();
+-    this.myUri = myUri;
+-  }
+-
+-  @Override
+-  public void initialize(URI name, Configuration conf) throws IOException {
+-    try {
+-      URI realUri = new URI (realScheme, realAuthority,
+-                            name.getPath(), name.getQuery(), name.getFragment());
+-      super.initialize(realUri, conf);
+-    } catch (URISyntaxException e) {
+-      throw new RuntimeException(e);
+-    }
+-  }
+-
+-  @Override
+-  public URI getUri() {
+-    return myUri;
+-  }
+-
+-  @Override
+-  public String getName() {
+-    return getUri().toString();
+-  }
+-
+-  @Override
+-  public Path makeQualified(Path path) {
+-    return swizzleReturnPath(super.makeQualified(swizzleParamPath(path)));
+-  }
+-
+-
+-  @Override
+-  protected void checkPath(Path path) {
+-    super.checkPath(swizzleParamPath(path));
+-  }
+-
+-  @Override
+-  public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
+-    long len) throws IOException {
+-    return super.getFileBlockLocations(swizzleFileStatus(file, true),
+-                                       start, len);
+-  }
+-
+-  @Override
+-  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+-    return super.open(swizzleParamPath(f), bufferSize);
+-  }
+-
+-  @Override
+-  public FSDataOutputStream append(Path f, int bufferSize,
+-      Progressable progress) throws IOException {
+-    return super.append(swizzleParamPath(f), bufferSize, progress);
+-  }
+-
+-  @Override
+-  public FSDataOutputStream create(Path f, FsPermission permission,
+-      boolean overwrite, int bufferSize, short replication, long blockSize,
+-      Progressable progress) throws IOException {
+-    return super.create(swizzleParamPath(f), permission,
+-        overwrite, bufferSize, replication, blockSize, progress);
+-  }
+-
+-  @Override
+-  public boolean setReplication(Path src, short replication) throws IOException {
+-    return super.setReplication(swizzleParamPath(src), replication);
+-  }
+-
+-  @Override
+-  public boolean rename(Path src, Path dst) throws IOException {
+-    return super.rename(swizzleParamPath(src), swizzleParamPath(dst));
+-  }
+-  
+-  @Override
+-  public boolean delete(Path f, boolean recursive) throws IOException {
+-    return super.delete(swizzleParamPath(f), recursive);
+-  }
+-
+-  @Override
+-  public boolean deleteOnExit(Path f) throws IOException {
+-    return super.deleteOnExit(swizzleParamPath(f));
+-  }    
+-    
+-  @Override
+-  public FileStatus[] listStatus(Path f) throws IOException {
+-    FileStatus[] orig = super.listStatus(swizzleParamPath(f));
+-    FileStatus[] ret = new FileStatus [orig.length];
+-    for (int i=0; i<orig.length; i++) {
+-      ret[i] = swizzleFileStatus(orig[i], false);
+-    }
+-    return ret;
+-  }
+-  
+-  @Override
+-  public Path getHomeDirectory() {
+-    return swizzleReturnPath(super.getHomeDirectory());
+-  }
+-
+-  @Override
+-  public void setWorkingDirectory(Path newDir) {
+-    super.setWorkingDirectory(swizzleParamPath(newDir));
+-  }
+-  
+-  @Override
+-  public Path getWorkingDirectory() {
+-    return swizzleReturnPath(super.getWorkingDirectory());
+-  }
+-  
+-  @Override
+-  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+-    return super.mkdirs(swizzleParamPath(f), permission);
+-  }
+-
+-  @Override
+-  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
+-    throws IOException {
+-    super.copyFromLocalFile(delSrc, swizzleParamPath(src), swizzleParamPath(dst));
+-  }
+-
+-  @Override
+-  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
+-                                Path[] srcs, Path dst)
+-    throws IOException {
+-    super.copyFromLocalFile(delSrc, overwrite, srcs, swizzleParamPath(dst));
+-  }
+-  
+-  @Override
+-  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
+-                                Path src, Path dst)
+-    throws IOException {
+-    super.copyFromLocalFile(delSrc, overwrite, src, swizzleParamPath(dst));
+-  }
+-
+-  @Override
+-  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
+-    throws IOException {
+-    super.copyToLocalFile(delSrc, swizzleParamPath(src), dst);
+-  }
+-
+-  @Override
+-  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+-    throws IOException {
+-    return super.startLocalOutput(swizzleParamPath(fsOutputFile), tmpLocalFile);
+-  }
+-
+-  @Override
+-  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+-    throws IOException {
+-    super.completeLocalOutput(swizzleParamPath(fsOutputFile), tmpLocalFile);
+-  }
+-
+-  @Override
+-  public ContentSummary getContentSummary(Path f) throws IOException {
+-    return super.getContentSummary(swizzleParamPath(f));
+-  }
+-
+-  @Override
+-  public FileStatus getFileStatus(Path f) throws IOException {
+-    return swizzleFileStatus(super.getFileStatus(swizzleParamPath(f)), false);
+-  }
+-
+-  @Override
+-  public FileChecksum getFileChecksum(Path f) throws IOException {
+-    return super.getFileChecksum(swizzleParamPath(f));
+-  }
+-  
+-  @Override
+-  public void setOwner(Path p, String username, String groupname
+-      ) throws IOException {
+-    super.setOwner(swizzleParamPath(p), username, groupname);
+-  }
+-
+-  @Override
+-  public void setTimes(Path p, long mtime, long atime
+-      ) throws IOException {
+-    super.setTimes(swizzleParamPath(p), mtime, atime);
+-  }
+-
+-  @Override
+-  public void setPermission(Path p, FsPermission permission
+-      ) throws IOException {
+-    super.setPermission(swizzleParamPath(p), permission);
+-  }
+-}
+-  
 Index: shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
 ===================================================================
---- shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java	(revision 1196775)
+--- shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java	(revision 1203794)
 +++ shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java	(working copy)
 @@ -35,6 +35,7 @@
  import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain;
@@ -80,7 +424,7 @@ Index: shims/src/0.20/java/org/apache/ha
  }
 Index: shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
 ===================================================================
---- shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java	(revision 1196775)
+--- shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java	(revision 1203794)
 +++ shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java	(working copy)
 @@ -38,6 +38,7 @@
  import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
@@ -139,11 +483,11 @@ Index: shims/src/0.20S/java/org/apache/h
 +    return new org.apache.hadoop.mapreduce.JobContext(job.getConfiguration(), job.getJobID());
 +  }
  }
-Index: shims/src/0.23/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
+Index: shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector23.java
 ===================================================================
---- shims/src/0.23/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java	(revision 0)
-+++ shims/src/0.23/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java	(revision 0)
-@@ -0,0 +1,61 @@
+--- shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector23.java	(revision 0)
++++ shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector23.java	(revision 0)
+@@ -0,0 +1,33 @@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
@@ -162,60 +506,95 @@ Index: shims/src/0.23/java/org/apache/ha
 + * limitations under the License.
 + */
 +
-+package org.apache.hadoop.fs;
++package org.apache.hadoop.hive.thrift;
 +
-+import java.io.*;
-+import java.net.URI;
-+import java.net.URISyntaxException;
++import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
 +
-+import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.fs.permission.FsPermission;
-+import org.apache.hadoop.util.Progressable;
++/**
++ * A delegation token that is specialized for Hive
++ */
 +
-+/****************************************************************
-+ * A Proxy for LocalFileSystem
++public class DelegationTokenSelector23
++    extends AbstractDelegationTokenSelector<DelegationTokenIdentifier23>{
++
++  public DelegationTokenSelector23() {
++    super(DelegationTokenIdentifier23.HIVE_DELEGATION_KIND);
++  }
++}
+
+Property changes on: shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector23.java
+___________________________________________________________________
+Added: svn:eol-style
+   + native
+
+Index: shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier23.java
+===================================================================
+--- shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier23.java	(revision 0)
++++ shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier23.java	(revision 0)
+@@ -0,0 +1,52 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
 + *
-+ * Serves uri's corresponding to 'pfile:///' namespace with using
-+ * a LocalFileSystem 
-+ *****************************************************************/
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +
-+public class ProxyLocalFileSystem extends FilterFileSystem {
++package org.apache.hadoop.hive.thrift;
 +
-+  protected LocalFileSystem localFs;
++import org.apache.hadoop.io.Text;
++import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 +
-+  public ProxyLocalFileSystem() {
-+    localFs = new LocalFileSystem();
++/**
++ * A delegation token identifier that is specific to Hive.
++ */
++public class DelegationTokenIdentifier23
++    extends AbstractDelegationTokenIdentifier {
++  public static final Text HIVE_DELEGATION_KIND = new Text("HIVE_DELEGATION_TOKEN");
++
++  /**
++   * Create an empty delegation token identifier for reading into.
++   */
++  public DelegationTokenIdentifier23() {
 +  }
 +
-+  public ProxyLocalFileSystem(FileSystem fs) {
-+    throw new RuntimeException ("Unsupported Constructor");
++  /**
++   * Create a new delegation token identifier
++   * @param owner the effective username of the token owner
++   * @param renewer the username of the renewer
++   * @param realUser the real username of the token owner
++   */
++  public DelegationTokenIdentifier23(Text owner, Text renewer, Text realUser) {
++    super(owner, renewer, realUser);
 +  }
 +
 +  @Override
-+  public void initialize(URI name, Configuration conf) throws IOException {
-+    // create a proxy for the local filesystem
-+    // the scheme/authority serving as the proxy is derived
-+    // from the supplied URI
-+
-+    String scheme = name.getScheme();
-+    String authority = name.getAuthority() != null ? name.getAuthority() : "";
-+    String proxyUriString = name + "://" + authority + "/";
-+    fs = new ProxyFileSystem(localFs, URI.create(proxyUriString));
-+
-+    fs.initialize(name, conf);
++  public Text getKind() {
++    return HIVE_DELEGATION_KIND;
 +  }
++
 +}
 
-Property changes on: shims/src/0.23/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
+Property changes on: shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier23.java
 ___________________________________________________________________
 Added: svn:eol-style
    + native
 
-Index: shims/src/0.23/java/org/apache/hadoop/fs/ProxyFileSystem.java
+Index: shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
 ===================================================================
---- shims/src/0.23/java/org/apache/hadoop/fs/ProxyFileSystem.java	(revision 0)
-+++ shims/src/0.23/java/org/apache/hadoop/fs/ProxyFileSystem.java	(revision 0)
-@@ -0,0 +1,273 @@
+--- shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java	(revision 0)
++++ shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java	(revision 0)
+@@ -0,0 +1,546 @@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
@@ -233,434 +612,546 @@ Index: shims/src/0.23/java/org/apache/ha
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
++package org.apache.hadoop.hive.shims;
 +
-+package org.apache.hadoop.fs;
-+
++import java.io.DataInput;
++import java.io.DataOutput;
 +import java.io.IOException;
-+import java.net.URI;
-+import java.net.URISyntaxException;
++import java.lang.reflect.Constructor;
++import java.util.ArrayList;
++import java.util.List;
++
++import javax.security.auth.login.LoginException;
 +
 +import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.fs.permission.FsPermission;
++import org.apache.hadoop.fs.FileStatus;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.fs.PathFilter;
++import org.apache.hadoop.hdfs.MiniDFSCluster;
++import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain;
++import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
++import org.apache.hadoop.hive.shims.HadoopShims.JobTrackerState;
++import org.apache.hadoop.hive.thrift.DelegationTokenSelector23;
++import org.apache.hadoop.io.Text;
++import org.apache.hadoop.mapred.ClusterStatus;
++import org.apache.hadoop.mapred.FileInputFormat;
++import org.apache.hadoop.mapred.InputFormat;
++import org.apache.hadoop.mapred.InputSplit;
++import org.apache.hadoop.mapred.JobConf;
++import org.apache.hadoop.mapred.JobContext;
++import org.apache.hadoop.mapred.JobStatus;
++import org.apache.hadoop.mapred.OutputCommitter;
++import org.apache.hadoop.mapred.RecordReader;
++import org.apache.hadoop.mapred.Reporter;
++import org.apache.hadoop.mapred.RunningJob;
++import org.apache.hadoop.mapred.TaskAttemptContext;
++import org.apache.hadoop.mapred.TaskCompletionEvent;
++import org.apache.hadoop.mapred.TaskID;
++import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
++import org.apache.hadoop.mapred.lib.CombineFileSplit;
++import org.apache.hadoop.mapred.lib.NullOutputFormat;
++import org.apache.hadoop.mapreduce.Job;
++import org.apache.hadoop.mapreduce.TaskAttemptID;
++import org.apache.hadoop.mapreduce.task.JobContextImpl;
++import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
++import org.apache.hadoop.security.UserGroupInformation;
++import org.apache.hadoop.security.token.Token;
++import org.apache.hadoop.security.token.TokenIdentifier;
++import org.apache.hadoop.security.token.TokenSelector;
++import org.apache.hadoop.tools.HadoopArchives;
 +import org.apache.hadoop.util.Progressable;
++import org.apache.hadoop.util.ToolRunner;
 +
-+/****************************************************************
-+ * A FileSystem that can serve a given scheme/authority using some
-+ * other file system. In that sense, it serves as a proxy for the
-+ * real/underlying file system
-+ *****************************************************************/
-+
-+public class ProxyFileSystem extends FilterFileSystem {
-+
-+  protected String myScheme;
-+  protected String myAuthority;
-+  protected URI myUri;
-+
-+  protected String realScheme;
-+  protected String realAuthority;
-+  protected URI realUri;
-+
-+  
-+
-+  private Path swizzleParamPath(Path p) {
-+    return new Path (realScheme, realAuthority, p.toUri().getPath());
++/**
++ * Implemention of shims against Hadoop 0.23.0.
++ */
++public class Hadoop23Shims implements HadoopShims {
++  public boolean usesJobShell() {
++    return false;
 +  }
 +
-+  private Path swizzleReturnPath(Path p) {
-+    return new Path (myScheme, myAuthority, p.toUri().getPath());
++  public boolean fileSystemDeleteOnExit(FileSystem fs, Path path)
++      throws IOException {
++
++    return fs.deleteOnExit(path);
 +  }
 +
-+  private FileStatus swizzleFileStatus(FileStatus orig, boolean isParam) {
-+    FileStatus ret =
-+      new FileStatus(orig.getLen(), orig.isDir(), orig.getReplication(),
-+                     orig.getBlockSize(), orig.getModificationTime(),
-+                     orig.getAccessTime(), orig.getPermission(),
-+                     orig.getOwner(), orig.getGroup(),
-+                     isParam ? swizzleParamPath(orig.getPath()) :
-+                     swizzleReturnPath(orig.getPath()));
-+    return ret;
++  public void inputFormatValidateInput(InputFormat fmt, JobConf conf)
++      throws IOException {
++    // gone in 0.18+
 +  }
 +
-+  public ProxyFileSystem() {
-+    throw new RuntimeException ("Unsupported constructor");
++  public boolean isJobPreparing(RunningJob job) throws IOException {
++    return job.getJobState() == JobStatus.PREP;
 +  }
-+  
-+  public ProxyFileSystem(FileSystem fs) {
-+    throw new RuntimeException ("Unsupported constructor");
++  /**
++   * Workaround for hadoop-17 - jobclient only looks at commandlineconfig.
++   */
++  public void setTmpFiles(String prop, String files) {
++    // gone in 20+
++  }
++
++  public HadoopShims.MiniDFSShim getMiniDfs(Configuration conf,
++      int numDataNodes,
++      boolean format,
++      String[] racks) throws IOException {
++    return new MiniDFSShim(new MiniDFSCluster(conf, numDataNodes, format, racks));
 +  }
 +
 +  /**
-+   * Create a proxy file system for fs.
-+   * 
-+   * @param fs FileSystem to create proxy for
-+   * @param myUri URI to use as proxy. Only the scheme and authority from
-+   *              this are used right now
++   * MiniDFSShim.
++   *
 +   */
-+  public ProxyFileSystem(FileSystem fs, URI myUri) {
-+    super(fs);
++  public class MiniDFSShim implements HadoopShims.MiniDFSShim {
++    private final MiniDFSCluster cluster;
 +
-+    URI realUri = fs.getUri();
-+    this.realScheme = realUri.getScheme();
-+    this.realAuthority=realUri.getAuthority();
-+    this.realUri = realUri;
++    public MiniDFSShim(MiniDFSCluster cluster) {
++      this.cluster = cluster;
++    }
 +
-+    this.myScheme = myUri.getScheme();
-+    this.myAuthority=myUri.getAuthority();
-+    this.myUri = myUri;
-+  }
++    public FileSystem getFileSystem() throws IOException {
++      return cluster.getFileSystem();
++    }
 +
-+  @Override
-+  public void initialize(URI name, Configuration conf) throws IOException {
-+    try {
-+      URI realUri = new URI (realScheme, realAuthority,
-+                            name.getPath(), name.getQuery(), name.getFragment());
-+      super.initialize(realUri, conf);
-+    } catch (URISyntaxException e) {
-+      throw new RuntimeException(e);
++    public void shutdown() {
++      cluster.shutdown();
 +    }
 +  }
 +
-+  @Override
-+  public URI getUri() {
-+    return myUri;
++  /**
++   * We define this function here to make the code compatible between
++   * hadoop 0.17 and hadoop 0.20.
++   *
++   * Hive binary that compiled Text.compareTo(Text) with hadoop 0.20 won't
++   * work with hadoop 0.17 because in hadoop 0.20, Text.compareTo(Text) is
++   * implemented in org.apache.hadoop.io.BinaryComparable, and Java compiler
++   * references that class, which is not available in hadoop 0.17.
++   */
++  public int compareText(Text a, Text b) {
++    return a.compareTo(b);
 +  }
 +
 +  @Override
-+  public String getName() {
-+    return getUri().toString();
++  public long getAccessTime(FileStatus file) {
++    return file.getAccessTime();
 +  }
 +
-+  @Override
-+  public Path makeQualified(Path path) {
-+    return swizzleReturnPath(super.makeQualified(swizzleParamPath(path)));
++  public HadoopShims.CombineFileInputFormatShim getCombineFileInputFormat() {
++    return new CombineFileInputFormatShim() {
++      @Override
++      public RecordReader getRecordReader(InputSplit split,
++          JobConf job, Reporter reporter) throws IOException {
++        throw new IOException("CombineFileInputFormat.getRecordReader not needed.");
++      }
++    };
 +  }
 +
++  public static class InputSplitShim extends CombineFileSplit implements HadoopShims.InputSplitShim {
++    long shrinkedLength;
++    boolean _isShrinked;
++    public InputSplitShim() {
++      super();
++      _isShrinked = false;
++    }
 +
-+  @Override
-+  protected void checkPath(Path path) {
-+    super.checkPath(swizzleParamPath(path));
-+  }
++    public InputSplitShim(CombineFileSplit old) throws IOException {
++      super(old);
++      _isShrinked = false;
++    }
 +
-+  @Override
-+  public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
-+    long len) throws IOException {
-+    return super.getFileBlockLocations(swizzleFileStatus(file, true),
-+                                       start, len);
-+  }
++    @Override
++    public void shrinkSplit(long length) {
++      _isShrinked = true;
++      shrinkedLength = length;
++    }
 +
-+  @Override
-+  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
-+    return super.open(swizzleParamPath(f), bufferSize);
-+  }
++    public boolean isShrinked() {
++      return _isShrinked;
++    }
 +
-+  @Override
-+  public FSDataOutputStream append(Path f, int bufferSize,
-+      Progressable progress) throws IOException {
-+    return super.append(swizzleParamPath(f), bufferSize, progress);
-+  }
++    public long getShrinkedLength() {
++      return shrinkedLength;
++    }
 +
-+  @Override
-+  public FSDataOutputStream create(Path f, FsPermission permission,
-+      boolean overwrite, int bufferSize, short replication, long blockSize,
-+      Progressable progress) throws IOException {
-+    return super.create(swizzleParamPath(f), permission,
-+        overwrite, bufferSize, replication, blockSize, progress);
-+  }
++    @Override
++    public void readFields(DataInput in) throws IOException {
++      super.readFields(in);
++      _isShrinked = in.readBoolean();
++      if (_isShrinked) {
++        shrinkedLength = in.readLong();
++      }
++    }
 +
-+  @Override
-+  public boolean setReplication(Path src, short replication) throws IOException {
-+    return super.setReplication(swizzleParamPath(src), replication);
++    @Override
++    public void write(DataOutput out) throws IOException {
++      super.write(out);
++      out.writeBoolean(_isShrinked);
++      if (_isShrinked) {
++        out.writeLong(shrinkedLength);
++      }
++    }
 +  }
 +
-+  @Override
-+  public boolean rename(Path src, Path dst) throws IOException {
-+    return super.rename(swizzleParamPath(src), swizzleParamPath(dst));
-+  }
-+  
-+  @Override
-+  public boolean delete(Path f, boolean recursive) throws IOException {
-+    return super.delete(swizzleParamPath(f), recursive);
-+  }
++  /* This class should be replaced with org.apache.hadoop.mapred.lib.CombineFileRecordReader class, once
++   * https://issues.apache.org/jira/browse/MAPREDUCE-955 is fixed. This code should be removed - it is a copy
++   * of org.apache.hadoop.mapred.lib.CombineFileRecordReader
++   */
++  public static class CombineFileRecordReader<K, V> implements RecordReader<K, V> {
 +
-+  @Override
-+  public boolean deleteOnExit(Path f) throws IOException {
-+    return super.deleteOnExit(swizzleParamPath(f));
-+  }    
++    static final Class[] constructorSignature = new Class[] {
++        InputSplit.class,
++        Configuration.class,
++        Reporter.class,
++        Integer.class
++        };
++
++    protected CombineFileSplit split;
++    protected JobConf jc;
++    protected Reporter reporter;
++    protected Class<RecordReader<K, V>> rrClass;
++    protected Constructor<RecordReader<K, V>> rrConstructor;
++    protected FileSystem fs;
++
++    protected int idx;
++    protected long progress;
++    protected RecordReader<K, V> curReader;
++    protected boolean isShrinked;
++    protected long shrinkedLength;
 +    
-+  @Override
-+  public FileStatus[] listStatus(Path f) throws IOException {
-+    FileStatus[] orig = super.listStatus(swizzleParamPath(f));
-+    FileStatus[] ret = new FileStatus [orig.length];
-+    for (int i=0; i<orig.length; i++) {
-+      ret[i] = swizzleFileStatus(orig[i], false);
++    public boolean next(K key, V value) throws IOException {
++
++      while ((curReader == null)
++          || !doNextWithExceptionHandler((K) ((CombineHiveKey) key).getKey(),
++              value)) {
++        if (!initNextRecordReader(key)) {
++          return false;
++        }
++      }
++      return true;
 +    }
-+    return ret;
-+  }
-+  
-+  @Override
-+  public Path getHomeDirectory() {
-+    return swizzleReturnPath(super.getHomeDirectory());
-+  }
 +
-+  @Override
-+  public void setWorkingDirectory(Path newDir) {
-+    super.setWorkingDirectory(swizzleParamPath(newDir));
-+  }
-+  
-+  @Override
-+  public Path getWorkingDirectory() {
-+    return swizzleReturnPath(super.getWorkingDirectory());
-+  }
-+  
-+  @Override
-+  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
-+    return super.mkdirs(swizzleParamPath(f), permission);
-+  }
++    public K createKey() {
++      K newKey = curReader.createKey();
++      return (K)(new CombineHiveKey(newKey));
++    }
 +
-+  @Override
-+  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
-+    throws IOException {
-+    super.copyFromLocalFile(delSrc, swizzleParamPath(src), swizzleParamPath(dst));
-+  }
++    public V createValue() {
++      return curReader.createValue();
++    }
 +
-+  @Override
-+  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
-+                                Path[] srcs, Path dst)
-+    throws IOException {
-+    super.copyFromLocalFile(delSrc, overwrite, srcs, swizzleParamPath(dst));
-+  }
-+  
-+  @Override
-+  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
-+                                Path src, Path dst)
-+    throws IOException {
-+    super.copyFromLocalFile(delSrc, overwrite, src, swizzleParamPath(dst));
-+  }
++    /**
++     * Return the amount of data processed.
++     */
++    public long getPos() throws IOException {
++      return progress;
++    }
 +
-+  @Override
-+  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
-+    throws IOException {
-+    super.copyToLocalFile(delSrc, swizzleParamPath(src), dst);
-+  }
++    public void close() throws IOException {
++      if (curReader != null) {
++        curReader.close();
++        curReader = null;
++      }
++    }
 +
-+  @Override
-+  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
-+    throws IOException {
-+    return super.startLocalOutput(swizzleParamPath(fsOutputFile), tmpLocalFile);
-+  }
++    /**
++     * Return progress based on the amount of data processed so far.
++     */
++    public float getProgress() throws IOException {
++      return Math.min(1.0f, progress / (float) (split.getLength()));
++    }
 +
-+  @Override
-+  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
-+    throws IOException {
-+    super.completeLocalOutput(swizzleParamPath(fsOutputFile), tmpLocalFile);
-+  }
++    /**
++     * A generic RecordReader that can hand out different recordReaders
++     * for each chunk in the CombineFileSplit.
++     */
++    public CombineFileRecordReader(JobConf job, CombineFileSplit split,
++        Reporter reporter,
++        Class<RecordReader<K, V>> rrClass)
++        throws IOException {
++      this.split = split;
++      this.jc = job;
++      this.rrClass = rrClass;
++      this.reporter = reporter;
++      this.idx = 0;
++      this.curReader = null;
++      this.progress = 0;
 +
-+  @Override
-+  public ContentSummary getContentSummary(Path f) throws IOException {
-+    return super.getContentSummary(swizzleParamPath(f));
-+  }
++      isShrinked = false;
 +
-+  @Override
-+  public FileStatus getFileStatus(Path f) throws IOException {
-+    return swizzleFileStatus(super.getFileStatus(swizzleParamPath(f)), false);
-+  }
++      assert (split instanceof InputSplitShim);
++      if (((InputSplitShim) split).isShrinked()) {
++        isShrinked = true;
++        shrinkedLength = ((InputSplitShim) split).getShrinkedLength();
++      }
 +
-+  @Override
-+  public FileChecksum getFileChecksum(Path f) throws IOException {
-+    return super.getFileChecksum(swizzleParamPath(f));
-+  }
-+  
-+  @Override
-+  public void setOwner(Path p, String username, String groupname
-+      ) throws IOException {
-+    super.setOwner(swizzleParamPath(p), username, groupname);
-+  }
++      try {
++        rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
++        rrConstructor.setAccessible(true);
++      } catch (Exception e) {
++        throw new RuntimeException(rrClass.getName() +
++            " does not have valid constructor", e);
++      }
++      initNextRecordReader(null);
++    }
++    
++    /**
++     * do next and handle exception inside it. 
++     * @param key
++     * @param value
++     * @return
++     * @throws IOException
++     */
++    private boolean doNextWithExceptionHandler(K key, V value) throws IOException {
++      try {
++        return curReader.next(key, value);
++      } catch (Exception e) {
++        return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, jc);
++      }
++    }
 +
-+  @Override
-+  public void setTimes(Path p, long mtime, long atime
-+      ) throws IOException {
-+    super.setTimes(swizzleParamPath(p), mtime, atime);
-+  }
++    /**
++     * Get the record reader for the next chunk in this CombineFileSplit.
++     */
++    protected boolean initNextRecordReader(K key) throws IOException {
 +
-+  @Override
-+  public void setPermission(Path p, FsPermission permission
-+      ) throws IOException {
-+    super.setPermission(swizzleParamPath(p), permission);
-+  }
-+}
-+  
-
-Property changes on: shims/src/0.23/java/org/apache/hadoop/fs/ProxyFileSystem.java
-___________________________________________________________________
-Added: svn:eol-style
-   + native
-
-Index: shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java
-===================================================================
---- shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java	(revision 0)
-+++ shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java	(revision 0)
-@@ -0,0 +1,52 @@
-+/**
-+ * Licensed to the Apache Software Foundation (ASF) under one
-+ * or more contributor license agreements.  See the NOTICE file
-+ * distributed with this work for additional information
-+ * regarding copyright ownership.  The ASF licenses this file
-+ * to you under the Apache License, Version 2.0 (the
-+ * "License"); you may not use this file except in compliance
-+ * with the License.  You may obtain a copy of the License at
-+ *
-+ *     http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
-+ */
++      if (curReader != null) {
++        curReader.close();
++        curReader = null;
++        if (idx > 0) {
++          progress += split.getLength(idx - 1); // done processing so far
++        }
++      }
 +
-+package org.apache.hadoop.hive.thrift;
++      // if all chunks have been processed or reached the length, nothing more to do.
++      if (idx == split.getNumPaths() || (isShrinked && progress > shrinkedLength)) {
++        return false;
++      }
 +
-+import org.apache.hadoop.io.Text;
-+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
++      // get a record reader for the idx-th chunk
++      try {
++        curReader = rrConstructor.newInstance(new Object[]
++            {split, jc, reporter, Integer.valueOf(idx)});
 +
-+/**
-+ * A delegation token identifier that is specific to Hive.
-+ */
-+public class DelegationTokenIdentifier
-+    extends AbstractDelegationTokenIdentifier {
-+  public static final Text HIVE_DELEGATION_KIND = new Text("HIVE_DELEGATION_TOKEN");
++        // change the key if need be
++        if (key != null) {
++          K newKey = curReader.createKey();
++          ((CombineHiveKey)key).setKey(newKey);
++        }
 +
-+  /**
-+   * Create an empty delegation token identifier for reading into.
-+   */
-+  public DelegationTokenIdentifier() {
++        // setup some helper config variables.
++        jc.set("map.input.file", split.getPath(idx).toString());
++        jc.setLong("map.input.start", split.getOffset(idx));
++        jc.setLong("map.input.length", split.getLength(idx));
++      } catch (Exception e) {
++        curReader=HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(e, jc);
++      }
++      idx++;
++      return true;
++    }
 +  }
 +
-+  /**
-+   * Create a new delegation token identifier
-+   * @param owner the effective username of the token owner
-+   * @param renewer the username of the renewer
-+   * @param realUser the real username of the token owner
-+   */
-+  public DelegationTokenIdentifier(Text owner, Text renewer, Text realUser) {
-+    super(owner, renewer, realUser);
-+  }
++  public abstract static class CombineFileInputFormatShim<K, V> extends
++      CombineFileInputFormat<K, V>
++      implements HadoopShims.CombineFileInputFormatShim<K, V> {
 +
-+  @Override
-+  public Text getKind() {
-+    return HIVE_DELEGATION_KIND;
-+  }
++    public Path[] getInputPathsShim(JobConf conf) {
++      try {
++        return FileInputFormat.getInputPaths(conf);
++      } catch (Exception e) {
++        throw new RuntimeException(e);
++      }
++    }
 +
-+}
-
-Property changes on: shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenIdentifier.java
-___________________________________________________________________
-Added: svn:eol-style
-   + native
-
-Index: shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java
-===================================================================
---- shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java	(revision 0)
-+++ shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java	(revision 0)
-@@ -0,0 +1,87 @@
-+/**
-+ * Licensed to the Apache Software Foundation (ASF) under one
-+ * or more contributor license agreements.  See the NOTICE file
-+ * distributed with this work for additional information
-+ * regarding copyright ownership.  The ASF licenses this file
-+ * to you under the Apache License, Version 2.0 (the
-+ * "License"); you may not use this file except in compliance
-+ * with the License.  You may obtain a copy of the License at
-+ *
-+ *     http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
-+ */
++    @Override
++    public void createPool(JobConf conf, PathFilter... filters) {
++      super.createPool(conf, filters);
++    }
 +
-+package org.apache.hadoop.hive.thrift;
++    @Override
++    public InputSplitShim[] getSplits(JobConf job, int numSplits) throws IOException {
++      long minSize = job.getLong("mapred.min.split.size", 0);
 +
-+import java.io.IOException;
++      // For backward compatibility, let the above parameter be used
++      if (job.getLong("mapred.min.split.size.per.node", 0) == 0) {
++        super.setMinSplitSizeNode(minSize);
++      }
 +
-+import org.apache.hadoop.io.Text;
-+import org.apache.hadoop.security.UserGroupInformation;
-+import org.apache.hadoop.security.token.Token;
-+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
++      if (job.getLong("mapred.min.split.size.per.rack", 0) == 0) {
++        super.setMinSplitSizeRack(minSize);
++      }
 +
-+/**
-+ * A Hive specific delegation token secret manager.
-+ * The secret manager is responsible for generating and accepting the password
-+ * for each token.
-+ */
-+public class DelegationTokenSecretManager
-+    extends AbstractDelegationTokenSecretManager<DelegationTokenIdentifier> {
++      if (job.getLong("mapred.max.split.size", 0) == 0) {
++        super.setMaxSplitSize(minSize);
++      }
 +
-+  /**
-+   * Create a secret manager
-+   * @param delegationKeyUpdateInterval the number of seconds for rolling new
-+   *        secret keys.
-+   * @param delegationTokenMaxLifetime the maximum lifetime of the delegation
-+   *        tokens
-+   * @param delegationTokenRenewInterval how often the tokens must be renewed
-+   * @param delegationTokenRemoverScanInterval how often the tokens are scanned
-+   *        for expired tokens
-+   */
-+  public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
-+                                      long delegationTokenMaxLifetime,
-+                                      long delegationTokenRenewInterval,
-+                                      long delegationTokenRemoverScanInterval) {
-+    super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
-+          delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
-+  }
++      CombineFileSplit[] splits = (CombineFileSplit[]) super.getSplits(job, numSplits);
 +
-+  @Override
-+  public DelegationTokenIdentifier createIdentifier() {
-+    return new DelegationTokenIdentifier();
-+  }
++      InputSplitShim[] isplits = new InputSplitShim[splits.length];
++      for (int pos = 0; pos < splits.length; pos++) {
++        isplits[pos] = new InputSplitShim(splits[pos]);
++      }
 +
-+  public synchronized void cancelDelegationToken(String tokenStrForm) throws IOException {
-+    Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
-+    t.decodeFromUrlString(tokenStrForm);
-+    String user = UserGroupInformation.getCurrentUser().getUserName();
-+    cancelToken(t, user);
-+  }
++      return isplits;
++    }
 +
-+  public synchronized long renewDelegationToken(String tokenStrForm) throws IOException {
-+    Token<DelegationTokenIdentifier> t= new Token<DelegationTokenIdentifier>();
-+    t.decodeFromUrlString(tokenStrForm);
-+    String user = UserGroupInformation.getCurrentUser().getUserName();
-+    return renewToken(t, user);
-+  }
++    public InputSplitShim getInputSplitShim() throws IOException {
++      return new InputSplitShim();
++    }
++
++    public RecordReader getRecordReader(JobConf job, HadoopShims.InputSplitShim split,
++        Reporter reporter,
++        Class<RecordReader<K, V>> rrClass)
++        throws IOException {
++      CombineFileSplit cfSplit = (CombineFileSplit) split;
++      return new CombineFileRecordReader(job, cfSplit, reporter, rrClass);
++    }
 +
-+  public synchronized String getDelegationToken(String renewer) throws IOException {
-+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-+    Text owner = new Text(ugi.getUserName());
-+    Text realUser = null;
-+    if (ugi.getRealUser() != null) {
-+      realUser = new Text(ugi.getRealUser().getUserName());
-+    }
-+    DelegationTokenIdentifier ident =
-+      new DelegationTokenIdentifier(owner, new Text(renewer), realUser);
-+    Token<DelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>(
-+        ident, this);
-+    return t.encodeToUrlString();
 +  }
-+}
 +
-
-Property changes on: shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenSecretManager.java
++  public String getInputFormatClassName() {
++    return "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat";
++  }
++
++  String[] ret = new String[2];
++
++  @Override
++  public String[] getTaskJobIDs(TaskCompletionEvent t) {
++    TaskID tid = t.getTaskAttemptId().getTaskID();
++    ret[0] = tid.toString();
++    ret[1] = tid.getJobID().toString();
++    return ret;
++  }
++
++  public void setFloatConf(Configuration conf, String varName, float val) {
++    conf.setFloat(varName, val);
++  }
++
++  @Override
++  public int createHadoopArchive(Configuration conf, Path sourceDir, Path destDir,
++      String archiveName) throws Exception {
++
++    HadoopArchives har = new HadoopArchives(conf);
++    List<String> args = new ArrayList<String>();
++
++    if (conf.get("hive.archive.har.parentdir.settable") == null) {
++      throw new RuntimeException("hive.archive.har.parentdir.settable is not set");
++    }
++    boolean parentSettable =
++      conf.getBoolean("hive.archive.har.parentdir.settable", false);
++
++    if (parentSettable) {
++      args.add("-archiveName");
++      args.add(archiveName);
++      args.add("-p");
++      args.add(sourceDir.toString());
++      args.add(destDir.toString());
++    } else {
++      args.add("-archiveName");
++      args.add(archiveName);
++      args.add(sourceDir.toString());
++      args.add(destDir.toString());
++    }
++
++    return ToolRunner.run(har, args.toArray(new String[0]));
++  }
++
++  public static class NullOutputCommitter extends OutputCommitter {
++    @Override
++    public void setupJob(JobContext jobContext) { }
++    @Override
++    public void cleanupJob(JobContext jobContext) { }
++
++    @Override
++    public void setupTask(TaskAttemptContext taskContext) { }
++    @Override
++    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
++      return false;
++    }
++    @Override
++    public void commitTask(TaskAttemptContext taskContext) { }
++    @Override
++    public void abortTask(TaskAttemptContext taskContext) { }
++  }
++
++  public void setNullOutputFormat(JobConf conf) {
++    conf.setOutputFormat(NullOutputFormat.class);
++    conf.setOutputCommitter(Hadoop23Shims.NullOutputCommitter.class);
++
++    // option to bypass job setup and cleanup was introduced in hadoop-21 (MAPREDUCE-463)
++    // but can be backported. So we disable setup/cleanup in all versions >= 0.19
++    conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
++
++    // option to bypass task cleanup task was introduced in hadoop-23 (MAPREDUCE-2206)
++    // but can be backported. So we disable setup/cleanup in all versions >= 0.19
++    conf.setBoolean("mapreduce.job.committer.task.cleanup.needed", false);
++  }
++
++  @Override
++  public UserGroupInformation getUGIForConf(Configuration conf) throws IOException {
++    return UserGroupInformation.getCurrentUser();
++  }
++
++  @Override
++  public boolean isSecureShimImpl() {
++    return true;
++  }
++
++  @Override
++  public String getShortUserName(UserGroupInformation ugi) {
++    return ugi.getShortUserName();
++  }
++
++  @Override
++  public String getTokenStrForm(String tokenSignature) throws IOException {
++    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
++    TokenSelector<? extends TokenIdentifier> tokenSelector = new DelegationTokenSelector23();
++
++    Token<? extends TokenIdentifier> token = tokenSelector.selectToken(
++        tokenSignature == null ? new Text() : new Text(tokenSignature), ugi.getTokens());
++    return token != null ? token.encodeToUrlString() : null;
++  }
++  
++  @Override
++  public JobTrackerState getJobTrackerState(ClusterStatus clusterStatus) throws Exception {
++    JobTrackerState state;
++    switch (clusterStatus.getJobTrackerStatus()) {
++    case INITIALIZING:
++      return JobTrackerState.INITIALIZING;
++    case RUNNING:
++      return JobTrackerState.RUNNING;
++    default:
++      String errorMsg = "Unrecognized JobTracker state: " + clusterStatus.getJobTrackerStatus();
++      throw new Exception(errorMsg);
++    }
++  }
++  
++  @Override
++  public org.apache.hadoop.mapreduce.TaskAttemptContext newTaskAttemptContext(Configuration conf, final Progressable progressable) {
++    return new TaskAttemptContextImpl(conf, new TaskAttemptID()) {
++      @Override
++      public void progress() {
++        progressable.progress();
++      }
++    };
++  }
++
++  @Override
++  public org.apache.hadoop.mapreduce.JobContext newJobContext(Job job) {
++    return new JobContextImpl(job.getConfiguration(), job.getJobID());
++  }
++}
+
+Property changes on: shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
 ___________________________________________________________________
 Added: svn:eol-style
    + native
 
-Index: shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java
+Index: shims/src/0.23/java/org/apache/hadoop/hive/shims/Jetty23Shims.java
 ===================================================================
---- shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java	(revision 0)
-+++ shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java	(revision 0)
-@@ -0,0 +1,33 @@
+--- shims/src/0.23/java/org/apache/hadoop/hive/shims/Jetty23Shims.java	(revision 0)
++++ shims/src/0.23/java/org/apache/hadoop/hive/shims/Jetty23Shims.java	(revision 0)
+@@ -0,0 +1,56 @@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
@@ -678,48 +1169,56 @@ Index: shims/src/0.23/java/org/apache/ha
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
++package org.apache.hadoop.hive.shims;
 +
-+package org.apache.hadoop.hive.thrift;
++import java.io.IOException;
 +
-+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
++import org.mortbay.jetty.bio.SocketConnector;
++import org.mortbay.jetty.handler.RequestLogHandler;
++import org.mortbay.jetty.webapp.WebAppContext;
 +
 +/**
-+ * A delegation token that is specialized for Hive
++ * Jetty23Shims.
++ *
 + */
++public class Jetty23Shims implements JettyShims {
++  public Server startServer(String listen, int port) throws IOException {
++    Server s = new Server();
++    s.setupListenerHostPort(listen, port);
++    return s;
++  }
++
++  private static class Server extends org.mortbay.jetty.Server implements JettyShims.Server {
++    public void addWar(String war, String contextPath) {
++      WebAppContext wac = new WebAppContext();
++      wac.setContextPath(contextPath);
++      wac.setWar(war);
++      RequestLogHandler rlh = new RequestLogHandler();
++      rlh.setHandler(wac);
++      this.addHandler(rlh);
++    }
 +
-+public class DelegationTokenSelector
-+    extends AbstractDelegationTokenSelector<DelegationTokenIdentifier>{
++    public void setupListenerHostPort(String listen, int port)
++        throws IOException {
 +
-+  public DelegationTokenSelector() {
-+    super(DelegationTokenIdentifier.HIVE_DELEGATION_KIND);
++      SocketConnector connector = new SocketConnector();
++      connector.setPort(port);
++      connector.setHost(listen);
++      this.addConnector(connector);
++    }
 +  }
 +}
 
-Property changes on: shims/src/0.23/java/org/apache/hadoop/hive/thrift/DelegationTokenSelector.java
-___________________________________________________________________
-Added: svn:eol-style
-   + native
-
-Index: shims/src/0.23/java/org/apache/hadoop/hive/shims/EmptyShim.java
-===================================================================
---- shims/src/0.23/java/org/apache/hadoop/hive/shims/EmptyShim.java	(revision 0)
-+++ shims/src/0.23/java/org/apache/hadoop/hive/shims/EmptyShim.java	(revision 0)
-@@ -0,0 +1,4 @@
-+package org.apache.hadoop.hive.shims;
-+
-+class EmptyShim {
-+}
-
-Property changes on: shims/src/0.23/java/org/apache/hadoop/hive/shims/EmptyShim.java
+Property changes on: shims/src/0.23/java/org/apache/hadoop/hive/shims/Jetty23Shims.java
 ___________________________________________________________________
 Added: svn:eol-style
    + native
 
-Index: shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+Index: shims/src/common/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
 ===================================================================
---- shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java	(revision 0)
-+++ shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java	(revision 0)
-@@ -0,0 +1,546 @@
+--- shims/src/common/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java	(revision 0)
++++ shims/src/common/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java	(revision 0)
+@@ -0,0 +1,61 @@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
@@ -737,688 +1236,343 @@ Index: shims/src/0.23/java/org/apache/ha
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
-+package org.apache.hadoop.hive.shims;
 +
-+import java.io.DataInput;
-+import java.io.DataOutput;
-+import java.io.IOException;
-+import java.lang.reflect.Constructor;
-+import java.util.ArrayList;
-+import java.util.List;
++package org.apache.hadoop.fs;
 +
-+import javax.security.auth.login.LoginException;
++import java.io.*;
++import java.net.URI;
++import java.net.URISyntaxException;
 +
 +import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.fs.FileStatus;
-+import org.apache.hadoop.fs.FileSystem;
-+import org.apache.hadoop.fs.Path;
-+import org.apache.hadoop.fs.PathFilter;
-+import org.apache.hadoop.hdfs.MiniDFSCluster;
-+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain;
-+import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
-+import org.apache.hadoop.hive.shims.HadoopShims.JobTrackerState;
-+import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
-+import org.apache.hadoop.io.Text;
-+import org.apache.hadoop.mapred.ClusterStatus;
-+import org.apache.hadoop.mapred.FileInputFormat;
-+import org.apache.hadoop.mapred.InputFormat;
-+import org.apache.hadoop.mapred.InputSplit;
-+import org.apache.hadoop.mapred.JobConf;
-+import org.apache.hadoop.mapred.JobContext;
-+import org.apache.hadoop.mapred.JobStatus;
-+import org.apache.hadoop.mapred.OutputCommitter;
-+import org.apache.hadoop.mapred.RecordReader;
-+import org.apache.hadoop.mapred.Reporter;
-+import org.apache.hadoop.mapred.RunningJob;
-+import org.apache.hadoop.mapred.TaskAttemptContext;
-+import org.apache.hadoop.mapred.TaskCompletionEvent;
-+import org.apache.hadoop.mapred.TaskID;
-+import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
-+import org.apache.hadoop.mapred.lib.CombineFileSplit;
-+import org.apache.hadoop.mapred.lib.NullOutputFormat;
-+import org.apache.hadoop.mapreduce.Job;
-+import org.apache.hadoop.mapreduce.TaskAttemptID;
-+import org.apache.hadoop.mapreduce.task.JobContextImpl;
-+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-+import org.apache.hadoop.security.UserGroupInformation;
-+import org.apache.hadoop.security.token.Token;
-+import org.apache.hadoop.security.token.TokenIdentifier;
-+import org.apache.hadoop.security.token.TokenSelector;
-+import org.apache.hadoop.tools.HadoopArchives;
-+import org.apache.hadoop.util.Progressable;
-+import org.apache.hadoop.util.ToolRunner;
-+
-+/**
-+ * Implemention of shims against Hadoop 0.23.0.
-+ */
-+public class Hadoop23Shims implements HadoopShims {
-+  public boolean usesJobShell() {
-+    return false;
-+  }
-+
-+  public boolean fileSystemDeleteOnExit(FileSystem fs, Path path)
-+      throws IOException {
-+
-+    return fs.deleteOnExit(path);
-+  }
-+
-+  public void inputFormatValidateInput(InputFormat fmt, JobConf conf)
-+      throws IOException {
-+    // gone in 0.18+
-+  }
-+
-+  public boolean isJobPreparing(RunningJob job) throws IOException {
-+    return job.getJobState() == JobStatus.PREP;
-+  }
-+  /**
-+   * Workaround for hadoop-17 - jobclient only looks at commandlineconfig.
-+   */
-+  public void setTmpFiles(String prop, String files) {
-+    // gone in 20+
-+  }
-+
-+  public HadoopShims.MiniDFSShim getMiniDfs(Configuration conf,
-+      int numDataNodes,
-+      boolean format,
-+      String[] racks) throws IOException {
-+    return new MiniDFSShim(new MiniDFSCluster(conf, numDataNodes, format, racks));
-+  }
-+
-+  /**
-+   * MiniDFSShim.
-+   *
-+   */
-+  public class MiniDFSShim implements HadoopShims.MiniDFSShim {
-+    private final MiniDFSCluster cluster;
-+
-+    public MiniDFSShim(MiniDFSCluster cluster) {
-+      this.cluster = cluster;
-+    }
-+
-+    public FileSystem getFileSystem() throws IOException {
-+      return cluster.getFileSystem();
-+    }
-+
-+    public void shutdown() {
-+      cluster.shutdown();
-+    }
-+  }
-+
-+  /**
-+   * We define this function here to make the code compatible between
-+   * hadoop 0.17 and hadoop 0.20.
-+   *
-+   * Hive binary that compiled Text.compareTo(Text) with hadoop 0.20 won't
-+   * work with hadoop 0.17 because in hadoop 0.20, Text.compareTo(Text) is
-+   * implemented in org.apache.hadoop.io.BinaryComparable, and Java compiler
-+   * references that class, which is not available in hadoop 0.17.
-+   */
-+  public int compareText(Text a, Text b) {
-+    return a.compareTo(b);
-+  }
-+
-+  @Override
-+  public long getAccessTime(FileStatus file) {
-+    return file.getAccessTime();
-+  }
-+
-+  public HadoopShims.CombineFileInputFormatShim getCombineFileInputFormat() {
-+    return new CombineFileInputFormatShim() {
-+      @Override
-+      public RecordReader getRecordReader(InputSplit split,
-+          JobConf job, Reporter reporter) throws IOException {
-+        throw new IOException("CombineFileInputFormat.getRecordReader not needed.");
-+      }
-+    };
-+  }
-+
-+  public static class InputSplitShim extends CombineFileSplit implements HadoopShims.InputSplitShim {
-+    long shrinkedLength;
-+    boolean _isShrinked;
-+    public InputSplitShim() {
-+      super();
-+      _isShrinked = false;
-+    }
-+
-+    public InputSplitShim(CombineFileSplit old) throws IOException {
-+      super(old);
-+      _isShrinked = false;
-+    }
-+
-+    @Override
-+    public void shrinkSplit(long length) {
-+      _isShrinked = true;
-+      shrinkedLength = length;
-+    }
-+
-+    public boolean isShrinked() {
-+      return _isShrinked;
-+    }
-+
-+    public long getShrinkedLength() {
-+      return shrinkedLength;
-+    }
-+
-+    @Override
-+    public void readFields(DataInput in) throws IOException {
-+      super.readFields(in);
-+      _isShrinked = in.readBoolean();
-+      if (_isShrinked) {
-+        shrinkedLength = in.readLong();
-+      }
-+    }
-+
-+    @Override
-+    public void write(DataOutput out) throws IOException {
-+      super.write(out);
-+      out.writeBoolean(_isShrinked);
-+      if (_isShrinked) {
-+        out.writeLong(shrinkedLength);
-+      }
-+    }
-+  }
-+
-+  /* This class should be replaced with org.apache.hadoop.mapred.lib.CombineFileRecordReader class, once
-+   * https://issues.apache.org/jira/browse/MAPREDUCE-955 is fixed. This code should be removed - it is a copy
-+   * of org.apache.hadoop.mapred.lib.CombineFileRecordReader
-+   */
-+  public static class CombineFileRecordReader<K, V> implements RecordReader<K, V> {
-+
-+    static final Class[] constructorSignature = new Class[] {
-+        InputSplit.class,
-+        Configuration.class,
-+        Reporter.class,
-+        Integer.class
-+        };
-+
-+    protected CombineFileSplit split;
-+    protected JobConf jc;
-+    protected Reporter reporter;
-+    protected Class<RecordReader<K, V>> rrClass;
-+    protected Constructor<RecordReader<K, V>> rrConstructor;
-+    protected FileSystem fs;
-+
-+    protected int idx;
-+    protected long progress;
-+    protected RecordReader<K, V> curReader;
-+    protected boolean isShrinked;
-+    protected long shrinkedLength;
-+    
-+    public boolean next(K key, V value) throws IOException {
-+
-+      while ((curReader == null)
-+          || !doNextWithExceptionHandler((K) ((CombineHiveKey) key).getKey(),
-+              value)) {
-+        if (!initNextRecordReader(key)) {
-+          return false;
-+        }
-+      }
-+      return true;
-+    }
-+
-+    public K createKey() {
-+      K newKey = curReader.createKey();
-+      return (K)(new CombineHiveKey(newKey));
-+    }
-+
-+    public V createValue() {
-+      return curReader.createValue();
-+    }
-+
-+    /**
-+     * Return the amount of data processed.
-+     */
-+    public long getPos() throws IOException {
-+      return progress;
-+    }
-+
-+    public void close() throws IOException {
-+      if (curReader != null) {
-+        curReader.close();
-+        curReader = null;
-+      }
-+    }
-+
-+    /**
-+     * Return progress based on the amount of data processed so far.
-+     */
-+    public float getProgress() throws IOException {
-+      return Math.min(1.0f, progress / (float) (split.getLength()));
-+    }
-+
-+    /**
-+     * A generic RecordReader that can hand out different recordReaders
-+     * for each chunk in the CombineFileSplit.
-+     */
-+    public CombineFileRecordReader(JobConf job, CombineFileSplit split,
-+        Reporter reporter,
-+        Class<RecordReader<K, V>> rrClass)
-+        throws IOException {
-+      this.split = split;
-+      this.jc = job;
-+      this.rrClass = rrClass;
-+      this.reporter = reporter;
-+      this.idx = 0;
-+      this.curReader = null;
-+      this.progress = 0;
-+
-+      isShrinked = false;
-+
-+      assert (split instanceof InputSplitShim);
-+      if (((InputSplitShim) split).isShrinked()) {
-+        isShrinked = true;
-+        shrinkedLength = ((InputSplitShim) split).getShrinkedLength();
-+      }
-+
-+      try {
-+        rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
-+        rrConstructor.setAccessible(true);
-+      } catch (Exception e) {
-+        throw new RuntimeException(rrClass.getName() +
-+            " does not have valid constructor", e);
-+      }
-+      initNextRecordReader(null);
-+    }
-+    
-+    /**
-+     * do next and handle exception inside it. 
-+     * @param key
-+     * @param value
-+     * @return
-+     * @throws IOException
-+     */
-+    private boolean doNextWithExceptionHandler(K key, V value) throws IOException {
-+      try {
-+        return curReader.next(key, value);
-+      } catch (Exception e) {
-+        return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, jc);
-+      }
-+    }
-+
-+    /**
-+     * Get the record reader for the next chunk in this CombineFileSplit.
-+     */
-+    protected boolean initNextRecordReader(K key) throws IOException {
-+
-+      if (curReader != null) {
-+        curReader.close();
-+        curReader = null;
-+        if (idx > 0) {
-+          progress += split.getLength(idx - 1); // done processing so far
-+        }
-+      }
++import org.apache.hadoop.fs.permission.FsPermission;
++import org.apache.hadoop.util.Progressable;
 +
-+      // if all chunks have been processed or reached the length, nothing more to do.
-+      if (idx == split.getNumPaths() || (isShrinked && progress > shrinkedLength)) {
-+        return false;
-+      }
++/****************************************************************
++ * A Proxy for LocalFileSystem
++ *
++ * Serves uri's corresponding to 'pfile:///' namespace with using
++ * a LocalFileSystem 
++ *****************************************************************/
 +
-+      // get a record reader for the idx-th chunk
-+      try {
-+        curReader = rrConstructor.newInstance(new Object[]
-+            {split, jc, reporter, Integer.valueOf(idx)});
++public class ProxyLocalFileSystem extends FilterFileSystem {
 +
-+        // change the key if need be
-+        if (key != null) {
-+          K newKey = curReader.createKey();
-+          ((CombineHiveKey)key).setKey(newKey);
-+        }
++  protected LocalFileSystem localFs;
 +
-+        // setup some helper config variables.
-+        jc.set("map.input.file", split.getPath(idx).toString());
-+        jc.setLong("map.input.start", split.getOffset(idx));
-+        jc.setLong("map.input.length", split.getLength(idx));
-+      } catch (Exception e) {
-+        curReader=HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(e, jc);
-+      }
-+      idx++;
-+      return true;
-+    }
++  public ProxyLocalFileSystem() {
++    localFs = new LocalFileSystem();
 +  }
 +
-+  public abstract static class CombineFileInputFormatShim<K, V> extends
-+      CombineFileInputFormat<K, V>
-+      implements HadoopShims.CombineFileInputFormatShim<K, V> {
++  public ProxyLocalFileSystem(FileSystem fs) {
++    throw new RuntimeException ("Unsupported Constructor");
++  }
 +
-+    public Path[] getInputPathsShim(JobConf conf) {
-+      try {
-+        return FileInputFormat.getInputPaths(conf);
-+      } catch (Exception e) {
-+        throw new RuntimeException(e);
-+      }
-+    }
++  @Override
++  public void initialize(URI name, Configuration conf) throws IOException {
++    // create a proxy for the local filesystem
++    // the scheme/authority serving as the proxy is derived
++    // from the supplied URI
 +
-+    @Override
-+    public void createPool(JobConf conf, PathFilter... filters) {
-+      super.createPool(conf, filters);
-+    }
++    String scheme = name.getScheme();
++    String authority = name.getAuthority() != null ? name.getAuthority() : "";
++    String proxyUriString = name + "://" + authority + "/";
++    fs = new ProxyFileSystem(localFs, URI.create(proxyUriString));
 +
-+    @Override
-+    public InputSplitShim[] getSplits(JobConf job, int numSplits) throws IOException {
-+      long minSize = job.getLong("mapred.min.split.size", 0);
++    fs.initialize(name, conf);
++  }
++}
+
+Property changes on: shims/src/common/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
+___________________________________________________________________
+Added: svn:eol-style
+   + native
+
+Index: shims/src/common/java/org/apache/hadoop/fs/ProxyFileSystem.java
+===================================================================
+--- shims/src/common/java/org/apache/hadoop/fs/ProxyFileSystem.java	(revision 0)
++++ shims/src/common/java/org/apache/hadoop/fs/ProxyFileSystem.java	(revision 0)
+@@ -0,0 +1,273 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +
-+      // For backward compatibility, let the above parameter be used
-+      if (job.getLong("mapred.min.split.size.per.node", 0) == 0) {
-+        super.setMinSplitSizeNode(minSize);
-+      }
++package org.apache.hadoop.fs;
 +
-+      if (job.getLong("mapred.min.split.size.per.rack", 0) == 0) {
-+        super.setMinSplitSizeRack(minSize);
-+      }
++import java.io.IOException;
++import java.net.URI;
++import java.net.URISyntaxException;
 +
-+      if (job.getLong("mapred.max.split.size", 0) == 0) {
-+        super.setMaxSplitSize(minSize);
-+      }
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.permission.FsPermission;
++import org.apache.hadoop.util.Progressable;
 +
-+      CombineFileSplit[] splits = (CombineFileSplit[]) super.getSplits(job, numSplits);
++/****************************************************************
++ * A FileSystem that can serve a given scheme/authority using some
++ * other file system. In that sense, it serves as a proxy for the
++ * real/underlying file system
++ *****************************************************************/
 +
-+      InputSplitShim[] isplits = new InputSplitShim[splits.length];
-+      for (int pos = 0; pos < splits.length; pos++) {
-+        isplits[pos] = new InputSplitShim(splits[pos]);
-+      }
++public class ProxyFileSystem extends FilterFileSystem {
 +
-+      return isplits;
-+    }
++  protected String myScheme;
++  protected String myAuthority;
++  protected URI myUri;
 +
-+    public InputSplitShim getInputSplitShim() throws IOException {
-+      return new InputSplitShim();
-+    }
++  protected String realScheme;
++  protected String realAuthority;
++  protected URI realUri;
 +
-+    public RecordReader getRecordReader(JobConf job, HadoopShims.InputSplitShim split,
-+        Reporter reporter,
-+        Class<RecordReader<K, V>> rrClass)
-+        throws IOException {
-+      CombineFileSplit cfSplit = (CombineFileSplit) split;
-+      return new CombineFileRecordReader(job, cfSplit, reporter, rrClass);
-+    }
++  
 +
++  private Path swizzleParamPath(Path p) {
++    return new Path (realScheme, realAuthority, p.toUri().getPath());
 +  }
 +
-+  public String getInputFormatClassName() {
-+    return "org.apache.hadoop.hive.ql.io.CombineHiveInputFormat";
++  private Path swizzleReturnPath(Path p) {
++    return new Path (myScheme, myAuthority, p.toUri().getPath());
 +  }
 +
-+  String[] ret = new String[2];
-+
-+  @Override
-+  public String[] getTaskJobIDs(TaskCompletionEvent t) {
-+    TaskID tid = t.getTaskAttemptId().getTaskID();
-+    ret[0] = tid.toString();
-+    ret[1] = tid.getJobID().toString();
++  private FileStatus swizzleFileStatus(FileStatus orig, boolean isParam) {
++    FileStatus ret =
++      new FileStatus(orig.getLen(), orig.isDir(), orig.getReplication(),
++                     orig.getBlockSize(), orig.getModificationTime(),
++                     orig.getAccessTime(), orig.getPermission(),
++                     orig.getOwner(), orig.getGroup(),
++                     isParam ? swizzleParamPath(orig.getPath()) :
++                     swizzleReturnPath(orig.getPath()));
 +    return ret;
 +  }
 +
-+  public void setFloatConf(Configuration conf, String varName, float val) {
-+    conf.setFloat(varName, val);
++  public ProxyFileSystem() {
++    throw new RuntimeException ("Unsupported constructor");
++  }
++  
++  public ProxyFileSystem(FileSystem fs) {
++    throw new RuntimeException ("Unsupported constructor");
 +  }
 +
-+  @Override
-+  public int createHadoopArchive(Configuration conf, Path sourceDir, Path destDir,
-+      String archiveName) throws Exception {
++  /**
++   * Create a proxy file system for fs.
++   * 
++   * @param fs FileSystem to create proxy for
++   * @param myUri URI to use as proxy. Only the scheme and authority from
++   *              this are used right now
++   */
++  public ProxyFileSystem(FileSystem fs, URI myUri) {
++    super(fs);
 +
-+    HadoopArchives har = new HadoopArchives(conf);
-+    List<String> args = new ArrayList<String>();
++    URI realUri = fs.getUri();
++    this.realScheme = realUri.getScheme();
++    this.realAuthority=realUri.getAuthority();
++    this.realUri = realUri;
 +
-+    if (conf.get("hive.archive.har.parentdir.settable") == null) {
-+      throw new RuntimeException("hive.archive.har.parentdir.settable is not set");
-+    }
-+    boolean parentSettable =
-+      conf.getBoolean("hive.archive.har.parentdir.settable", false);
++    this.myScheme = myUri.getScheme();
++    this.myAuthority=myUri.getAuthority();
++    this.myUri = myUri;
++  }
 +
-+    if (parentSettable) {
-+      args.add("-archiveName");
-+      args.add(archiveName);
-+      args.add("-p");
-+      args.add(sourceDir.toString());
-+      args.add(destDir.toString());
-+    } else {
-+      args.add("-archiveName");
-+      args.add(archiveName);
-+      args.add(sourceDir.toString());
-+      args.add(destDir.toString());
++  @Override
++  public void initialize(URI name, Configuration conf) throws IOException {
++    try {
++      URI realUri = new URI (realScheme, realAuthority,
++                            name.getPath(), name.getQuery(), name.getFragment());
++      super.initialize(realUri, conf);
++    } catch (URISyntaxException e) {
++      throw new RuntimeException(e);
 +    }
++  }
 +
-+    return ToolRunner.run(har, args.toArray(new String[0]));
++  @Override
++  public URI getUri() {
++    return myUri;
 +  }
 +
-+  public static class NullOutputCommitter extends OutputCommitter {
-+    @Override
-+    public void setupJob(JobContext jobContext) { }
-+    @Override
-+    public void cleanupJob(JobContext jobContext) { }
++  @Override
++  public String getName() {
++    return getUri().toString();
++  }
 +
-+    @Override
-+    public void setupTask(TaskAttemptContext taskContext) { }
-+    @Override
-+    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
-+      return false;
-+    }
-+    @Override
-+    public void commitTask(TaskAttemptContext taskContext) { }
-+    @Override
-+    public void abortTask(TaskAttemptContext taskContext) { }
++  @Override
++  public Path makeQualified(Path path) {
++    return swizzleReturnPath(super.makeQualified(swizzleParamPath(path)));
 +  }
 +
-+  public void setNullOutputFormat(JobConf conf) {
-+    conf.setOutputFormat(NullOutputFormat.class);
-+    conf.setOutputCommitter(Hadoop23Shims.NullOutputCommitter.class);
 +
-+    // option to bypass job setup and cleanup was introduced in hadoop-21 (MAPREDUCE-463)
-+    // but can be backported. So we disable setup/cleanup in all versions >= 0.19
-+    conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
++  @Override
++  protected void checkPath(Path path) {
++    super.checkPath(swizzleParamPath(path));
++  }
 +
-+    // option to bypass task cleanup task was introduced in hadoop-23 (MAPREDUCE-2206)
-+    // but can be backported. So we disable setup/cleanup in all versions >= 0.19
-+    conf.setBoolean("mapreduce.job.committer.task.cleanup.needed", false);
++  @Override
++  public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
++    long len) throws IOException {
++    return super.getFileBlockLocations(swizzleFileStatus(file, true),
++                                       start, len);
 +  }
 +
 +  @Override
-+  public UserGroupInformation getUGIForConf(Configuration conf) throws IOException {
-+    return UserGroupInformation.getCurrentUser();
++  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
++    return super.open(swizzleParamPath(f), bufferSize);
 +  }
 +
 +  @Override
-+  public boolean isSecureShimImpl() {
-+    return true;
++  public FSDataOutputStream append(Path f, int bufferSize,
++      Progressable progress) throws IOException {
++    return super.append(swizzleParamPath(f), bufferSize, progress);
 +  }
 +
 +  @Override
-+  public String getShortUserName(UserGroupInformation ugi) {
-+    return ugi.getShortUserName();
++  public FSDataOutputStream create(Path f, FsPermission permission,
++      boolean overwrite, int bufferSize, short replication, long blockSize,
++      Progressable progress) throws IOException {
++    return super.create(swizzleParamPath(f), permission,
++        overwrite, bufferSize, replication, blockSize, progress);
 +  }
 +
 +  @Override
-+  public String getTokenStrForm(String tokenSignature) throws IOException {
-+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-+    TokenSelector<? extends TokenIdentifier> tokenSelector = new DelegationTokenSelector();
++  public boolean setReplication(Path src, short replication) throws IOException {
++    return super.setReplication(swizzleParamPath(src), replication);
++  }
 +
-+    Token<? extends TokenIdentifier> token = tokenSelector.selectToken(
-+        tokenSignature == null ? new Text() : new Text(tokenSignature), ugi.getTokens());
-+    return token != null ? token.encodeToUrlString() : null;
++  @Override
++  public boolean rename(Path src, Path dst) throws IOException {
++    return super.rename(swizzleParamPath(src), swizzleParamPath(dst));
 +  }
 +  
 +  @Override
-+  public JobTrackerState getJobTrackerState(ClusterStatus clusterStatus) throws Exception {
-+    JobTrackerState state;
-+    switch (clusterStatus.getJobTrackerStatus()) {
-+    case INITIALIZING:
-+      return JobTrackerState.INITIALIZING;
-+    case RUNNING:
-+      return JobTrackerState.RUNNING;
-+    default:
-+      String errorMsg = "Unrecognized JobTracker state: " + clusterStatus.getJobTrackerStatus();
-+      throw new Exception(errorMsg);
++  public boolean delete(Path f, boolean recursive) throws IOException {
++    return super.delete(swizzleParamPath(f), recursive);
++  }
++
++  @Override
++  public boolean deleteOnExit(Path f) throws IOException {
++    return super.deleteOnExit(swizzleParamPath(f));
++  }    
++    
++  @Override
++  public FileStatus[] listStatus(Path f) throws IOException {
++    FileStatus[] orig = super.listStatus(swizzleParamPath(f));
++    FileStatus[] ret = new FileStatus [orig.length];
++    for (int i=0; i<orig.length; i++) {
++      ret[i] = swizzleFileStatus(orig[i], false);
 +    }
++    return ret;
 +  }
 +  
 +  @Override
-+  public org.apache.hadoop.mapreduce.TaskAttemptContext newTaskAttemptContext(Configuration conf, final Progressable progressable) {
-+    return new TaskAttemptContextImpl(conf, new TaskAttemptID()) {
-+      @Override
-+      public void progress() {
-+        progressable.progress();
-+      }
-+    };
++  public Path getHomeDirectory() {
++    return swizzleReturnPath(super.getHomeDirectory());
 +  }
 +
 +  @Override
-+  public org.apache.hadoop.mapreduce.JobContext newJobContext(Job job) {
-+    return new JobContextImpl(job.getConfiguration(), job.getJobID());
++  public void setWorkingDirectory(Path newDir) {
++    super.setWorkingDirectory(swizzleParamPath(newDir));
 +  }
-+}
-
-Property changes on: shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
-___________________________________________________________________
-Added: svn:eol-style
-   + native
-
-Index: shims/src/0.23/java/org/apache/hadoop/hive/shims/Jetty23Shims.java
-===================================================================
---- shims/src/0.23/java/org/apache/hadoop/hive/shims/Jetty23Shims.java	(revision 0)
-+++ shims/src/0.23/java/org/apache/hadoop/hive/shims/Jetty23Shims.java	(revision 0)
-@@ -0,0 +1,56 @@
-+/**
-+ * Licensed to the Apache Software Foundation (ASF) under one
-+ * or more contributor license agreements.  See the NOTICE file
-+ * distributed with this work for additional information
-+ * regarding copyright ownership.  The ASF licenses this file
-+ * to you under the Apache License, Version 2.0 (the
-+ * "License"); you may not use this file except in compliance
-+ * with the License.  You may obtain a copy of the License at
-+ *
-+ *     http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
-+ */
-+package org.apache.hadoop.hive.shims;
-+
-+import java.io.IOException;
-+
-+import org.mortbay.jetty.bio.SocketConnector;
-+import org.mortbay.jetty.handler.RequestLogHandler;
-+import org.mortbay.jetty.webapp.WebAppContext;
-+
-+/**
-+ * Jetty23Shims.
-+ *
-+ */
-+public class Jetty23Shims implements JettyShims {
-+  public Server startServer(String listen, int port) throws IOException {
-+    Server s = new Server();
-+    s.setupListenerHostPort(listen, port);
-+    return s;
++  
++  @Override
++  public Path getWorkingDirectory() {
++    return swizzleReturnPath(super.getWorkingDirectory());
++  }
++  
++  @Override
++  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
++    return super.mkdirs(swizzleParamPath(f), permission);
 +  }
 +
-+  private static class Server extends org.mortbay.jetty.Server implements JettyShims.Server {
-+    public void addWar(String war, String contextPath) {
-+      WebAppContext wac = new WebAppContext();
-+      wac.setContextPath(contextPath);
-+      wac.setWar(war);
-+      RequestLogHandler rlh = new RequestLogHandler();
-+      rlh.setHandler(wac);
-+      this.addHandler(rlh);
-+    }
++  @Override
++  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
++    throws IOException {
++    super.copyFromLocalFile(delSrc, swizzleParamPath(src), swizzleParamPath(dst));
++  }
 +
-+    public void setupListenerHostPort(String listen, int port)
-+        throws IOException {
++  @Override
++  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
++                                Path[] srcs, Path dst)
++    throws IOException {
++    super.copyFromLocalFile(delSrc, overwrite, srcs, swizzleParamPath(dst));
++  }
++  
++  @Override
++  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
++                                Path src, Path dst)
++    throws IOException {
++    super.copyFromLocalFile(delSrc, overwrite, src, swizzleParamPath(dst));
++  }
 +
-+      SocketConnector connector = new SocketConnector();
-+      connector.setPort(port);
-+      connector.setHost(listen);
-+      this.addConnector(connector);
-+    }
++  @Override
++  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
++    throws IOException {
++    super.copyToLocalFile(delSrc, swizzleParamPath(src), dst);
 +  }
-+}
-
-Property changes on: shims/src/0.23/java/org/apache/hadoop/hive/shims/Jetty23Shims.java
-___________________________________________________________________
-Added: svn:eol-style
-   + native
-
-Index: shims/src/0.23/java/org/apache/hadoop/hive/shims/HiveHarFileSystem.java
-===================================================================
---- shims/src/0.23/java/org/apache/hadoop/hive/shims/HiveHarFileSystem.java	(revision 0)
-+++ shims/src/0.23/java/org/apache/hadoop/hive/shims/HiveHarFileSystem.java	(revision 0)
-@@ -0,0 +1,66 @@
-+/**
-+ * Licensed to the Apache Software Foundation (ASF) under one
-+ * or more contributor license agreements.  See the NOTICE file
-+ * distributed with this work for additional information
-+ * regarding copyright ownership.  The ASF licenses this file
-+ * to you under the Apache License, Version 2.0 (the
-+ * "License"); you may not use this file except in compliance
-+ * with the License.  You may obtain a copy of the License at
-+ *
-+ *     http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
-+ */
 +
-+package org.apache.hadoop.hive.shims;
++  @Override
++  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
++    throws IOException {
++    return super.startLocalOutput(swizzleParamPath(fsOutputFile), tmpLocalFile);
++  }
 +
-+import java.io.IOException;
++  @Override
++  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
++    throws IOException {
++    super.completeLocalOutput(swizzleParamPath(fsOutputFile), tmpLocalFile);
++  }
 +
-+import org.apache.hadoop.fs.BlockLocation;
-+import org.apache.hadoop.fs.ContentSummary;
-+import org.apache.hadoop.fs.FileStatus;
-+import org.apache.hadoop.fs.HarFileSystem;
-+import org.apache.hadoop.fs.Path;
++  @Override
++  public ContentSummary getContentSummary(Path f) throws IOException {
++    return super.getContentSummary(swizzleParamPath(f));
++  }
 +
-+/**
-+ * HiveHarFileSystem - fixes issues with Hadoop's HarFileSystem
-+ *
-+ */
-+public class HiveHarFileSystem extends HarFileSystem {
++  @Override
++  public FileStatus getFileStatus(Path f) throws IOException {
++    return swizzleFileStatus(super.getFileStatus(swizzleParamPath(f)), false);
++  }
 +
 +  @Override
-+  public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
-+      long len) throws IOException {
++  public FileChecksum getFileChecksum(Path f) throws IOException {
++    return super.getFileChecksum(swizzleParamPath(f));
++  }
++  
++  @Override
++  public void setOwner(Path p, String username, String groupname
++      ) throws IOException {
++    super.setOwner(swizzleParamPath(p), username, groupname);
++  }
 +
-+    // In some places (e.g. FileInputFormat) this BlockLocation is used to
-+    // figure out sizes/offsets and so a completely blank one will not work.
-+    String [] hosts = {"DUMMY_HOST"};
-+    return new BlockLocation[]{new BlockLocation(null, hosts, 0, file.getLen())};
++  @Override
++  public void setTimes(Path p, long mtime, long atime
++      ) throws IOException {
++    super.setTimes(swizzleParamPath(p), mtime, atime);
 +  }
 +
 +  @Override
-+  public ContentSummary getContentSummary(Path f) throws IOException {
-+    // HarFileSystem has a bug where this method does not work properly
-+    // if the underlying FS is HDFS. See MAPREDUCE-1877 for more
-+    // information. This method is from FileSystem.
-+    FileStatus status = getFileStatus(f);
-+    if (!status.isDir()) {
-+      // f is a file
-+      return new ContentSummary(status.getLen(), 1, 0);
-+    }
-+    // f is a directory
-+    long[] summary = {0, 0, 1};
-+    for(FileStatus s : listStatus(f)) {
-+      ContentSummary c = s.isDir() ? getContentSummary(s.getPath()) :
-+                                     new ContentSummary(s.getLen(), 1, 0);
-+      summary[0] += c.getLength();
-+      summary[1] += c.getFileCount();
-+      summary[2] += c.getDirectoryCount();
-+    }
-+    return new ContentSummary(summary[0], summary[1], summary[2]);
++  public void setPermission(Path p, FsPermission permission
++      ) throws IOException {
++    super.setPermission(swizzleParamPath(p), permission);
 +  }
 +}
++  
 
-Property changes on: shims/src/0.23/java/org/apache/hadoop/hive/shims/HiveHarFileSystem.java
+Property changes on: shims/src/common/java/org/apache/hadoop/fs/ProxyFileSystem.java

[... 398 lines stripped ...]


Mime
View raw message