From: sree@apache.org
To: commits@tez.apache.org
Date: Thu, 18 Feb 2016 09:58:03 -0000
Subject: [09/24] tez git commit: TEZ-2974. Tez tools: TFileRecordReader in tez-tools should support reading >2 GB tfiles (rbalamohan)

TEZ-2974. Tez tools: TFileRecordReader in tez-tools should support reading >2 GB tfiles (rbalamohan)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/72f56163
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/72f56163
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/72f56163

Branch: refs/heads/TEZ-2980
Commit: 72f561639f828f2d8a815d52460e44fe2ea56d3a
Parents: 870972d
Author: Rajesh Balamohan
Authored: Mon Feb 1 16:44:11 2016 -0800
Committer: Rajesh Balamohan
Committed: Mon Feb 1 16:44:11 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../java/org/apache/tez/tools/TFileLoader.java  | 91 ++++++--------------
 .../org/apache/tez/tools/TFileRecordReader.java | 68 +++++++++++----
 3 files changed, 80 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/72f56163/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a550015..6bff146 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.3: Unreleased
   INCOMPATIBLE CHANGES
 
   ALL CHANGES:
+    TEZ-2974. Tez tools: TFileRecordReader in tez-tools should support reading >2 GB tfiles.
     TEZ-3081. Update tez website for trademarks feedback.
     TEZ-3076. Reduce merge memory overhead to support large number of in-memory mapoutputs
     TEZ-3079. Fix tez-tfile parser documentation.
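
----------------------------------------------------------------------
Background on the fix: the previous reader copied each TFile value into a
single BytesWritable-backed byte[]. Java arrays are int-indexed, so a value
larger than Integer.MAX_VALUE bytes (~2 GB) cannot be materialized that way,
and even smaller values risk OOM. The patch instead reads each value as a
stream, one line at a time. A minimal standalone sketch of that idea
(illustrative code, not part of this patch; the class and method names here
are made up, only TFile's Entry.getValueStream() is the real API):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.io.file.tfile.TFile;

public class ValueStreamSketch {
  // Counts the lines of one entry's value without ever buffering the whole
  // value: memory use is bounded by the longest line, not by the value
  // size, so values beyond 2 GB are handled.
  static long countLines(TFile.Reader.Scanner.Entry entry) throws IOException {
    BufferedReader lines = new BufferedReader(
        new InputStreamReader(entry.getValueStream(), StandardCharsets.UTF_8));
    long count = 0;
    while (lines.readLine() != null) {
      count++;
    }
    return count;
  }
}
----------------------------------------------------------------------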
http://git-wip-us.apache.org/repos/asf/tez/blob/72f56163/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileLoader.java
----------------------------------------------------------------------
diff --git a/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileLoader.java b/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileLoader.java
index 7fbcbf6..18e9940 100644
--- a/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileLoader.java
+++ b/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileLoader.java
@@ -19,16 +19,11 @@
 package org.apache.tez.tools;
 
 import com.google.common.base.Objects;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.pig.Expression;
@@ -36,17 +31,15 @@ import org.apache.pig.FileInputLoadFunc;
 import org.apache.pig.LoadMetadata;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceStatistics;
-import org.apache.pig.StoreFunc;
-import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFileInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.StringReader;
 import java.util.regex.Pattern;
 
 /**
@@ -57,74 +50,46 @@ public class TFileLoader extends FileInputLoadFunc implements LoadMetadata {
 
   private static final Logger LOG = LoggerFactory.getLogger(TFileLoader.class);
 
-  private TFileRecordReader recReader = null;
+  protected TFileRecordReader recReader = null;
 
-  private BufferedReader bufReader;
   private Text currentKey;
 
   private final TupleFactory tupleFactory = TupleFactory.getInstance();
   private final Pattern PATTERN = Pattern.compile(":");
 
-  /**
-   * We get one complete TFile per KV read.
-   * Add a BufferedReader so that we can scan a line at a time.
-   *
-   * @throws java.io.IOException
-   * @throws InterruptedException
-   */
-  //TODO: tasks can sometime throw OOM when single TFile is way too large. Adjust mem accordinly.
-  private void setupReader() throws IOException, InterruptedException {
-    if (recReader.nextKeyValue() && bufReader == null) {
-      currentKey = recReader.getCurrentKey();
-      Text val = recReader.getCurrentValue();
-      bufReader = new BufferedReader(new StringReader(val.toString()));
-    }
-  }
-
   @Override public Tuple getNext() throws IOException {
     try {
-      String line = readLine();
-      if (line != null) {
-        //machine, key, line
-        Tuple tuple = tupleFactory.newTuple(3);
-        if (currentKey != null) {
-          String[] data = PATTERN.split(currentKey.toString());
-          if (data == null || data.length != 2) {
-            LOG.warn("unable to parse " + currentKey.toString());
-            return null;
-          }
-          tuple.set(0, data[0]);
-          tuple.set(1, data[1]);
-        } else {
-          tuple.set(0, "");
-          tuple.set(1, "");
+      if (!recReader.nextKeyValue()) {
+        return null;
+      }
+
+      currentKey = recReader.getCurrentKey();
+      String line = recReader.getCurrentValue().toString();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("currentKey: " + currentKey
+            + ", line=" + line);
+      }
+      //Tuple would be of format: machine, key, line
+      Tuple tuple = tupleFactory.newTuple(3);
+      if (currentKey != null) {
+        String[] data = PATTERN.split(currentKey.toString());
+        if (data == null || data.length != 2) {
+          LOG.warn("unable to parse " + currentKey.toString());
+          return null;
         }
-        tuple.set(2, line); //line
-        return tuple;
+        tuple.set(0, data[0]);
+        tuple.set(1, data[1]);
+      } else {
+        tuple.set(0, "");
+        tuple.set(1, "");
       }
-    } catch (IOException e) {
-      return null;
+      //set the line field
+      tuple.set(2, line);
+      return tuple;
     } catch (InterruptedException e) {
       return null;
     }
-    return null;
-  }
-
-  private String readLine() throws IOException, InterruptedException {
-    String line = null;
-    if (bufReader == null) {
-      setupReader();
-    }
-    line = bufReader.readLine();
-    if (line == null) { //end of stream. Move to the next reader
-      bufReader = null;
-      setupReader();
-      if (bufReader != null) {
-        line = bufReader.readLine();
-      }
-    }
-    return line;
   }
 
   public static class TFileInputFormat extends


http://git-wip-us.apache.org/repos/asf/tez/blob/72f56163/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileRecordReader.java
----------------------------------------------------------------------
diff --git a/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileRecordReader.java b/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileRecordReader.java
index 70c0ee1..4d6c0f2 100644
--- a/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileRecordReader.java
+++ b/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileRecordReader.java
@@ -18,6 +18,7 @@
 package org.apache.tez.tools;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,10 +33,14 @@
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
+import java.io.BufferedReader;
+import java.io.EOFException;
 import java.io.IOException;
+import java.io.InputStreamReader;
 
 /**
- * Simple record reader which reads the TFile and emits it as key, value pair
+ * Simple record reader which reads the TFile and emits it as key, value pair.
+ * If value has multiple lines, read one line at a time.
  */
 public class TFileRecordReader extends RecordReader<Text, Text> {
@@ -43,17 +48,22 @@ public class TFileRecordReader extends RecordReader<Text, Text> {
 
   private long start, end;
-  private Path splitPath;
+  @VisibleForTesting
+  protected Path splitPath;
   private FSDataInputStream fin;
-  private TFile.Reader reader;
-  private TFile.Reader.Scanner scanner;
+
+  @VisibleForTesting
+  protected TFile.Reader reader;
+  @VisibleForTesting
+  protected TFile.Reader.Scanner scanner;
 
   private Text key = new Text();
   private Text value = new Text();
-  private BytesWritable valueBytesWritable = new BytesWritable();
   private BytesWritable keyBytesWritable = new BytesWritable();
 
+  private BufferedReader currentValueReader;
+
   @Override public void initialize(InputSplit split, TaskAttemptContext context)
       throws IOException, InterruptedException {
     FileSplit fileSplit = (FileSplit) split;
@@ -69,22 +79,46 @@ public class TFileRecordReader extends RecordReader<Text, Text> {
     scanner = reader.createScannerByByteRange(start, fileSplit.getLength());
   }
 
+  private void populateKV(TFile.Reader.Scanner.Entry entry) throws IOException {
+    entry.getKey(keyBytesWritable);
+    //splitpath contains the machine name. Create the key as splitPath + realKey
+    String keyStr = new StringBuilder()
+        .append(splitPath.getName()).append(":")
+        .append(new String(keyBytesWritable.getBytes()))
+        .toString();
+
+    /**
+     * In certain cases, values can be huge (files > 2 GB). Stream is
+     * better to handle such scenarios.
+     */
+    currentValueReader = new BufferedReader(
+        new InputStreamReader(entry.getValueStream()));
+    key.set(keyStr);
+    String line = currentValueReader.readLine();
+    value.set((line == null) ? "" : line);
+  }
+
   @Override public boolean nextKeyValue() throws IOException, InterruptedException {
-    valueBytesWritable.setSize(0);
-    if (!scanner.advance()) {
+    if (currentValueReader != null) {
+      //Still at the old entry reading line by line
+      String line = currentValueReader.readLine();
+      if (line != null) {
+        value.set(line);
+        return true;
+      } else {
+        //Read through all lines in the large value stream. Move to next KV.
+        scanner.advance();
+      }
+    }
+
+    try {
+      populateKV(scanner.entry());
+      return true;
+    } catch(EOFException eofException) {
+      key = null; value = null;
       return false;
     }
-    TFile.Reader.Scanner.Entry entry = scanner.entry();
-    //populate key, value
-    entry.getKey(keyBytesWritable);
-    StringBuilder k = new StringBuilder();
-    //split path contains the machine name. Create the key as splitPath + realKey
-    k.append(splitPath.getName()).append(":").append(new String(keyBytesWritable.getBytes()));
-    key.set(k.toString());
-    entry.getValue(valueBytesWritable);
-    value.set(valueBytesWritable.getBytes());
-    return true;
   }
 
   @Override public Text getCurrentKey() throws IOException, InterruptedException {
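
----------------------------------------------------------------------
For context, a sketch of driving the reworked reader end to end. With this
patch each nextKeyValue() yields one line of the current entry's value, so
the loop below never holds a whole (possibly >2 GB) value in memory. The
harness is illustrative only: the input-path argument and the bare
TaskAttemptContextImpl/TaskAttemptID wiring are assumptions for the example,
not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.tez.tools.TFileRecordReader;

public class TFileScanExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path tfile = new Path(args[0]);  // hypothetical input tfile
    long len = tfile.getFileSystem(conf).getFileStatus(tfile).getLen();

    TFileRecordReader reader = new TFileRecordReader();
    reader.initialize(new FileSplit(tfile, 0, len, null),
        new TaskAttemptContextImpl(conf, new TaskAttemptID()));
    try {
      // Each iteration emits key = "<split name>:<tfile key>" and
      // value = one line of the entry's value stream.
      while (reader.nextKeyValue()) {
        System.out.println(reader.getCurrentKey() + "\t" + reader.getCurrentValue());
      }
    } finally {
      reader.close();
    }
  }
}
----------------------------------------------------------------------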