hive-user mailing list archives

From Maria <>
Subject Re:Re: How the actual "sample data" are implemented when using tez reduce auto-parallelism
Date Wed, 02 Mar 2016 01:36:47 GMT
 Thank you very much for your patient answers. I got it; this is very helpful
for understanding the auto-parallelism optimization.

At 2016-02-29 12:50:04, "Rajesh Balamohan" <> wrote:

"tez.shuffle-vertex-manager.desired-task-input-size" - Determines the desired input size
per reduce task. Default is around 100 MB.

"tez.shuffle-vertex-manager.min-task-parallelism" - Minimum task parallelism that ShuffleVertexManager
should honor. I.e., if the client has set it to 100, ShuffleVertexManager would not try to auto-reduce
below 100 tasks.

"tez.shuffle-vertex-manager.min-src-fraction", "tez.shuffle-vertex-manager.max-src-fraction"
determine the slow-start behavior.

Hive mainly sets "tez.shuffle-vertex-manager.desired-task-input-size" and "tez.shuffle-vertex-manager.min-task-parallelism"
at the time of creating the DAG. Min-task-parallelism is determined internally in Hive by
a couple of other parameters, "hive.tez.max.partition.factor" and "hive.tez.min.partition.factor",
along with the data size per reduce task. For instance, assume the initial number of reduce tasks is 100,
hive.tez.max.partition.factor=2.0 and hive.tez.min.partition.factor=0.25. In this
case, Hive would set the reducers to 200 and the hint to Tez for its min-task-parallelism
would be 25, so that Tez would not try to auto-reduce below 25 tasks. This serves as a safety
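
The arithmetic in the example above can be sketched as follows. This is only an illustration of the numbers quoted in this mail, not Hive's code: the function name, the truncating rounding and the floor of 1 are assumptions.

```python
def hive_reducer_hints(initial_reducers, max_partition_factor, min_partition_factor):
    """Return (reducers configured by Hive, min-task-parallelism hint for Tez).

    Hypothetical helper: rounding and the lower bound of 1 are assumptions.
    """
    max_reducers = int(initial_reducers * max_partition_factor)
    min_parallelism = max(1, int(initial_reducers * min_partition_factor))
    return max_reducers, min_parallelism


max_reducers, min_parallelism = hive_reducer_hints(100, 2.0, 0.25)
print(max_reducers, min_parallelism)  # 200 25
```

So the vertex starts out with 200 reducers and Tez is free to shrink it at runtime, but not below 25.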

In Tez, when a source task generates output, a DataMovementEvent is sent out (via RPC) and its
payload carries details like the output size. ShuffleVertexManager keeps aggregating these values
from different source tasks and checks periodically whether it can compute the
value for auto-reduce parallelism. If the aggregated data size is less than the configured "desired-task-input-size",
it waits for output stats from more source tasks. It is possible that by this time the min-src-fraction
limit has already been reached. In that case the min-src-fraction config is dynamically overridden, as it is better to
wait for data from more tasks to determine a more accurate value for auto-parallelism.
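
As a rough sketch of the waiting behaviour described above (not the Tez implementation): the class and method names are hypothetical, and deciding once every source task has reported is an assumption added so the sketch always terminates.

```python
class OutputSizeAggregator:
    """Hypothetical stand-in for the bookkeeping described above (not Tez code)."""

    def __init__(self, total_source_tasks, desired_task_input_size):
        self.total_source_tasks = total_source_tasks
        self.desired_task_input_size = desired_task_input_size
        self.output_size_by_task = {}  # source task id -> reported output size

    def on_output_reported(self, task_id, output_size):
        # Keyed by task id so a repeated report replaces the old value
        # instead of being counted twice.
        self.output_size_by_task[task_id] = output_size

    @property
    def aggregated_size(self):
        return sum(self.output_size_by_task.values())

    def can_decide(self):
        # Keep waiting while the aggregated size is below the desired
        # per-task input size, unless every source task has reported.
        all_reported = len(self.output_size_by_task) >= self.total_source_tasks
        return all_reported or self.aggregated_size >= self.desired_task_input_size


MB = 1024 * 1024
agg = OutputSizeAggregator(total_source_tasks=4, desired_task_input_size=100 * MB)
agg.on_output_reported("map_0", 30 * MB)
print(agg.can_decide())  # False: only 30 MB seen so far
agg.on_output_reported("map_1", 80 * MB)
print(agg.can_decide())  # True: 110 MB >= 100 MB
```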

There can be scenarios where the computed auto-reduce value is greater than the currently
configured parallelism, depending on the amount of data emitted by the source tasks. In such
cases, the existing parallelism is used.
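
A minimal sketch of that decision, under stated assumptions: the total output is extrapolated linearly from the source tasks that have finished, the result is clamped from below by min-task-parallelism, and it never exceeds the existing parallelism. The function and parameter names are hypothetical, and the real method may apply further adjustments.

```python
import math


def auto_reduce_parallelism(completed_output_size, completed_tasks, total_tasks,
                            desired_task_input_size, min_task_parallelism,
                            current_parallelism):
    """Hypothetical simplification of the runtime parallelism decision."""
    # Assumption: extrapolate the output seen so far to all source tasks.
    expected_total = completed_output_size * total_tasks / completed_tasks
    desired = math.ceil(expected_total / desired_task_input_size)
    # Never go below the minimum hinted by the client (e.g. Hive).
    desired = max(desired, min_task_parallelism)
    # If the computed value is not smaller, keep the existing parallelism.
    if desired >= current_parallelism:
        return current_parallelism
    return desired


# Sizes in MB: 10 of 40 source tasks finished, desired input 100 MB per reducer.
print(auto_reduce_parallelism(1024, 10, 40, 100, 25, 200))   # 41
print(auto_reduce_parallelism(10000, 10, 40, 100, 25, 200))  # 200 (existing value kept)
print(auto_reduce_parallelism(10, 10, 40, 100, 25, 200))     # 25 (minimum honored)
```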

Following method contains details on how parallelism is determined at runtime.

It is also possible for the source to send per-partition stats along with the DataMovementEvent
payload. Retaining all details in the same payload can be fairly expensive. Currently, per-partition
details are bucketed into one of the data ranges (0, 1, 10, 100, 1000 MB) and are stored in a
RoaringBitMap in the payload. This can be a little noisy, but at least provides better hints
to ShuffleVertexManager. Based on this info, ShuffleVertexManager can first schedule the reducer
task which would get the maximum amount of data. This can be enabled via ""
(not enabled by default)
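
To illustrate the bucketing idea, here is a small sketch. It assumes the listed values are lower bounds of the ranges, uses plain Python lists instead of a RoaringBitMap, and all names are hypothetical; the actual encoding in Tez may differ.

```python
import bisect

# Ranges quoted above, in MB; treated here as lower bounds (an assumption).
BUCKET_LOWER_BOUNDS_MB = (0, 1, 10, 100, 1000)


def bucket_for_size(size_bytes):
    """Map an exact partition size to the coarse bucket (in MB) it falls into."""
    size_mb = max(size_bytes, 0) / (1024 * 1024)
    index = bisect.bisect_right(BUCKET_LOWER_BOUNDS_MB, size_mb) - 1
    return BUCKET_LOWER_BOUNDS_MB[index]


def partitions_by_expected_load(buckets_per_source_task):
    """Order partition indices by summed bucket values, largest first."""
    totals = [sum(column) for column in zip(*buckets_per_source_task)]
    return sorted(range(len(totals)), key=lambda p: totals[p], reverse=True)


MB = 1024 * 1024
# Two source tasks, three partitions each.
task_a = [bucket_for_size(s) for s in (5 * MB, 250 * MB, 0)]
task_b = [bucket_for_size(s) for s in (20 * MB, 120 * MB, 512 * 1024)]
print(task_a, task_b)                                 # [1, 100, 0] [10, 100, 0]
print(partitions_by_expected_load([task_a, task_b]))  # [1, 0, 2]
```

The coarse buckets lose precision (5 MB and 9 MB look the same), which is the noise mentioned above, but they are still enough to tell which reducer is likely to receive the most data.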


On Sat, Feb 27, 2016 at 11:45 AM, LLBian <> wrote:

Oh, I saw some useful messages about data statistics in TEZ_1167.

 Now, my main points of confusion are:

(1) How does the reduce ShuffleVertexManager know how much sample data is enough to estimate
the whole vertex parallelism?

(2) The relationship between edge and event.

I am eager for your guidance.

I would be very grateful for any reply.

At 2016-02-27 11:13:48, "LLBian" <> wrote:



>Hello, respected experts:


>Recently, I have been studying Tez reduce auto-parallelism. I read the article "Apache Tez: Dynamic Graph Reconfiguration", TEZ-398 and HIVE-7158.

>I found that HIVE-7158 says "Tez can optionally sample data from a fraction of the tasks of a vertex and use that information to choose the number of downstream tasks for any given scatter gather edge".

>I know how to use this optimization, but I am confused by this:


>" Tez defines a VertexManager event that can be used to send an arbitrary user payload
to the vertex manager of a given vertex. The partitioning tasks (say the Map tasks) use this
event to send statistics such as the size of the output partitions produced to the ShuffleVertexManager
for the reduce vertex. The manager receives these events and tries to model the final output
statistics that would be produced by the all the tasks."


>(1) How is the actual "sample data" implemented? I mean, how does the reduce ShuffleVertexManager
know how much sample data is enough to estimate the whole vertex parallelism? Is that related to
reduce slow-start? I studied the source code of Apache Tez 0.7.0, but it is still not very clear.
Maybe I was too stupid to understand that.

>(2) Do the partitioning tasks proactively send their output data stats to the consumer
ShuffleVertexManager? Is the event sent by RPC or HTTP?


>I am eager for your guidance.


>I would be very grateful for any reply.





