From: cutting@apache.org
To: hadoop-commits@lucene.apache.org
Reply-To: hadoop-dev@lucene.apache.org
Subject: svn commit: r532859 - in /lucene/hadoop/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/java/org/apache/hadoop/mapred/lib/ src/test/org/apache/hadoop/mapred/
Date: Thu, 26 Apr 2007 20:59:14 -0000
Message-Id: <20070426205915.015D01A9838@eris.apache.org>

Author: cutting
Date: Thu Apr 26 13:59:13 2007
New Revision: 532859

URL: http://svn.apache.org/viewvc?view=rev&rev=532859
Log:
HADOOP-1284.  In contrib/streaming, permit flexible specification of field delimiter and fields for partitioning and sorting.  Contributed by Runping.

Added:
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamDataProtocol.java
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/ValueCountReduce.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestFieldSelection.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=532859&r1=532858&r2=532859
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Apr 26 13:59:13 2007
@@ -263,6 +263,10 @@
     completed jobs by order of completion, not submission.
     (Arun C Murthy via cutting)
 
+79. HADOOP-1284.  In contrib/streaming, permit flexible specification
+    of field delimiter and fields for partitioning and sorting.
+    (Runping Qi via cutting)
+
 Release 0.12.3 - 2007-04-06
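For context, a minimal sketch of how a job might set the new streaming properties introduced in the PipeMapRed.java change below. The property names are taken from that change; the driver class and the concrete values here are made up for illustration and are not part of the commit.

  import org.apache.hadoop.mapred.JobConf;

  public class StreamDelimiterConfigExample {  // hypothetical driver, for illustration only
    public static void main(String[] args) {
      JobConf job = new JobConf();
      // Use '.' instead of the default tab as the field separator of streaming map output,
      // and treat the first two '.'-separated fields as the map output key.
      job.set("stream.map.output.field.separator", ".");
      job.setInt("stream.num.map.output.key.fields", 2);
      // The reduce side is configured independently with its own pair of properties.
      job.set("stream.reduce.output.field.separator", ".");
      job.setInt("stream.num.reduce.output.key.fields", 2);
    }
  }
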
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?view=diff&rev=532859&r1=532858&r2=532859
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Thu Apr 26 13:59:13 2007
@@ -64,6 +64,10 @@
    */
   abstract String getKeyColPropName();
 
+  abstract char getFieldSeparator();
+
+  abstract int getNumOfKeyFields();
+
   /** Write output as side-effect files rather than as map outputs.
       This is useful to do "Map" tasks rather than "MapReduce" tasks. */
   boolean getUseSideEffect() {
@@ -208,7 +212,13 @@
       } else {
         sideFs_ = fs_;
       }
-
+      String mapOutputFieldSeparator = job_.get("stream.map.output.field.separator", "\t");
+      String reduceOutputFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t");
+      this.mapOutputFieldSeparator = mapOutputFieldSeparator.charAt(0);
+      this.reduceOutFieldSeparator = reduceOutputFieldSeparator.charAt(0);
+      this.numOfMapOutputKeyFields = job_.getInt("stream.num.map.output.key.fields", 1);
+      this.numOfReduceOutputKeyFields = job_.getInt("stream.num.reduce.output.key.fields", 1);
+
       if (debug_) {
         System.out.println("kind :" + this.getClass());
         System.out.println("split :" + StreamUtil.getCurrentSplit(job_));
@@ -466,8 +476,12 @@
   void splitKeyVal(byte[] line, Text key, Text val) throws IOException {
     int pos = -1;
     if (keyCols_ != ALL_COLS) {
-      pos = UTF8ByteArrayUtils.findTab(line);
+      pos = UTF8ByteArrayUtils.findNthByte(line, (byte)this.getFieldSeparator(), this.getNumOfKeyFields());
     }
+    LOG.info("FieldSeparator: " + this.getFieldSeparator());
+    LOG.info("NumOfKeyFields: " + this.getNumOfKeyFields());
+    LOG.info("Line: " + new String(line));
+    LOG.info("Pos: " + pos);
     try {
       if (pos == -1) {
         key.set(line);
@@ -730,4 +744,10 @@
   String LOGNAME;
   PrintStream log_;
 
+  protected char mapOutputFieldSeparator = '\t';
+  protected char reduceOutFieldSeparator = '\t';
+  protected int numOfMapOutputKeyFields = 1;
+  protected int numOfMapOutputPartitionFields = 1;
+  protected int numOfReduceOutputKeyFields = 1;
+
 }

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?view=diff&rev=532859&r1=532858&r2=532859
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Thu Apr 26 13:59:13 2007
@@ -116,4 +116,14 @@
     mapRedFinished();
   }
 
+  @Override
+  char getFieldSeparator() {
+    return super.mapOutputFieldSeparator;
+  }
+
+  @Override
+  int getNumOfKeyFields() {
+    return super.numOfMapOutputKeyFields;
+  }
+
 }

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java?view=diff&rev=532859&r1=532858&r2=532859
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Thu Apr 26 13:59:13 2007
@@ -102,4 +102,14 @@
     mapRedFinished();
   }
 
+  @Override
+  char getFieldSeparator() {
+    return super.reduceOutFieldSeparator;
+  }
+
+  @Override
+  int getNumOfKeyFields() {
+    return super.numOfReduceOutputKeyFields;
+  }
+
 }
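To illustrate the splitKeyVal change above: the key is now everything up to the Nth occurrence of the configured separator, rather than up to the first tab. The following is a standalone sketch of that splitting rule only; it is not the new UTF8ByteArrayUtils code (most of which did not survive in this archived copy), and the helper and class names are hypothetical.

  public class NthSeparatorSplitExample {
    // Return the position of the n-th occurrence of b in buf, or -1 if there are fewer than n.
    static int findNth(byte[] buf, byte b, int n) {
      int seen = 0;
      for (int i = 0; i < buf.length; i++) {
        if (buf[i] == b && ++seen == n) {
          return i;
        }
      }
      return -1;
    }

    public static void main(String[] args) throws Exception {
      byte[] line = "a.b.c.d".getBytes("UTF-8");
      int pos = findNth(line, (byte) '.', 2);  // separator '.', two key fields
      String key = new String(line, 0, pos, "UTF-8");                           // "a.b"
      String val = new String(line, pos + 1, line.length - pos - 1, "UTF-8");   // "c.d"
      System.out.println(key + " | " + val);
    }
  }
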
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java?view=diff&rev=532859&r1=532858&r2=532859
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java Thu Apr 26 13:59:13 2007
@@ -18,11 +18,14 @@
 package org.apache.hadoop.streaming;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PushbackInputStream;
 
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.LineRecordReader;
 
 /**
  * General utils for byte array containing UTF-8 encoded strings
@@ -45,6 +48,56 @@
     }
     return -1;
   }
+
+  /**
+   * Find the first occurrence of the given byte b in a UTF-8 encoded string
+   * @param utf a byte array containing a UTF-8 encoded string
+   * @param start starting offset
+   * @param end ending position
+   * @param b the byte to find
+   * @return position that first byte occurs, otherwise -1
+   */
+  public static int findByte(byte [] utf, int start, int end, byte b) {
+    for(int i=start; i fieldList) {
+    int allFieldsFrom = -1;
+    int i = 0;
+    int j = 0;
+    int pos = -1;
+    String fieldSpec = null;
+    for (i = 0; i < fieldListSpec.length; i++) {
+      fieldSpec = fieldListSpec[i];
+      if (fieldSpec.length() == 0) {
+        continue;
+      }
+      pos = fieldSpec.indexOf('-');
+      if (pos < 0) {
+        Integer fn = new Integer(fieldSpec);
+        fieldList.add(fn);
+      } else {
+        String start = fieldSpec.substring(0, pos);
+        String end = fieldSpec.substring(pos + 1);
+        if (start.length() == 0) {
+          start = "0";
+        }
+        if (end.length() == 0) {
+          allFieldsFrom = Integer.parseInt(start);
+          continue;
+        }
+        int startPos = Integer.parseInt(start);
+        int endPos = Integer.parseInt(end);
+        for (j = startPos; j <= endPos; j++) {
+          fieldList.add(new Integer(j));
+        }
+      }
+    }
+    return allFieldsFrom;
+  }
+
+  private void parseOutputKeyValueSpec() {
+    String[] mapKeyValSpecs = mapOutputKeyValueSpec.split(":", -1);
+    String[] mapKeySpec = mapKeyValSpecs[0].split(",");
+    String[] mapValSpec = new String[0];
+    if (mapKeyValSpecs.length > 1) {
+      mapValSpec = mapKeyValSpecs[1].split(",");
+    }
+
+    int i = 0;
+    ArrayList<Integer> fieldList = new ArrayList<Integer>();
+    extractFields(mapKeySpec, fieldList);
+    this.mapOutputKeyFieldList = new int[fieldList.size()];
+    for (i = 0; i < fieldList.size(); i++) {
+      this.mapOutputKeyFieldList[i] = fieldList.get(i).intValue();
+    }
+
+    fieldList = new ArrayList<Integer>();
+    allMapValueFieldsFrom = extractFields(mapValSpec, fieldList);
+    this.mapOutputValueFieldList = new int[fieldList.size()];
+    for (i = 0; i < fieldList.size(); i++) {
+      this.mapOutputValueFieldList[i] = fieldList.get(i).intValue();
+    }
+
+    String[] reduceKeyValSpecs = reduceOutputKeyValueSpec.split(":", -1);
+    String[] reduceKeySpec = reduceKeyValSpecs[0].split(",");
+    String[] reduceValSpec = new String[0];
+    if (reduceKeyValSpecs.length > 1) {
+      reduceValSpec = reduceKeyValSpecs[1].split(",");
+    }
+
+    fieldList = new ArrayList<Integer>();
+    extractFields(reduceKeySpec, fieldList);
+    this.reduceOutputKeyFieldList = new int[fieldList.size()];
+    for (i = 0; i < fieldList.size(); i++) {
+      this.reduceOutputKeyFieldList[i] = fieldList.get(i).intValue();
+    }
+
+    fieldList = new ArrayList<Integer>();
+    allReduceValueFieldsFrom = extractFields(reduceValSpec, fieldList);
+    this.reduceOutputValueFieldList = new int[fieldList.size()];
+    for (i = 0; i < fieldList.size(); i++) {
+      this.reduceOutputValueFieldList[i] = fieldList.get(i).intValue();
+    }
+  }
+
+  public void configure(JobConf job) {
+    this.fieldSeparator = job.get("mapred.data.field.separator", "\t");
+    this.mapOutputKeyValueSpec = job.get("map.output.key.value.fields.spec",
+        "0-:");
+    this.ignoreInputKey = TextInputFormat.class.getCanonicalName().equals(
+        job.getInputFormat().getClass().getCanonicalName());
+    this.reduceOutputKeyValueSpec = job.get(
+        "reduce.output.key.value.fields.spec", "0-:");
+    parseOutputKeyValueSpec();
+    LOG.info(specToString());
+  }
+
+  public void close() throws IOException {
+    // TODO Auto-generated method stub
+
+  }
+
+  private static String selectFields(String[] fields, int[] fieldList,
+      int allFieldsFrom, String separator) {
+    String retv = null;
+    int i = 0;
+    StringBuffer sb = null;
+    if (fieldList != null && fieldList.length > 0) {
+      if (sb == null) {
+        sb = new StringBuffer();
+      }
+      for (i = 0; i < fieldList.length; i++) {
+        if (fieldList[i] < fields.length) {
+          sb.append(fields[fieldList[i]]);
+        }
+        sb.append(separator);
+      }
+    }
+    if (allFieldsFrom >= 0) {
+      if (sb == null) {
+        sb = new StringBuffer();
+      }
+      for (i = allFieldsFrom; i < fields.length; i++) {
+        sb.append(fields[i]).append(separator);
+      }
+    }
+    if (sb != null) {
+      retv = sb.toString();
+      if (retv.length() > 0) {
+        retv = retv.substring(0, retv.length() - 1);
+      }
+    }
+    return retv;
+  }
+
+  public void reduce(WritableComparable key, Iterator values,
+      OutputCollector output, Reporter reporter) throws IOException {
+
+    String keyStr = key.toString() + this.fieldSeparator;
+    while (values.hasNext()) {
+      String valStr = values.next().toString();
+      valStr = keyStr + valStr;
+      String[] fields = valStr.split(this.fieldSeparator);
+      String newKey = selectFields(fields, reduceOutputKeyFieldList, -1,
+          fieldSeparator);
+      String newVal = selectFields(fields, reduceOutputValueFieldList,
+          allReduceValueFieldsFrom, fieldSeparator);
+      Text newTextKey = null;
+      if (newKey != null) {
+        newTextKey = new Text(newKey);
+      }
+      Text newTextVal = null;
+      if (newVal != null) {
+        newTextVal = new Text(newVal);
+      }
+      output.collect(newTextKey, newTextVal);
+    }
+  }
+}
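To make the field-spec syntax parsed above concrete: a spec is "keyFieldList:valueFieldList", where each list is a comma-separated set of zero-based field indices or ranges such as "1-3", and a trailing "n-" means field n through the last field. A minimal sketch of setting these properties follows; the property names and spec values are taken from the FieldSelectionMapReduce code above and the new TestFieldSelection test below, while the driver class itself is hypothetical.

  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.lib.FieldSelectionMapReduce;

  public class FieldSelectionSpecExample {  // hypothetical driver, for illustration only
    public static void main(String[] args) {
      JobConf job = new JobConf();
      job.setMapperClass(FieldSelectionMapReduce.class);
      job.setReducerClass(FieldSelectionMapReduce.class);
      // Input fields are separated by '-' rather than the default tab.
      job.set("mapred.data.field.separator", "-");
      // Map output key = input fields 6, 5, 1, 2, 3; map output value = every field from 0 on.
      job.set("map.output.key.value.fields.spec", "6,5,1-3:0-");
      // Reduce output key is empty; value = fields 4, 3, 2, 1, 0 followed by every field from 0 on.
      job.set("reduce.output.key.value.fields.spec", ":4,3,2,1,0,0-");
    }
  }
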
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.java?view=auto&rev=532859
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.java Thu Apr 26 13:59:13 2007
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+
+public class KeyFieldBasedPartitioner implements Partitioner {
+
+  private int numOfPartitionFields;
+
+  private String keyFieldSeparator;
+
+  public void configure(JobConf job) {
+    this.keyFieldSeparator = job.get("map.output.key.field.separator", "\t");
+    this.numOfPartitionFields = job.getInt("num.key.fields.for.partition", 0);
+  }
+
+  /** Use {@link Object#hashCode()} to partition. */
+  public int getPartition(WritableComparable key, Writable value,
+      int numReduceTasks) {
+    String partitionKeyStr = key.toString();
+    String[] fields = partitionKeyStr.split(this.keyFieldSeparator);
+    if (this.numOfPartitionFields > 0
+        && this.numOfPartitionFields < fields.length) {
+      StringBuffer sb = new StringBuffer();
+      for (int i = 0; i < this.numOfPartitionFields; i++) {
+        sb.append(fields[i]).append(this.keyFieldSeparator);
+      }
+      partitionKeyStr = sb.toString();
+      if (partitionKeyStr.length() > 0) {
+        partitionKeyStr = partitionKeyStr.substring(0,
+            partitionKeyStr.length() - 1);
+      }
+    }
+    return (partitionKeyStr.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
+  }
+}
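A sketch of how the new partitioner might be wired into a job. The property names come from the class above; the driver class, the separator, and the field count are made-up values for illustration.

  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;

  public class KeyFieldPartitionExample {  // hypothetical driver, for illustration only
    public static void main(String[] args) {
      JobConf job = new JobConf();
      job.setPartitionerClass(KeyFieldBasedPartitioner.class);
      // Keys look like "2007-04-26-13"; partition on the first two '-'-separated fields only,
      // so all records for the same year and month go to the same reducer.
      job.set("map.output.key.field.separator", "-");
      job.setInt("num.key.fields.for.partition", 2);
    }
  }
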
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestFieldSelection.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestFieldSelection.java?view=auto&rev=532859
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestFieldSelection.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestFieldSelection.java Thu Apr 26 13:59:13 2007
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.lib.*;
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+import java.text.NumberFormat;
+
+public class TestFieldSelection extends TestCase {
+
+  private static NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setMinimumIntegerDigits(4);
+    idFormat.setGroupingUsed(false);
+  }
+
+  public void testFieldSelection() throws Exception {
+    launch();
+  }
+
+  public static void launch() throws Exception {
+    JobConf conf = new JobConf(TestFieldSelection.class);
+    FileSystem fs = FileSystem.get(conf);
+    int numOfInputLines = 10;
+
+    Path OUTPUT_DIR = new Path("build/test/output_for_field_selection_test");
+    Path INPUT_DIR = new Path("build/test/input_for_field_selection_test");
+    String inputFile = "input.txt";
+    fs.delete(INPUT_DIR);
+    fs.mkdirs(INPUT_DIR);
+    fs.delete(OUTPUT_DIR);
+
+    StringBuffer inputData = new StringBuffer();
+    StringBuffer expectedOutput = new StringBuffer();
+
+    FSDataOutputStream fileOut = fs.create(new Path(INPUT_DIR, inputFile));
+    for (int i = 0; i < numOfInputLines; i++) {
+      inputData.append(idFormat.format(i));
+      inputData.append("-").append(idFormat.format(i+1));
+      inputData.append("-").append(idFormat.format(i+2));
+      inputData.append("-").append(idFormat.format(i+3));
+      inputData.append("-").append(idFormat.format(i+4));
+      inputData.append("-").append(idFormat.format(i+5));
+      inputData.append("-").append(idFormat.format(i+6));
+      inputData.append("\n");
+
+      expectedOutput.append(idFormat.format(i+3));
+      expectedOutput.append("-").append(idFormat.format(i+2));
+      expectedOutput.append("-").append(idFormat.format(i+1));
+      expectedOutput.append("-").append(idFormat.format(i+5));
+      expectedOutput.append("-").append(idFormat.format(i+6));
+
+      expectedOutput.append("-").append(idFormat.format(i+6));
+      expectedOutput.append("-").append(idFormat.format(i+5));
+      expectedOutput.append("-").append(idFormat.format(i+1));
+      expectedOutput.append("-").append(idFormat.format(i+2));
+      expectedOutput.append("-").append(idFormat.format(i+3));
+
+      expectedOutput.append("-").append(idFormat.format(i+0));
+      expectedOutput.append("-").append(idFormat.format(i+1));
+      expectedOutput.append("-").append(idFormat.format(i+2));
+      expectedOutput.append("-").append(idFormat.format(i+3));
+      expectedOutput.append("-").append(idFormat.format(i+4));
+      expectedOutput.append("-").append(idFormat.format(i+5));
+      expectedOutput.append("-").append(idFormat.format(i+6));
+      expectedOutput.append("\n");
+    }
+    fileOut.write(inputData.toString().getBytes("utf-8"));
+    fileOut.close();
+
+    System.out.println("inputData:");
+    System.out.println(inputData.toString());
+    JobConf job = new JobConf(conf, TestFieldSelection.class);
+    job.setInputPath(INPUT_DIR);
+    job.setInputFormat(TextInputFormat.class);
+    job.setMapperClass(FieldSelectionMapReduce.class);
+    job.setReducerClass(FieldSelectionMapReduce.class);
+
+    job.setOutputPath(OUTPUT_DIR);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setOutputFormat(TextOutputFormat.class);
+    job.setNumReduceTasks(1);
+
+    job.set("mapred.data.field.separator", "-");
+    job.set("map.output.key.value.fields.spec", "6,5,1-3:0-");
+    job.set("reduce.output.key.value.fields.spec", ":4,3,2,1,0,0-");
+
+    JobClient.runJob(job);
+
+    //
+    // Finally, we compare the reconstructed answer key with the
+    // original one.  Remember, we need to ignore zero-count items
+    // in the original key.
+    //
+    boolean success = true;
+    Path outPath = new Path(OUTPUT_DIR, "part-00000");
+    String outdata = TestMiniMRWithDFS.readOutput(outPath, job);
+
+    assertEquals(expectedOutput.toString(), outdata);
+    fs.delete(OUTPUT_DIR);
+    fs.delete(INPUT_DIR);
+  }
+
+  /**
+   * Launches all the tasks in order.
+   */
+  public static void main(String[] argv) throws Exception {
+    launch();
+  }
+}