flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3665) Range partitioning lacks support to define sort orders
Date Wed, 20 Apr 2016 13:44:25 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249912#comment-15249912 ]

ASF GitHub Bot commented on FLINK-3665:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1848#discussion_r60409063
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
---
    @@ -546,43 +549,264 @@ public void testRangePartitionInIteration() throws Exception {
     		result.collect(); // should fail
     	}
     
    +
    +
    +	@Test
    +	public void testRangePartitionerOnSequenceDataWithOrders() throws Exception {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		DataSet<Tuple2<Long, Long>> dataSet = env.generateSequence(0, 10000)
    +				.map(new MapFunction<Long, Tuple2<Long, Long>>() {
    +			@Override
    +			public Tuple2<Long, Long> map(Long value) throws Exception {
    +				return new Tuple2<>(value / 5000, value % 5000);
    +			}
    +		});
    +
    +		final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new
LongComparator(true),
    +																			   new LongComparator(false));
    +
    +		MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
    +
    +		final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>
collected = dataSet.partitionByRange(0, 1)
    +				.withOrders(Order.ASCENDING, Order.DESCENDING)
    +				.mapPartition(minMaxSelector)
    +				.collect();
    +
    +		Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
    +
    +		Tuple2<Long, Long> previousMax = null;
    +		for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected)
{
    +			if (previousMax == null) {
    +				assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0);
    +				previousMax = tuple2.f1;
    +			} else {
    +				assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0);
    +				if (previousMax.f0.equals(tuple2.f0.f0)) {
    +					assertEquals(previousMax.f1 - 1, tuple2.f0.f1.longValue());
    +				}
    +				previousMax = tuple2.f1;
    +			}
    +		}
    +	}
    +
    +	@Test
    +	public void testRangePartitionerOnSequenceNestedDataWithOrders() throws Exception {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		final DataSet<Tuple2<Tuple2<Long, Long>, Long>> dataSet = env.generateSequence(0,
10000)
    +				.map(new MapFunction<Long, Tuple2<Tuple2<Long, Long>, Long>>()
{
    +					@Override
    +					public Tuple2<Tuple2<Long, Long>, Long> map(Long value) throws Exception
{
    +						return new Tuple2<>(new Tuple2<>(value / 5000, value % 5000), value);
    +					}
    +				});
    +
    +		final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new
LongComparator(true),
    +				new LongComparator(true));
    +		MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
    +
    +		final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>
collected = dataSet.partitionByRange(0)
    +				.withOrders(Order.ASCENDING)
    +				.mapPartition(new MapPartitionFunction<Tuple2<Tuple2<Long,Long>,Long>,
Tuple2<Long, Long>>() {
    +					@Override
    +					public void mapPartition(Iterable<Tuple2<Tuple2<Long, Long>, Long>>
values,
    +											 Collector<Tuple2<Long, Long>> out) throws Exception {
    +						for (Tuple2<Tuple2<Long, Long>, Long> value : values) {
    +							out.collect(value.f0);
    +						}
    +					}
    +				})
    +				.mapPartition(minMaxSelector)
    +				.collect();
    +
    +		Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
    +
    +		Tuple2<Long, Long> previousMax = null;
    +		for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected)
{
    +			if (previousMax == null) {
    +				assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0);
    +				previousMax = tuple2.f1;
    +			} else {
    +				assertTrue(tuple2Comparator.compare(tuple2.f0, tuple2.f1) < 0);
    +				if (previousMax.f0.equals(tuple2.f0.f0)) {
    +					assertEquals(previousMax.f1 + 1, tuple2.f0.f1.longValue());
    +				}
    +				previousMax = tuple2.f1;
    +			}
    +		}
    +	}
    +
    +	@Test
    +	public void testRangePartitionerWithKeySelectorOnSequenceNestedDataWithOrders() throws
Exception {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		final DataSet<Tuple2<ComparablePojo, Long>> dataSet = env.generateSequence(0,
10000)
    +				.map(new MapFunction<Long, Tuple2<ComparablePojo, Long>>() {
    +					@Override
    +					public Tuple2<ComparablePojo, Long> map(Long value) throws Exception {
    +						return new Tuple2<>(new ComparablePojo(value / 5000, value % 5000), value);
    +					}
    +				});
    +
    +		final List<Tuple2<ComparablePojo, ComparablePojo>> collected = dataSet
    +				.partitionByRange(new KeySelector<Tuple2<ComparablePojo, Long>, ComparablePojo>()
{
    +					@Override
    +					public ComparablePojo getKey(Tuple2<ComparablePojo, Long> value) throws Exception
{
    +						return value.f0;
    +					}
    +				})
    +				.withOrders(Order.ASCENDING)
    +				.mapPartition(new MinMaxSelector<>(new ComparablePojoComparator()))
    +				.mapPartition(new ExtractComparablePojo())
    +				.collect();
    +
    +		final Comparator<Tuple2<ComparablePojo, ComparablePojo>> pojoComparator
=
    +				new Comparator<Tuple2<ComparablePojo, ComparablePojo>>() {
    +			@Override
    +			public int compare(Tuple2<ComparablePojo, ComparablePojo> o1,
    +							   Tuple2<ComparablePojo, ComparablePojo> o2) {
    +				return o1.f0.compareTo(o2.f1);
    +			}
    +		};
    +		Collections.sort(collected, pojoComparator);
    +
    +		ComparablePojo previousMax = null;
    +		for (Tuple2<ComparablePojo, ComparablePojo> element : collected) {
    +			if (previousMax == null) {
    +				assertTrue(element.f0.compareTo(element.f1) < 0);
    +				previousMax = element.f1;
    +			} else {
    +				assertTrue(element.f0.compareTo(element.f1) < 0);
    +				if (previousMax.first.equals(element.f0.first)) {
    +					assertEquals(previousMax.second - 1, element.f0.second.longValue());
    +				}
    +				previousMax = element.f1;
    +			}
    +		}
    +	}
    +
    +	private static class ExtractComparablePojo implements MapPartitionFunction<
    +			Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>>,
    +			Tuple2<ComparablePojo, ComparablePojo>> {
    +
    +		@Override
    +		public void mapPartition(Iterable<Tuple2<Tuple2<ComparablePojo, Long>,
Tuple2<ComparablePojo, Long>>> values,
    +								 Collector<Tuple2<ComparablePojo, ComparablePojo>> out) throws Exception
{
    +			for (Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>>
value : values) {
    +				out.collect(new Tuple2<>(value.f0.f0, value.f1.f0));
    +			}
    +		}
    +	}
    +
    +    private static class ComparablePojoComparator implements Comparator<Tuple2<ComparablePojo,
Long>>, Serializable {
    +
    +		@Override
    +		public int compare(Tuple2<ComparablePojo, Long> o1,
    +						   Tuple2<ComparablePojo, Long> o2) {
    +			return o1.f0.compareTo(o2.f0);
    +		}
    +	}
    +
    +	private static class ComparablePojo implements Comparable<ComparablePojo> {
    +		private Long first;
    +		private Long second;
    +
    +		public Long getFirst() {
    +			return first;
    +		}
    +
    +		public void setFirst(Long first) {
    +			this.first = first;
    +		}
    +
    +		public Long getSecond() {
    +			return second;
    +		}
    +
    +		public void setSecond(Long second) {
    +			this.second = second;
    +		}
    +
    +		public ComparablePojo(Long first,
    +							  Long second) {
    +			this.first = first;
    +			this.second = second;
    +		}
    +
    +		public ComparablePojo() {
    +		}
    +
    +		@Override
    +		public int compareTo(ComparablePojo o) {
    +			final int firstResult = Long.compare(this.first, o.first);
    +			if (firstResult == 0) {
    +				return (-1) * Long.compare(this.second, o.second);
    +			}
    +
    +			return firstResult;
    +		}
    +	}
    +
     	private static class ObjectSelfKeySelector implements KeySelector<Long, Long>
{
     		@Override
     		public Long getKey(Long value) throws Exception {
     			return value;
     		}
     	}
     
    -	private static class MinMaxSelector implements MapPartitionFunction<Long, Tuple2<Long,
Long>> {
    +	private static class MinMaxSelector<T> implements MapPartitionFunction<T, Tuple2<T,
T>> {
    +
    +		private final Comparator<T> comparator;
    +
    +		public MinMaxSelector(Comparator<T> comparator) {
    +			this.comparator = comparator;
    +		}
    +
     		@Override
    -		public void mapPartition(Iterable<Long> values, Collector<Tuple2<Long,
Long>> out) throws Exception {
    -			long max = Long.MIN_VALUE;
    -			long min = Long.MAX_VALUE;
    -			for (long value : values) {
    -				if (value > max) {
    +		public void mapPartition(Iterable<T> values, Collector<Tuple2<T, T>>
out) throws Exception {
    +			Iterator<T> itr = values.iterator();
    +			T min = itr.next();
    +			T max = min;
    +			T value;
    +			while (itr.hasNext()) {
    +				value= itr.next();
    +				if (comparator.compare(value, min) < 0) {
    +					min = value;
    +				}
    +				if (comparator.compare(value, max) > 0) {
     					max = value;
     				}
     
    -				if (value < min) {
    -					min = value;
    -				}
     			}
    -			Tuple2<Long, Long> result = new Tuple2<>(min, max);
    +
    +			Tuple2<T, T> result = new Tuple2<>(min, max);
     			out.collect(result);
     		}
     	}
     
    -	private static class Tuple2Comparator implements Comparator<Tuple2<Long, Long>>
{
    +	private static class Tuple2Comparator<T> implements Comparator<Tuple2<T,
T>>, Serializable {
    +
    +		private final Comparator<T> firstComparator;
    +		private final Comparator<T> secondComparator;
    +
    +		public Tuple2Comparator(Comparator<T> comparator) {
    +			this(comparator, comparator);
    +		}
    +
    +		public Tuple2Comparator(Comparator<T> firstComparator,
    +								Comparator<T> secondComparator) {
    +			this.firstComparator = firstComparator;
    +			this.secondComparator = secondComparator;
    +		}
    +
     		@Override
    -		public int compare(Tuple2<Long, Long> first, Tuple2<Long, Long> second)
{
    -			long result = first.f0 - second.f0;
    +		public int compare(Tuple2<T, T> first, Tuple2<T, T> second) {
    +			long result = firstComparator.compare(first.f0, second.f0);
     			if (result > 0) {
     				return 1;
     			} else if (result < 0) {
     				return -1;
     			}
     
    -			result = first.f1 - second.f1;
    +			result = secondComparator.compare(first.f1, second.f1);
    --- End diff --
    
    Right, but the second case only applies during sorting. So when the min-values are equal, the check that prevMax < curMin will fail.


> Range partitioning lacks support to define sort orders
> ------------------------------------------------------
>
>                 Key: FLINK-3665
>                 URL: https://issues.apache.org/jira/browse/FLINK-3665
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataSet API
>    Affects Versions: 1.0.0
>            Reporter: Fabian Hueske
>             Fix For: 1.1.0
>
>
> {{DataSet.partitionByRange()}} does not allow specifying the sort order of fields. This is fine if range partitioning is used to reduce skewed partitioning.
> However, it is not sufficient if range partitioning is used to sort a data set in parallel.

> Since {{DataSet.partitionByRange()}} is {{@Public}} API and cannot be easily changed, I propose adding a method {{withOrders(Order... orders)}} to {{PartitionOperator}}. The method should throw an exception if the partitioning method of {{PartitionOperator}} is not range partitioning.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message