flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ChengXiangLi <...@git.apache.org>
Subject [GitHub] flink pull request: [FLINK-7] [Runtime] Enable Range Partitioner.
Date Tue, 27 Oct 2015 06:45:15 GMT
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1255#discussion_r43088529
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---
    @@ -1223,6 +1230,51 @@ public long count() throws Exception {
     		final TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor,
getType());
     		return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.SelectorFunctionKeys<T,
K>(clean(keyExtractor), this.getType(), keyType), Utils.getCallLocationName());
     	}
    +
    +	/**
    +	 * Range-partitions a DataSet using the specified KeySelector.
    +	 * <p>
    +	 * <b>Important:</b>This operation shuffles the whole DataSet over the network
and can take significant amount of time.
    +	 *
    +	 * @param keySelector The KeySelector with which the DataSet is range-partitioned.
    +	 * @return The partitioned DataSet.
    +	 *
    +	 * @see KeySelector
    +	 */
    +	public <K extends Comparable<K>> DataSet<T> partitionByRange(KeySelector<T,
K> keySelector) {
    +		final TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector,
getType());
    +		String callLocation = Utils.getCallLocationName();
    +
    +		// Extract key from input element by keySelector.
    +		KeyExtractorMapper<T, K> keyExtractorMapper = new KeyExtractorMapper<T, K>(keySelector);
    --- End diff --
    
    Yes, it's very low level job abstraction, not sure whether i can get everything required,
i didn't find any precedent of this, but it deserve a try. 
    Besides, everything required(ship strategy type / target parallelism) is available at
`OptimizedPlan` level, so i think it should be better to inject the sampling and partitionID
assignment code by modification of `OptimizedPlan` at the  begining of `JobGraphGenerator::compileJobGraph`
instead of the previous inject point as the next comment mentioned. The previous inject point
is at the middle stage of building `JobGraph`, and require rewriting of `JobGraph`,  even
lower level than `OptimizedPlan`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message