tez-issues mailing list archives

From "Bikas Saha (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (TEZ-2001) Support pipelined data transfer for ordered output
Date Wed, 11 Feb 2015 00:47:11 GMT

    [ https://issues.apache.org/jira/browse/TEZ-2001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14315301#comment-14315301 ]

Bikas Saha commented on TEZ-2001:

Summarizing an offline email thread on this jira in comment format.

From: Bikas

It wasn’t clear in which scenarios it would be useful, because it looked like it would be useful when there is a single wave of running mappers and all the reducers are also running. Such jobs would very likely be small (due to small data) and the mappers would not really be spilling in that case, so there would be no pipelining of spills. Other cases are likely taken care of by slow start.
But looking at it in the light of skew can provide a different perspective. Let's say all mappers but a small number N have finished, and all reducers have been slow-started and have fetched the results of the other completed mappers. These N skewed mappers somehow have lots of data and so are slow and also spilling. This situation now looks like a single wave of mappers with all reducers running. Thus pipelining of shuffles would be useful in this case to overlap the intermediate data pull with the mappers' execution time. The effectiveness of the gains would be determined by the time taken by the mapper to run vs the time taken to fetch.
Is the above understanding correct? If so, then should we plan to implement the pipelining scheme in a manner that fits it better? That would mean not pipelining all mappers but having the shuffle vertex manager monitor stats on the mappers. Then the shuffle vertex manager could figure out which mappers are stragglers due to data (and spilling). Then it would send them an event to initiate pipelining. The shuffle output would work as usual until it receives such a signal. Upon receiving this signal it would send a DM event for the already spilled fragments and continue to send more events for future spills until done. This way we will not pay the cost of pipelining for every mapper whilst being able to benefit from it for stragglers.
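
To make that concrete, here is a rough sketch of the straggler-picking decision the shuffle vertex manager would make (plain Java for illustration; StragglerDetector, TaskStats and the skew threshold are hypothetical names, not the actual ShuffleVertexManager API):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: decide which still-running mappers should be asked to
// start pipelining their spills, based on stats the vertex manager collects.
class StragglerDetector {

  // Hypothetical per-task stats reported to the vertex manager.
  static class TaskStats {
    final boolean completed;
    final long outputBytesSoFar;
    final int spillsSoFar;
    TaskStats(boolean completed, long outputBytesSoFar, int spillsSoFar) {
      this.completed = completed;
      this.outputBytesSoFar = outputBytesSoFar;
      this.spillsSoFar = spillsSoFar;
    }
  }

  // A mapper is a candidate for pipelining if it is still running, has already
  // spilled, and holds much more data than the average completed mapper.
  static List<Integer> pickPipeliningCandidates(Map<Integer, TaskStats> stats,
                                                double skewFactor) {
    long completedBytes = 0;
    int completedTasks = 0;
    for (TaskStats s : stats.values()) {
      if (s.completed) { completedBytes += s.outputBytesSoFar; completedTasks++; }
    }
    List<Integer> candidates = new ArrayList<>();
    if (completedTasks == 0) {
      return candidates; // too early to judge skew
    }
    double avgCompleted = (double) completedBytes / completedTasks;
    for (Map.Entry<Integer, TaskStats> e : stats.entrySet()) {
      TaskStats s = e.getValue();
      if (!s.completed && s.spillsSoFar > 0
          && s.outputBytesSoFar > skewFactor * avgCompleted) {
        candidates.add(e.getKey()); // would be sent a "start pipelining" event
      }
    }
    return candidates;
  }
}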
Btw, the change also improves the shuffle output to not do a final merge but let the reducers do the merge (which they are doing anyway across mappers). Given the ability of the shuffle service to pipeline multiple chunks in bulk on the same connection, this would be a net win. This win is going to be beneficial to all mappers. So perhaps we can make that change separate from the pipelining change so that it's independently tested and production ready.

From: Rajesh

Right, in the patch posted for the ordered case, not all mappers would end up in pipelined mode. It works pretty much in the mode you have described.
1. User sets up tez.runtime.pipelined-shuffle.enable (true/false).
2. All mappers behave in the normal way until a spill (including the final spill) is done in PipelinedSorter.
   1. This is similar to what you mentioned as "This way we will not pay the cost of pipelining for every mapper whilst being able to benefit from it for stragglers."
3. When a spill happens in specific mappers (stragglers),
   1. a DM event is sent out with the spill details;
   2. the final merge is completely avoided on the mapper side for pipelined transfers. Less IO;
   3. the fetcher pulls the data in an incremental fashion;
   4. it is quite possible that the final event could be fetched a lot faster than the other spill events. Hence a spill_id is maintained in the DM events for pipelined cases and appropriate checks are done on the Shuffle side (see the sketch at the end of this mail).
4. However, certain changes need to be done on the consumer/fetcher side.
   1. What should we do when a source goes down after parts of its spills have been downloaded? These spills could have been merged already on the consumer side (as the merger thread runs in parallel).
If we make changes to ShuffleVertexManager (i.e. an event getting triggered in the VM to do pipelining), #4 mentioned above applies. Please let me know if I am missing something here.
The shuffle vertex manager monitoring stats on the mappers can be extremely beneficial for prioritizing the scheduling of fetchers (i.e., identifying the mapper which is churning out lots of data and using that to download its data much earlier).
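
For concreteness, a minimal sketch of the per-spill information a pipelined DM event payload would need to carry, and the out-of-order check on the consumer side (class and field names are illustrative, not the actual Tez payload classes):

import java.util.BitSet;

// Illustrative payload for one spill of one source attempt.
class SpillEventInfo {
  final String attemptPath;  // identifies the producer attempt's output
  final int spillId;         // 0..n-1, increasing per spill
  final boolean lastEvent;   // true for the final spill of this attempt
  SpillEventInfo(String attemptPath, int spillId, boolean lastEvent) {
    this.attemptPath = attemptPath;
    this.spillId = spillId;
    this.lastEvent = lastEvent;
  }
}

// Illustrative consumer-side bookkeeping for one source attempt: the final
// event may be fetched before earlier spills, so completion means "all spill
// ids up to the one marked last have been seen", not "last event seen".
class SpillTracker {
  private final BitSet seen = new BitSet();
  private int finalSpillId = -1;

  void onSpillEvent(SpillEventInfo info) {
    seen.set(info.spillId);
    if (info.lastEvent) {
      finalSpillId = info.spillId;
    }
  }

  boolean allSpillsSeen() {
    return finalSpillId >= 0 && seen.nextClearBit(0) > finalSpillId;
  }
}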

From: Bikas

I see what you are saying. However, if there is a large (ETL) job with lots of data per mapper then it's likely they will all spill to some extent, thus triggering extra DM events for pipelining without being able to benefit from it. In the worst case, we may not be able to turn this awesome feature on by default due to false positives.
My suggestion to turn this on via events to specific mappers, instead of for all mappers via a config option, would allow us to turn this on always (after testing) such that it does its magic when needed without affecting the rest of the jobs.
Case 2) below. Does this mean that final merge avoidance in the mapper requires the use of pipelined DM events and cannot be done without them? Can that be changed by introducing a new encoding in the fetch URL that gives the same information as the different spills do? Even without overloading the URL, the payload of the DM event for shuffle could be enhanced to give all the info about the spills in one final event so as to avoid the final merge in the mapper. Essentially the fetch mechanism becomes entirely spill based, with the difference between pipelining and non-pipelining being sending all the spill info in multiple events vs one event.
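
A small sketch of that producer-side view, under the assumption that the fetch path is entirely spill based (the event channel and names are hypothetical):

import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Illustrative only: in a spill-based fetch model the producer always skips
// the final merge; the pipelined/non-pipelined difference is just when the
// spill descriptors are sent. D stands in for whatever per-spill metadata the
// DM event payload carries (path, spill id, partition sizes, ...).
class SpillEventEmitter<D> {
  private final Consumer<List<D>> sendDataMovementEvent; // hypothetical channel
  private final boolean pipelined;

  SpillEventEmitter(Consumer<List<D>> sendDataMovementEvent, boolean pipelined) {
    this.sendDataMovementEvent = sendDataMovementEvent;
    this.pipelined = pipelined;
  }

  // Called after every spill; 'allSpills' accumulates the descriptors so far.
  void onSpill(D newestSpill, List<D> allSpills, boolean isFinalSpill) {
    if (pipelined) {
      // incremental: one event per spill, fetchers can start immediately
      sendDataMovementEvent.accept(Collections.singletonList(newestSpill));
    } else if (isFinalSpill) {
      // non-pipelined: the same information, but in a single final event
      sendDataMovementEvent.accept(allSpills);
    }
  }
}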
Case 4) below. Failures can only be handled in our re-execution based fault tolerance by making the compute deterministic. So the question is whether our spills are deterministic. If the same mapper runs on the same data and produces the same output, then would we produce the same spills? If yes, we are good, because the DM event spill encoding and the reducer-side checks would ensure that we don't fetch any spills that have already been fetched. This would be similar to the existing logic that does not re-fetch a mapper's output after the mapper is rerun if that output has already been fetched from the previous mapper version. Essentially, the shuffle fetch side logic becomes mapper+spills dependent vs being just mapper dependent.
On the other hand, if spills are not deterministic then the solution would be to find out how to make them so. I would expect our spills to be deterministic because they are based on fixed sizes, and for deterministic output records the size-based calculations should trigger identical spills. Right? Alternatively, if we move to the mapper+spills model then the reducer would know whether it is pulling all spills or partial spills based on the metadata in the DM event. Partial spills could be fetched and merged separately from complete spills. In any case, the final KVs to the reducer are a view over multiple merged files, so we can afford to have multiple merge files. The partial-spill merge file can be dropped based on retries of mappers. However, the better solution would be to make the spills deterministic.
This is in fact needed, because if pipelining is targeting the skew/straggler scenario, then it is very likely that a speculative instance will be launched for the straggling mapper. Thus we cannot limit the versions of such mappers to 1, and the reducer also needs to be able to fetch the results from the speculated mapper if that finishes first (while the original is also sending pipelined events). Thus not being able to handle multiple retries of the mapper would make the feature a non-starter for its primary use case.
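
For illustration, a minimal sketch of what mapper+spills dependent fetch bookkeeping could look like, assuming spills are deterministic across attempts of the same task (names are hypothetical):

import java.util.HashSet;
import java.util.Set;

// Illustrative: track completed fetches by (source task, spill id) rather than
// by source task alone. If spills are deterministic, a re-run attempt produces
// the same spill ids, so anything already fetched from the failed attempt can
// be skipped instead of being thrown away and fetched again.
class SpillFetchLedger {
  private final Set<Long> fetched = new HashSet<>();

  private static long key(int srcTaskIndex, int spillId) {
    return ((long) srcTaskIndex << 32) | (spillId & 0xffffffffL);
  }

  boolean alreadyFetched(int srcTaskIndex, int spillId) {
    return fetched.contains(key(srcTaskIndex, spillId));
  }

  void markFetched(int srcTaskIndex, int spillId) {
    fetched.add(key(srcTaskIndex, spillId));
  }
}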

From: Rajesh

Case 2)
- For pipelining, we definitely need additional DM events in order to communicate to fetchers that some partial data/spill is available. If we encode this information in the URL, fetchers can begin pulling the data only after the source has finished. That would, in a way, defeat the purpose of pipelining.
- But I see your point wherein we can avoid the final merge even without pipelining, plus some additional changes on the fetcher side:
  - Let PipelinedSorter spill (assume 4 spills).
  - Spills are stored in different directories locally.
  - Skip the final merge.
  - Add the number of spills to the fetcher URL.
  - A final DM event (single event) is sent out with the URL details.
  - Need to handle the empty-partitions case, where we don't set a URL in the DM event.
  - The fetcher parses the URL and the spill details (as > 1).
  - The fetcher behaves as if it got 4 different events and enqueues 4 items in its fetcher queue.
  - When all 4 parts are received, it sets the SUCCESS flag for that attempt (see the sketch below).
- If we have to enable pipelining (i.e. to let fetchers pull spills early), we would probably need to add more info to the URL to indicate that more events will be coming through.
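
A rough sketch of that fetcher-side fan-out, assuming the spill count is carried in the URL/event as described (hypothetical names and URL shape):

import java.util.ArrayList;
import java.util.List;

// Illustrative: a single final event advertising N spills is expanded into N
// fetch work items; the source attempt is flagged SUCCESS only once every
// part has been pulled.
class SpillFanOut {
  static List<String> expand(String baseUrl, int numSpills) {
    List<String> fetches = new ArrayList<>();
    for (int spillId = 0; spillId < numSpills; spillId++) {
      // hypothetical URL shape; the real shuffle handler maps index -> offsets
      fetches.add(baseUrl + "&spill=" + spillId);
    }
    return fetches;
  }

  static boolean attemptComplete(int partsReceived, int numSpills) {
    return partsReceived == numSpills; // only then set SUCCESS for the attempt
  }
}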
Case 4)
- I am not 100% sure if spills can be deterministic. For the majority of use cases they would be. However, there can be other weird scenarios in Hive wherein this is not deterministic (maybe the sampling use case or so). Even if a mapper fails, it might trigger re-execution for the same data on another machine, but based on the random/sample function it might end up generating different spill boundaries. It might end up having different results in case of failures, but that might be acceptable for whatever reasons. Gopal is the best person for providing more corner cases. :)
- The idea was to start supporting partial fetch failures in a follow-up JIRA. At a high level, we can mark the partially fetched outputs so that they are not qualified for merging in parallel (see the sketch below). So, even when a source dies, we can end up fetching the entire set again without issues (but some amount of network wastage has happened in the previous fetching).
If we prioritize the fixes (i.e. first fixing "avoid partially downloaded spills from getting merged"), then skipping the final merge for mappers would be possible and production ready in PipelinedSorter. :)
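
A minimal sketch of that follow-up idea, assuming partially fetched pipelined outputs are simply parked until the source attempt completes (hypothetical names):

import java.util.ArrayList;
import java.util.List;

// Illustrative: segments from a pipelined source are parked until the attempt
// has delivered all its spills; only then are they handed to the merger. If the
// source dies first, the parked segments are dropped and the attempt is fetched
// again (some network work is wasted, but no partially merged data must be undone).
class PendingMergeBuffer {
  private final List<byte[]> parked = new ArrayList<>();

  void addSegment(byte[] segment) {
    parked.add(segment);
  }

  List<byte[]> releaseForMerge(boolean attemptComplete) {
    if (!attemptComplete) {
      return new ArrayList<>(); // not yet eligible for merging
    }
    List<byte[]> ready = new ArrayList<>(parked);
    parked.clear();
    return ready;
  }

  void discardOnSourceFailure() {
    parked.clear(); // safe: nothing was merged, just re-fetch everything
  }
}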

From: Bikas

If the task is non-deterministic then Tez cannot guarantee deterministic results under failures. So if task non-determinism is making spills non-deterministic then IMO that's a won't fix/unsupported. If spills are deterministic for deterministic tasks then we can use that assumption to build a much better solution than setting aside partial merges and re-fetching the same data again from the re-run.

From: Gopal

Tasks are logically deterministic within any partition/reducer output.

That’s an axiom, but there is no forcing function to ensure they are byte-for-byte deterministic, because the spill indexes needn’t match up; the shuffle handler does the mapping from index -> offsets.

The current Hive model triggers 10% off-loads of the in-memory aggregations to get a higher hit-rate for reductions, instead of collecting all keys as they are encountered; which 10% gets off-loaded depends entirely on the traversal order of the buckets in an in-memory hash table.

We can revert to the slow-path of MRv2 and turn off all map-side in-memory reductions in Hive
to get to predictable spilling, but it’s not the way we want to go. 

FYI, this is a non-issue right now if you merely test out the code base, but the hashtable traversal is not deterministically forced on JDK8, due to the removal of jdk.map.althashing.threshold.
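
For illustration, a tiny standalone example of the point (not Hive code): which entries an in-memory aggregation off-loads first is purely a function of hash-table traversal order, which java.util.HashMap does not guarantee:

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Illustrative only: flushing "the first 10%" of an in-memory aggregation
// depends entirely on hash-table iteration order. java.util.HashMap makes no
// ordering guarantee, so the flushed subset (and hence the spill contents) is
// an artifact of table layout, not of the data.
public class PartialFlushDemo {
  public static void main(String[] args) {
    Map<String, Long> aggregation = new HashMap<>();
    for (int i = 0; i < 100; i++) {
      aggregation.put("key-" + i, (long) i);
    }
    int toFlush = aggregation.size() / 10; // off-load ~10% of the entries
    Iterator<Map.Entry<String, Long>> it = aggregation.entrySet().iterator();
    for (int i = 0; i < toFlush && it.hasNext(); i++) {
      Map.Entry<String, Long> e = it.next();
      System.out.println("spilling " + e.getKey() + "=" + e.getValue());
      it.remove();
    }
    // Which keys were printed is whatever order the table happened to be in.
  }
}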

From: Siddharth

I'm not sure this should be unsupported. If tasks generate non-deterministic output - either byte for byte, or in the final output across different attempts (which is possible with some Hive non-deterministic operations like random) - then the number of attempts can just be set to 1. Once we support vertex-level configuration, this will no longer apply to the entire DAG.

There can be different strategies for handling failure - avoiding the merge, or keeping track of merged spills and not re-fetching them. The same applies to the unordered case as well - and can be configured per vertex.

Using the VertexManager is nice - but it seems like a follow-on item, along with failure handling.

> Support pipelined data transfer for ordered output
> --------------------------------------------------
>                 Key: TEZ-2001
>                 URL: https://issues.apache.org/jira/browse/TEZ-2001
>             Project: Apache Tez
>          Issue Type: Improvement
>            Reporter: Rajesh Balamohan
>            Assignee: Rajesh Balamohan
>         Attachments: TEZ-2001.1.patch, TEZ-2001.2.patch

This message was sent by Atlassian JIRA
