Return-Path: Delivered-To: apmail-incubator-pig-commits-archive@locus.apache.org Received: (qmail 9881 invoked from network); 18 Sep 2008 20:47:51 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 18 Sep 2008 20:47:51 -0000 Received: (qmail 27100 invoked by uid 500); 18 Sep 2008 20:47:48 -0000 Delivered-To: apmail-incubator-pig-commits-archive@incubator.apache.org Received: (qmail 27082 invoked by uid 500); 18 Sep 2008 20:47:48 -0000 Mailing-List: contact pig-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: pig-dev@incubator.apache.org Delivered-To: mailing list pig-commits@incubator.apache.org Received: (qmail 27073 invoked by uid 99); 18 Sep 2008 20:47:48 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Sep 2008 13:47:48 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Sep 2008 20:46:57 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 75742238896B; Thu, 18 Sep 2008 13:47:00 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r696795 - in /incubator/pig/trunk: ./ lib/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/datastorage/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/ src/org/apache... Date: Thu, 18 Sep 2008 20:47:00 -0000 To: pig-commits@incubator.apache.org From: olga@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080918204700.75742238896B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: olga Date: Thu Sep 18 13:46:59 2008 New Revision: 696795 URL: http://svn.apache.org/viewvc?rev=696795&view=rev Log: PIG-253: integration with Hadoop 18 Added: incubator/pig/trunk/lib/hadoop18.jar (with props) Modified: incubator/pig/trunk/build.xml incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HFile.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Modified: incubator/pig/trunk/build.xml URL: http://svn.apache.org/viewvc/incubator/pig/trunk/build.xml?rev=696795&r1=696794&r2=696795&view=diff ============================================================================== --- incubator/pig/trunk/build.xml (original) +++ incubator/pig/trunk/build.xml Thu Sep 18 13:46:59 2008 @@ -58,7 +58,7 @@ - + @@ -85,7 +85,7 @@ - + Added: incubator/pig/trunk/lib/hadoop18.jar URL: http://svn.apache.org/viewvc/incubator/pig/trunk/lib/hadoop18.jar?rev=696795&view=auto ============================================================================== Binary file - no diff available. Propchange: incubator/pig/trunk/lib/hadoop18.jar ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream Modified: incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java?rev=696795&r1=696794&r2=696795&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java (original) +++ incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java Thu Sep 18 13:46:59 2008 @@ -26,7 +26,7 @@ public abstract class ComparisonFunc extends WritableComparator { public ComparisonFunc() { - super(Tuple.class); + super(Tuple.class, true); } public int compare(WritableComparable a, WritableComparable b) { Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HFile.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HFile.java?rev=696795&r1=696794&r2=696795&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HFile.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HFile.java Thu Sep 18 13:46:59 2008 @@ -75,6 +75,6 @@ public SeekableInputStream sopen() throws IOException { return new HSeekableInputStream(fs.getHFS().open(path), - fs.getHFS().getContentLength(path)); + fs.getHFS(). getContentSummary(path).getLength()); } } Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=696795&r1=696794&r2=696795&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Thu Sep 18 13:46:59 2008 @@ -45,7 +45,6 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobSubmissionProtocol; import org.apache.hadoop.mapred.JobTracker; import org.apache.pig.backend.datastorage.DataStorage; import org.apache.pig.backend.executionengine.ExecException; @@ -80,7 +79,6 @@ protected DataStorage ds; - protected JobSubmissionProtocol jobTracker; protected JobClient jobClient; // key: the operator key from the logical plan that originated the physical plan @@ -101,7 +99,6 @@ this.ds = null; // to be set in the init method - this.jobTracker = null; this.jobClient = null; } @@ -185,16 +182,6 @@ if(cluster != null && !cluster.equalsIgnoreCase(LOCAL)){ log.info("Connecting to map-reduce job tracker at: " + properties.get(JOB_TRACKER_LOCATION)); - if (!LOCAL.equalsIgnoreCase(cluster)) { - try { - jobTracker = (JobSubmissionProtocol) RPC.getProxy( - JobSubmissionProtocol.class, - JobSubmissionProtocol.versionID, JobTracker - .getAddress(configuration), configuration); - } catch (IOException e) { - throw new ExecException("Failed to crate job tracker", e); - } - } } try { Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java?rev=696795&r1=696794&r2=696795&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java Thu Sep 18 13:46:59 2008 @@ -175,6 +175,9 @@ } if (pom.toCombine != null) { conf.set("pig.combineFunc", ObjectSerializer.serialize(pom.toCombine)); + // this is to make sure that combiner is only called once + // since we can't handle no combine or multiple combines + conf.setCombineOnceOnly(true); } if (pom.groupFuncs != null) { conf.set("pig.groupFuncs", ObjectSerializer.serialize(pom.groupFuncs)); Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java?rev=696795&r1=696794&r2=696795&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java Thu Sep 18 13:46:59 2008 @@ -70,7 +70,10 @@ } } - index = PigInputFormat.getActiveSplit().getIndex(); + if (PigInputFormat.getActiveSplit() == null) { + } else { + index = PigInputFormat.getActiveSplit().getIndex(); + } Datum groupName = key.getField(0); finalout.group = key; Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java?rev=696795&r1=696794&r2=696795&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java Thu Sep 18 13:46:59 2008 @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -90,11 +91,12 @@ Set locations = new HashSet(); for (String loc : wrapped.getLocations()) { Path path = new Path(loc); - String hints[][] = fs.getFileCacheHints(path, 0, fs.getFileStatus( - path).getLen()); - for (int i = 0; i < hints.length; i++) { - for (int j = 0; j < hints[i].length; j++) { - locations.add(hints[i][j]); + BlockLocation[] blocks = fs.getFileBlockLocations(path, 0, fs.getFileStatus( + path).getLen()); + for (int i = 0; i < blocks.length; i++) { + String[] hosts = blocks[i].getHosts(); + for (int j = 0; j < hosts.length; j++){ + locations.add(hosts[j]); } } } Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=696795&r1=696794&r2=696795&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Thu Sep 18 13:46:59 2008 @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TaskAttemptID; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce; import org.apache.pig.impl.eval.collector.DataCollector; @@ -169,10 +170,7 @@ */ private boolean writeErrorToHDFS(int limit, String taskId) { if (command.getPersistStderr()) { - // These are hard-coded begin/end offsets a Hadoop *taskid* - int beginIndex = 25, endIndex = 31; - - int tipId = Integer.parseInt(taskId.substring(beginIndex, endIndex)); + int tipId = TaskAttemptID.forName(taskId).getTaskID().getId(); return tipId < command.getLogFilesLimit(); } return false; @@ -249,4 +247,4 @@ } } - \ No newline at end of file +