flink-user mailing list archives

From Tim Conrad <con...@math.fu-berlin.de>
Subject Re: Best way to process data in many files? (FLINK-BATCH)
Date Wed, 24 Feb 2016 16:16:57 GMT
Dear Till and others.

I solved the issue by using the strategy suggested by Till like this:

         List<String> fileListOfSpectra = ...
         SplittableList<String> fileListOfSpectraSplitable =
                 new SplittableList<String>(fileListOfSpectra);
         DataSource<String> fileListOfSpectraDataSource =
                 env.fromParallelCollection(fileListOfSpectraSplitable, String.class);

and then - as before -

         DataSet<Peaklist> peakLists = fileListOfSpectraDataSource
                 .flatMap(new ReadDataFromFile())
                 ...

(You can find the source of the class "SplittableList" below.) Now Flink
distributes the tasks to all available Flink nodes.

Thanks for the help!

Cheers
Tim



On 24.02.2016 16:30, Till Rohrmann wrote:
>
> If I’m not mistaken, this shouldn’t solve the scheduling
> peculiarity of Flink. Flink will still deploy the tasks of the flat
> map operation to the machine where the source task is running. Only
> once this machine has no free slots left will other machines be used
> as well.
>
> I think you don’t need an explicit rebalance() call here.
> Flink will automatically insert the PartitionMethod.REBALANCE strategy.
>
> Cheers,
> Till
>
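Till's remark refers to REBALANCE partitioning, which the Flink documentation describes as distributing elements round-robin so that each downstream partition gets an equal load. The stdlib-only sketch below models that idea for a single sender. It is a simplified illustration, not Flink's partitioner: the class RoundRobinModel and its distribute() method are hypothetical, and the starting channel is a parameter because the real partitioner's initial channel is an implementation detail.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of round-robin redistribution (the idea behind
// REBALANCE partitioning). This is NOT Flink's actual partitioner.
final class RoundRobinModel {
    private RoundRobinModel() {}

    // Sends record k to channel (start + k) mod numChannels and returns, per
    // channel, the records it received in arrival order.
    static <T> List<List<T>> distribute(List<T> records, int numChannels, int start) {
        if (numChannels < 1) {
            throw new IllegalArgumentException("numChannels must be at least 1");
        }
        List<List<T>> channels = new ArrayList<>();
        for (int c = 0; c < numChannels; c++) {
            channels.add(new ArrayList<T>());
        }
        int channel = Math.floorMod(start, numChannels);
        for (T record : records) {
            channels.get(channel).add(record);
            channel = (channel + 1) % numChannels; // advance to the next channel
        }
        return channels;
    }
}
```

Distributing seven file names over three channels starting at channel 0 gives loads of 3, 2 and 2. In this model the loads never differ by more than one, whatever the starting channel.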

    import org.apache.flink.util.SplittableIterator;

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;

    public class SplittableList<T> extends SplittableIterator<T> {

        private final List<T> list;
        private int cursor;

        public SplittableList(List<T> list) {
            this.cursor = 0;
            this.list = list;
        }

        @Override
        @SuppressWarnings("unchecked")
        public Iterator<T>[] split(int numPartitions) {
            if (numPartitions < 1) {
                throw new IllegalArgumentException("The number of partitions must be at least 1.");
            }
            Iterator<T>[] iters = new Iterator[numPartitions];
            // Each partition gets floor(size / numPartitions) elements;
            // the last partition also takes the remainder.
            int partSize = list.size() / numPartitions;
            for (int i = 0; i < numPartitions; i++) {
                int from = i * partSize;
                int to = (i == numPartitions - 1) ? list.size() : from + partSize;
                // Copy the subList view: ArrayList.subList() returns a view that is not Serializable.
                iters[i] = new SplittableList<>(new ArrayList<>(list.subList(from, to)));
            }
            return iters;
        }

        @Override
        public int getMaximumNumberOfSplits() {
            return list.size();
        }

        @Override
        public boolean hasNext() {
            return cursor < list.size();
        }

        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return list.get(cursor++);
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException("remove() is not supported.");
        }
    }
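The split() method of SplittableList gives every partition floor(n / p) elements and hands the remainder to the last one. The stdlib-only sketch below reproduces that arithmetic outside Flink so it can be checked in isolation. The class SplitRule and its methods are hypothetical, and splitBalanced() is an alternative that is not part of the posted code: it spreads the remainder so partition sizes differ by at most one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: reproduces the split arithmetic of SplittableList.split()
// without depending on Flink, plus a balanced alternative for comparison.
final class SplitRule {
    private SplitRule() {}

    // Posted rule: floor(n / p) elements per partition, remainder to the last one.
    static <T> List<List<T>> splitLikePosted(List<T> list, int numPartitions) {
        if (numPartitions < 1) {
            throw new IllegalArgumentException("The number of partitions must be at least 1.");
        }
        int partSize = list.size() / numPartitions;
        List<List<T>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            int from = i * partSize;
            int to = (i == numPartitions - 1) ? list.size() : from + partSize;
            parts.add(new ArrayList<T>(list.subList(from, to)));
        }
        return parts;
    }

    // Alternative (not in the original post): the first n % p partitions get one extra element.
    static <T> List<List<T>> splitBalanced(List<T> list, int numPartitions) {
        if (numPartitions < 1) {
            throw new IllegalArgumentException("The number of partitions must be at least 1.");
        }
        int base = list.size() / numPartitions;
        int extra = list.size() % numPartitions;
        List<List<T>> parts = new ArrayList<>();
        int from = 0;
        for (int i = 0; i < numPartitions; i++) {
            int to = from + base + (i < extra ? 1 : 0);
            parts.add(new ArrayList<T>(list.subList(from, to)));
            from = to;
        }
        return parts;
    }
}
```

With ten files and four partitions, the posted rule yields partitions of 2, 2, 2 and 4 files, while the balanced variant yields 3, 3, 2 and 2. If split() is called with more partitions than there are files (say 3 files and 4 partitions), the posted rule leaves the first three partitions empty and puts all files into the last one, so only one reader would do any work.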


