flink-dev mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Partition problem
Date Mon, 25 Apr 2016 10:09:55 GMT
Hi Andrew,

I might be wrong, but I think this problem is caused by an incorrect
assumption about how Flink reads input data.
In Flink, an InputSplit is not read by a dedicated new task, and a split
does not correspond to a partition. This is different from how Hadoop MR
and Spark handle InputSplits.

Instead, Flink creates as many DataSource subtasks as specified by the
task parallelism and lazily assigns InputSplits to them. Idle DataSource
subtasks request InputSplits from the JobManager, and the assignment
happens first-come, first-served.
Hence, the subtask ID (or partition ID) that reads a given InputSplit is
not deterministic, and a DataSource subtask might read more than one split
or no split at all (as in your case).
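
To make the effect concrete, here is a small plain-Python simulation of the
assignment described above. It is only a sketch and does not use any Flink
API: the function assign_splits, the split names, and the request orders are
all made up for illustration, and real request order depends on scheduling
and timing.

```python
from collections import deque


def assign_splits(splits, request_order, parallelism):
    """Hand out splits first-come, first-served (hypothetical helper).

    request_order lists subtask indices in the order their requests
    reach the simulated assigner.
    """
    pending = deque(splits)
    assigned = {subtask: [] for subtask in range(parallelism)}
    for subtask in request_order:
        if not pending:
            break  # no splits left; remaining subtasks read nothing
        assigned[subtask].append(pending.popleft())
    return assigned


# Matrix A: subtasks 0 and 1 happen to ask first.
matrix_a = assign_splits(["A-split0", "A-split1"], [0, 1, 2, 3], parallelism=4)
# Matrix B: subtasks 1 and 2 happen to ask first.
matrix_b = assign_splits(["B-split0", "B-split1"], [1, 2, 0, 3], parallelism=4)

# Subtask indices that actually read data in each source.
non_empty_a = {s for s, got in matrix_a.items() if got}
non_empty_b = {s for s, got in matrix_b.items() if got}

# Joining on the subtask index only keeps the indices both sides share.
common = non_empty_a & non_empty_b
print(matrix_a)
print(matrix_b)
print(common)
```

With these assumed request orders only subtask index 1 is non-empty on both
sides, so a join on the subtask index loses the data held by subtasks 0 and 2.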

If you need the split ID in your program, you can implement an InputFormat
that wraps another InputFormat and attaches the ID of the current InputSplit
to each record it reads, i.e., changes the data type from T to Tuple2[Int, T].
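
The wrapping idea, reduced to a language-neutral sketch in plain Python. This
is not Flink's InputFormat interface: SplitTaggingReader, read_split, and the
sample data are hypothetical names that only show the shape of the
transformation from T to (split ID, T).

```python
class SplitTaggingReader:
    """Wraps a reader and pairs every record with its split ID."""

    def __init__(self, read_split):
        # read_split(split_id) returns the records of one split
        # (hypothetical stand-in for the wrapped input format).
        self._read_split = read_split

    def read(self, split_id):
        for record in self._read_split(split_id):
            yield (split_id, record)  # T becomes (Int, T)


# Hypothetical input: two splits, one matrix row each.
splits = {0: [[1.0, 2.0]], 1: [[3.0, 4.0]]}
reader = SplitTaggingReader(lambda split_id: splits[split_id])

# The tag follows the split, no matter which subtask reads it.
tagged = [pair for split_id in (1, 0) for pair in reader.read(split_id)]
print(tagged)
```

Because the ID travels with the records, later operators can key on it
instead of on the subtask index.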

Hope this helps,
Fabian


2016-04-25 11:27 GMT+02:00 Till Rohrmann <trohrmann@apache.org>:

> Hi Andrew,
>
> I think the problem is that you assume that both matrices have the same
> partitioning. If you guarantee that this is the case, then you can use the
> subtask index as the block index. But in the general case this is not true,
> and then you have to calculate the blocks by first assigning a block index
> (e.g. rows with 0-9 index are assigned to block 0, rows with 10-19 assigned
> to block 1, etc.) and then create the blocks by reducing on this block
> index. That's because the distribution of the individual rows in the
> cluster is not necessarily the same between two matrices.
>
> Cheers,
> Till
>
> On Mon, Apr 25, 2016 at 1:40 AM, Andrew Palumbo <ap.dev@outlook.com>
> wrote:
>
> > Hi All,
> >
> >
> > I've run into a problem with empty partitions when the number of elements
> > in a DataSet is less than the Degree of Parallelism.  I've created a gist
> > here to describe it:
> >
> >
> > https://gist.github.com/andrewpalumbo/1768dac6d2f5fdf963abacabd859aaf3
> >
> >
> > I have two 2x2 matrices, Matrix A and Matrix B and an execution
> > environment where the degree of parallelism is 4. Both matrices are
> > blockified in 2 different DataSets. In this case (the case of a 2x2
> > matrices with 4 partitions) this means that each row goes into a
> > partition leaving 2 empty partitions. In Matrix A, the rows go into
> > partitions 0, 1. However the rows of Matrix B end up in partitions 1, 2.
> > I assign the ordinal index of the blockified matrix's partition to its
> > block, and then join on that index.
> >
> >
> > However in this case, with differently partitioned matrices of the same
> > geometry, the intersection of the blockified matrices' indices is 1, and
> > partitions 0 and 2 are dropped.
> >
> >
> > I've tried explicitly defining the dop for Matrix B using the count of
> > non-empty partitions in Matrix A, however this changes the order of the
> > DataSet, placing partition 2 into partition 0.
> >
> >
> > Is there a way to make sure that these datasets are partitioned in the
> > same way?
> >
> >
> > Thank you,
> >
> >
> > Andy
> >
> >
> >
>
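
Till's suggestion quoted above (derive a block index from the row index, then
reduce on that index) can be sketched in plain Python as follows. This is an
illustration under assumptions, not Flink or Mahout code: blockify, the block
size of 10, and the sample matrices are made up, and each row is assumed to
carry its own row index.

```python
from collections import defaultdict


def blockify(rows, block_size):
    """Group (row_index, row) pairs into blocks keyed by block index."""
    blocks = defaultdict(list)
    for row_index, row in rows:
        # Rows 0-9 go to block 0, rows 10-19 to block 1, and so on.
        blocks[row_index // block_size].append((row_index, row))
    # Sort inside each block so the result does not depend on arrival order.
    return {
        block: sorted(members, key=lambda pair: pair[0])
        for block, members in blocks.items()
    }


# Two matrices with the same geometry whose rows arrive in different orders.
matrix_a = [(i, [float(i)]) for i in range(25)]
matrix_b = [(i, [float(-i)]) for i in reversed(range(25))]

blocks_a = blockify(matrix_a, block_size=10)
blocks_b = blockify(matrix_b, block_size=10)

# Join on the block index; it depends only on the row index.
joined = {b: (blocks_a[b], blocks_b[b]) for b in blocks_a.keys() & blocks_b.keys()}
print(sorted(joined))
```

Since the block index is computed from the row index alone, both matrices get
the same block keys regardless of which subtask happened to read which rows.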
