From: omalley@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Subject: svn commit: r1077456 [10/10] - in /hadoop/common/branches/branch-0.20-security-patches/src: c++/pipes/ c++/pipes/impl/ c++/utils/ c++/utils/m4/ examples/pipes/ mapred/org/apache/hadoop/mapred/pipes/
Date: Fri, 04 Mar 2011 04:16:54 -0000
Message-Id: <20110304041655.AEBE32388C64@eris.apache.org>

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java?rev=1077456&r1=1077455&r2=1077456&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/Application.java Fri Mar  4 04:16:53 2011
@@ -26,11 +26,18 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
+
+import javax.crypto.SecretKey;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
@@ -41,6 +48,11 @@ import org.apache.hadoop.mapred.RecordRe
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -82,6 +94,18 @@ class Application
+    Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(conf
+        .getCredentials());
+    // This password is used as shared secret key between this application and
+    // child pipes process
+    byte[] password = jobToken.getPassword();
+    String localPasswordFile = conf.getJobLocalDir() + Path.SEPARATOR
+        + "jobTokenPassword";
+    writePasswordToLocalFile(localPasswordFile, password, conf);
+    env.put("hadoop.pipes.shared.secret.location", localPasswordFile);
+
     List<String> cmd = new ArrayList<String>();
     String interpretor = conf.get("hadoop.pipes.executable.interpretor");
     if (interpretor != null) {
@@ -107,17 +131,52 @@ class Application
-    handler = new OutputHandler<K2, V2>(output, reporter, recordReader);
+
+    String challenge = getSecurityChallenge();
+    String digestToSend = createDigest(password, challenge);
+    String digestExpected = createDigest(password, digestToSend);
+
+    handler = new OutputHandler<K2, V2>(output, reporter, recordReader,
+        digestExpected);
     K2 outputKey = (K2) ReflectionUtils.newInstance(outputKeyClass, conf);
     V2 outputValue = (V2) ReflectionUtils.newInstance(outputValueClass, conf);
     downlink = new BinaryProtocol<K1, V1, K2, V2>(clientSocket, handler,
                                   outputKey, outputValue, conf);
+
+    downlink.authenticate(digestToSend, challenge);
+    waitForAuthentication();
+    LOG.debug("Authentication succeeded");
     downlink.start();
     downlink.setJobConf(conf);
   }
+
+  private String getSecurityChallenge() {
+    Random rand = new Random(System.currentTimeMillis());
+    //Use 4 random integers so as to have 16 random bytes.
+    StringBuilder strBuilder = new StringBuilder();
+    strBuilder.append(rand.nextInt(0x7fffffff));
+    strBuilder.append(rand.nextInt(0x7fffffff));
+    strBuilder.append(rand.nextInt(0x7fffffff));
+    strBuilder.append(rand.nextInt(0x7fffffff));
+    return strBuilder.toString();
+  }
+
+  private void writePasswordToLocalFile(String localPasswordFile,
+      byte[] password, JobConf conf) throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Path localPath = new Path(localPasswordFile);
+    if (localFs.isFile(localPath)) {
+      LOG.debug("Password file is already created by previous path");
+      return;
+    }
+    FSDataOutputStream out = FileSystem.create(localFs, localPath,
+        new FsPermission("400"));
+    out.write(password);
+    out.close();
+  }
+
   /**
    * Get the downward protocol object that can send commands down to the
    * application.
@@ -126,7 +185,19 @@ class Application
   DownwardProtocol<K1, V1> getDownlink() {
     return downlink;
   }
-
+
+  /**
+   * Wait for authentication response.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  void waitForAuthentication() throws IOException,
+      InterruptedException {
+    downlink.flush();
+    LOG.debug("Waiting for authentication response");
+    handler.waitForAuthentication();
+  }
+
   /**
    * Wait for the application to finish
    * @return did the application finish correctly?
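The Application.java changes above set up a mutual challenge/response handshake between the parent task and the child pipes process: the parent writes the job-token password into a permission-400 file in the job-local directory, exports its location through hadoop.pipes.shared.secret.location, sends a random challenge together with digest(password, challenge) down the link, and expects digest(password, digestToSend) back before calling downlink.start(). The sketch below only illustrates that exchange with a plain HMAC-SHA1 keyed digest; the class and helper names here (PipesHandshakeSketch, digest) are illustrative stand-ins, and the patch's real createDigest helper (built on the JobTokenSecretManager/SecureShuffleUtils imports above) may use a different digest encoding.

// Minimal sketch of the challenge/response handshake, NOT the committed code.
// The digest() helper below assumes an HMAC-SHA1 keyed digest for illustration.
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

public class PipesHandshakeSketch {

  // Keyed digest of 'data' under the shared job-token password.
  static String digest(byte[] password, String data) throws Exception {
    Mac mac = Mac.getInstance("HmacSHA1");
    mac.init(new SecretKeySpec(password, "HmacSHA1"));
    byte[] raw = mac.doFinal(data.getBytes(StandardCharsets.UTF_8));
    return Base64.getEncoder().encodeToString(raw);
  }

  public static void main(String[] args) throws Exception {
    byte[] password = "job-token-password".getBytes(StandardCharsets.UTF_8);

    // Parent (Application.java) side, mirroring the hunk above:
    String challenge = "123456789012345";                   // getSecurityChallenge()
    String digestToSend = digest(password, challenge);      // sent via downlink.authenticate()
    String digestExpected = digest(password, digestToSend); // handed to the OutputHandler

    // Child (pipes process) side: it can check the parent's digest and produce
    // the expected answer only if it read the same shared secret from the file
    // named by hadoop.pipes.shared.secret.location.
    String childCheck = digest(password, challenge);
    String childResponse = digest(password, digestToSend);

    System.out.println("parent verified by child: " + digestToSend.equals(childCheck));
    System.out.println("child verified by parent: " + digestExpected.equals(childResponse));
  }
}

Either side can produce the correct answer only if it holds the same shared secret, which is why the password file is written owner-read-only into the job-local directory rather than passed on the command line or over the socket.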
@@ -190,5 +261,11 @@ class Application
     private UpwardProtocol<K2, V2> handler;
     private K2 key;
     private V2 value;
+    private boolean authPending = true;

     public UplinkReaderThread(InputStream stream,
                               UpwardProtocol<K2, V2> handler,
@@ -113,7 +120,14 @@ class BinaryProtocol {
   /**
+   * request authentication
+   * @throws IOException
+   */
+  void authenticate(String digest, String challenge) throws IOException;
+
+  /**
    * Start communication
    * @throws IOException
    */

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java?rev=1077456&r1=1077455&r2=1077456&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java Fri Mar  4 04:16:53 2011
@@ -44,21 +44,26 @@ class OutputHandler
   private OutputCollector<K, V> collector;
   private float progressValue = 0.0f;
   private boolean done = false;
+  private Throwable exception = null;
   RecordReader<FloatWritable, NullWritable> recordReader = null;
   private Map<Integer, Counters.Counter> registeredCounters = new HashMap<Integer, Counters.Counter>();
+  private String expectedDigest = null;
+  private boolean digestReceived = false;

   /**
    * Create a handler that will handle any records output from the application.
    * @param collector the "real" collector that takes the output
    * @param reporter the reporter for reporting progress
    */
   public OutputHandler(OutputCollector<K, V> collector, Reporter reporter,
-                       RecordReader<FloatWritable, NullWritable> recordReader) {
+                       RecordReader<FloatWritable, NullWritable> recordReader,
+                       String expectedDigest) {
     this.reporter = reporter;
     this.collector = collector;
     this.recordReader = recordReader;
+    this.expectedDigest = expectedDigest;
   }

   /**
@@ -155,5 +160,32 @@ class OutputHandler
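The final OutputHandler hunk (@@ -155,5 +160,32 @@) is cut off in this message. Judging from the expectedDigest and digestReceived fields it introduces, and from Application.waitForAuthentication() above (which flushes the downlink and then blocks in handler.waitForAuthentication()), the handshake presumably completes through a standard synchronized wait/notify exchange between the uplink reader thread and the parent thread. The sketch below is written under that assumption with illustrative names; it is not the committed code.

// Sketch of the wait/notify completion implied by the expectedDigest and
// digestReceived fields above. An assumption about the truncated hunk,
// not the committed OutputHandler code.
import java.io.IOException;

class AuthenticationLatchSketch {
  private final String expectedDigest;
  private boolean digestReceived = false;
  private boolean failed = false;

  AuthenticationLatchSketch(String expectedDigest) {
    this.expectedDigest = expectedDigest;
  }

  // Called from the uplink reader thread when the child answers the challenge.
  synchronized void authenticate(String digest) {
    failed = !expectedDigest.equals(digest);
    digestReceived = true;
    notifyAll();                       // wake up waitForAuthentication()
  }

  // Called from the parent thread after downlink.authenticate(...) was sent.
  synchronized void waitForAuthentication() throws IOException, InterruptedException {
    while (!digestReceived) {
      wait();                          // block until the child's digest arrives
    }
    if (failed) {
      throw new IOException("Authentication of the child pipes process failed");
    }
  }

  public static void main(String[] args) throws Exception {
    AuthenticationLatchSketch latch = new AuthenticationLatchSketch("expected-digest");
    // Simulate the uplink reader thread delivering the child's answer.
    new Thread(() -> latch.authenticate("expected-digest")).start();
    latch.waitForAuthentication();
    System.out.println("Authentication succeeded");
  }
}

The authPending flag added to BinaryProtocol's UplinkReaderThread presumably plays the matching role on the reader side, gating normal key/value traffic until the authentication response has been seen.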