hadoop-mapreduce-issues mailing list archives

From "Chris Douglas (JIRA)" <j...@apache.org>
Subject [jira] Commented: (MAPREDUCE-326) The lowest level map-reduce APIs should be byte oriented
Date Tue, 16 Feb 2010 19:29:30 GMT

    [ https://issues.apache.org/jira/browse/MAPREDUCE-326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12834398#action_12834398 ]

Chris Douglas commented on MAPREDUCE-326:

bq. Tom> The proposal does not introduce extra buffer copies. The example in section 3.3
is really just a reorganization of the current code, since the serializers are constructed
with DataOutputStream objects - just as they are now. The point is that the serialization
is moved out of the kernel. There is no extra copying going on.

Let's be more explicit. This is the code in question:
public void write(K key, V value) throws IOException, InterruptedException {
  keySerializer.serialize(key);
  valSerializer.serialize(value);
  int partition = partitioner.getPartition(key, value, partitions);
  rawMapOutputCollector.collect(keyBuffer, valueBuffer, partition);
}
What is backing {{keySerializer}} and {{valSerializer}}? What are {{keyBuffer}} and {{valueBuffer}}
if not intermediate structures separate from the collection buffer? If they are backed by the collection
buffer as in the current code, then the call to collect is redundant, because the bytes have
already been written; {{collect}} would merely note that the range of bytes just written belongs
to that partition, which is just confused. In the current code, each serializer is backed by the internal {{BlockingBuffer}}
that manages the writes into the shared collection buffer; it subclasses {{DataOutputStream}}
because the {{Writable}} contract used to require {{DataOutput}}, not because
there is an intermediate {{DataOutputStream}} for the serializers.
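
For reference, the current wiring in {{MapTask.MapOutputBuffer}} looks roughly like this (paraphrased, not a verbatim quote of the source):

// in the MapOutputBuffer constructor: both serializers are opened on the
// internal BlockingBuffer, which writes into the shared collection buffer
bb = new BlockingBuffer();
keySerializer = serializationFactory.getSerializer(keyClass);
keySerializer.open(bb);
valSerializer = serializationFactory.getSerializer(valClass);
valSerializer.open(bb);

// in collect(): serialization writes straight into the collection buffer;
// there is no per-record intermediate buffer and no extra copy
keySerializer.serialize(key);
valSerializer.serialize(value);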

What the proposed section says is that {{keySerializer}} and {{valSerializer}} write into
separate {{DataOutputBuffer}} instances ({{keyBuffer}} and {{valueBuffer}}), and then the serialized
bytes are copied from the byte arrays backing those structures into the collection buffer.
There is absolutely an extra copy in this scheme. It would not be reasonable to read it otherwise.
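
To make the extra copy concrete, the flow the proposal describes amounts to something like this (a sketch; the buffer names follow the proposal, {{collectionBuffer}} and the copy calls are my illustration):

// serialize into per-record DataOutputBuffer instances...
keySerializer.serialize(key);      // fills keyBuffer's backing byte[]
valSerializer.serialize(value);    // fills valueBuffer's backing byte[]
// ...then collect() must copy those bytes into the shared collection buffer
collectionBuffer.write(keyBuffer.getData(), 0, keyBuffer.getLength());     // extra copy
collectionBuffer.write(valueBuffer.getData(), 0, valueBuffer.getLength()); // extra copy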

bq. Tom> At least, this was true until MAPREDUCE-64 was committed (around the time I was
writing the proposal). Now it uses a different buffer object (BlockingBuffer, which is a DataOutputStream),
so really DataOutputBuffer should be replaced by DataOutputStream in the write() method signature.

It has worked this way since HADOOP-2919 in 0.17... and again: if both {{keySerializer}} and
{{valSerializer}} are backed by the collection buffer, then what is {{collect}} doing? What
are {{keyBuffer}} and {{valueBuffer}}?

bq. Tom> With a binary API it would be possible to bypass the BytesWritable and write directly
into the MapOutputBuffer.

You're confusing buffering with unnecessary copying. The transport between streaming/pipes
and the collection buffer could admissibly write directly from its source into the collection
buffer without an intermediate {{BytesWritable}} iff no backtracking is required. One can
only write directly into the buffer if the framework guarantees that it will not flush the
stream, as Doug's {{Channel}} API acknowledges by requiring that the bytes be retained in
memory while being broken into records.
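
A minimal sketch of the kind of contract that implies (a hypothetical interface for illustration, not Doug's actual {{Channel}} API):

import java.io.OutputStream;

// Hypothetical: the framework lends its collection buffer to the caller and
// must not flush or relocate bytes written since the last mark until the
// caller declares a record boundary.
interface RawRecordSink {
  OutputStream reserve();          // writes land directly in the collection buffer
  void markRecord(int partition);  // everything written since the last mark is one record
}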

bq. Tom> Keys and values sent to MapOutputBuffer.collect() ultimately find their way to
IFile.append() (in MapOutputBuffer's sortAndSpill() method). I'm just remarking that these
become binary level interfaces, so there are no generic types in these interfaces in this

There should be no type overhead in the intermediate data pipeline, unless one is using a
combiner or has not defined a {{RawComparator}}. Even there, since context-specific serialization
has already been accepted as an admissible use case, I do not think one is forced to copy the
backing storage to run the combiner. If the generic types are {{ByteBuffer}} or {{Object}},
I'm not sure what overhead is incurred; there is certainly no type safety.
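
As a reminder of why a {{RawComparator}} avoids type overhead in the sort, something like the following compares serialized keys without ever deserializing them (a sketch using the existing {{WritableComparator.compareBytes}} helper):

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

// Compares keys by their serialized bytes; the sort never instantiates key objects.
public class RawBytesComparator implements RawComparator<Object> {
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
  }
  public int compare(Object a, Object b) {
    throw new UnsupportedOperationException("only raw comparison is used in the sort");
  }
}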


bq. Doug> Perhaps this might instead look something [like|https://issues.apache.org/jira/browse/MAPREDUCE-326?focusedCommentId=12833907&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#action_12833907]:

So the goal is to read directly into the collection buffer? Note that this could be constraining
for future framework development: the current design is free to relocate the offset of its next record,
but a design that lends backing storage to user code may need to avoid overwriting, and must track,
segments of unreleased data not yet committed to records. This may introduce internal fragmentation into the
intermediate buffering and, if frameworks are not careful, may block more often than necessary
by preventing the framework from spilling when it prefers. None of these are fatal, but it's
worth noting that any proposal along these lines cedes a considerable amount of control to frameworks,
complicates the implementation if it retains any intelligence, and denies MapReduce some opportunities
to make optimizations that benefit all.

The API you suggest looks great for the identity map (many jobs do use it) or for a framework
that does its own buffering of emitted records, keeping its own set of offsets as the output
collector does. Is this how Pig and Hive work?

bq. Doug> The goal is to permit the kernel to identify record boundaries (so that it can
compare, sort and transmit records) while at the same time minimizing per-record data copying.
Getting this API right without benchmarking might prove difficult. We should benchmark this
under various scenarios: a key/value pair of Writable instances, line-based data from a text
file, and length-delimited, raw binary data.

+1 Winning a benchmark, by a significant margin, should be a prerequisite to committing these changes.

bq. Doug> Can you please elaborate? I don't see the words "pipes" or "streaming" mentioned
in that issue. How does one load Python, Ruby, C++, etc. into Java? MAPREDUCE-1183 seems to
me just to be a different way to encapsulate configuration data, grouping it per extension
point rather than centralizing it in the job config.

The discussion in MAPREDUCE-1183 also includes writing job descriptions for TaskRunners in
different languages. You were the first to raise it [here|https://issues.apache.org/jira/browse/MAPREDUCE-1183?focusedCommentId=12774027&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#action_12774027].
Nothing in Java will be better than an implementation in the native language; someday such an
implementation will be written, and MAPREDUCE-1183 makes progress toward it by defining a submission
API that is as versatile as the serialization, rather than {{Configuration}}. I think it makes more
progress toward polyglotism than the issue at hand.


I proposed earlier that the byte-oriented utilities effecting the work in MapReduce ({{MapOutputBuffer}},
{{IFile}}, {{Merger}}, etc.) be cleaned up and made available to frameworks. I still think
that is sufficient for performance improvements in the near, even foreseeable, future.

Again, even one use case for this would be hugely useful to the discussion.

> The lowest level map-reduce APIs should be byte oriented
> --------------------------------------------------------
>                 Key: MAPREDUCE-326
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-326
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>            Reporter: eric baldeschwieler
>         Attachments: MAPREDUCE-326-api.patch, MAPREDUCE-326.pdf
> As discussed here:
> https://issues.apache.org/jira/browse/HADOOP-1986#action_12551237
> The templates, serializers and other complexities that allow map-reduce to use arbitrary
types complicate the design and lead to lots of object creates and other overhead that a byte
oriented design would not suffer.  I believe the lowest level implementation of hadoop map-reduce
should have byte string oriented APIs (for keys and values).  This API would be more performant,
simpler and more easily cross language.
> The existing API could be maintained as a thin layer on top of the leaner API.

This message is automatically generated by JIRA.
You can reply to this email to add a comment to the issue online.
