flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7) [GitHub] Enable Range Partitioner
Date Tue, 27 Oct 2015 17:36:27 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14976806#comment-14976806 ]

ASF GitHub Bot commented on FLINK-7:
------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1255#discussion_r43156865
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---
    @@ -1223,6 +1230,51 @@ public long count() throws Exception {
     		final TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType());
     		return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.SelectorFunctionKeys<T, K>(clean(keyExtractor), this.getType(), keyType), Utils.getCallLocationName());
     	}
    +
    +	/**
    +	 * Range-partitions a DataSet using the specified KeySelector.
    +	 * <p>
    +	 * <b>Important:</b> This operation shuffles the whole DataSet over the network and can take a significant amount of time.
    +	 *
    +	 * @param keySelector The KeySelector with which the DataSet is range-partitioned.
    +	 * @return The partitioned DataSet.
    +	 *
    +	 * @see KeySelector
    +	 */
    +	public <K extends Comparable<K>> DataSet<T> partitionByRange(KeySelector<T, K> keySelector) {
    +		final TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, getType());
    +		String callLocation = Utils.getCallLocationName();
    +
    +		// Extract key from input element by keySelector.
    +		KeyExtractorMapper<T, K> keyExtractorMapper = new KeyExtractorMapper<T, K>(keySelector);
    --- End diff --
    
    I think you would still have the nodes and all the information of the `OptimizedPlan` available in `connectJobVertices()`. However, I would also be OK with doing it as a preprocessing step in `compileJobGraph()`.
    Let me know if you run into any obstacles or have any questions.
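
The diff extracts a key from each input element with a `KeySelector` before range-partitioning. One common way to turn such keys into range boundaries is to sort a sample of extracted keys and take evenly spaced quantiles. The following is a minimal sketch under stated assumptions: `RangeBoundaries` and `computeBoundaries` are hypothetical names, `java.util.function.Function` stands in for Flink's `KeySelector`, and this is not the code in the pull request.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper (not Flink's API): derives range boundaries from a sample.
// java.util.function.Function is used here as a stand-in for Flink's KeySelector.
final class RangeBoundaries {

    private RangeBoundaries() {}

    /**
     * Extracts a key from every sampled element, sorts the keys, and returns
     * numPartitions - 1 boundary keys taken at evenly spaced positions.
     */
    static <T, K extends Comparable<K>> List<K> computeBoundaries(
            List<T> sample, Function<T, K> keySelector, int numPartitions) {
        if (numPartitions < 1) {
            throw new IllegalArgumentException("numPartitions must be >= 1");
        }
        // Key extraction step, analogous in spirit to the KeyExtractorMapper in the diff.
        List<K> keys = new ArrayList<>(sample.size());
        for (T element : sample) {
            keys.add(keySelector.apply(element));
        }
        Collections.sort(keys);

        List<K> boundaries = new ArrayList<>();
        if (keys.isEmpty()) {
            return boundaries; // no sample, no distribution information
        }
        for (int i = 1; i < numPartitions; i++) {
            // Index of the i-th quantile within the sorted key sample.
            int index = (int) ((long) i * keys.size() / numPartitions);
            boundaries.add(keys.get(Math.min(index, keys.size() - 1)));
        }
        return boundaries;
    }
}
```

For example, sampling the keys 10 through 80 in steps of 10 with four target partitions yields the boundaries 30, 50 and 70. A real implementation would also have to decide how large the sample is and how skewed or duplicate keys are handled.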


> [GitHub] Enable Range Partitioner
> ---------------------------------
>
>                 Key: FLINK-7
>                 URL: https://issues.apache.org/jira/browse/FLINK-7
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Runtime
>            Reporter: GitHub Import
>            Assignee: Chengxiang Li
>             Fix For: pre-apache
>
>
> The range partitioner is currently disabled. We need to implement the following aspects:
> 1) Distribution information, if available, must be propagated back together with the ordering property.
> 2) A generic bucket lookup structure (currently specific to PactRecord).
> Tests to re-enable after fixing this issue:
>  - TeraSortITCase
>  - GlobalSortingITCase
>  - GlobalSortingMixedOrderITCase
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/stratosphere/stratosphere/issues/7
> Created by: [StephanEwen|https://github.com/StephanEwen]
> Labels: core, enhancement, optimizer, 
> Milestone: Release 0.4
> Assignee: [fhueske|https://github.com/fhueske]
> Created at: Fri Apr 26 13:48:24 CEST 2013
> State: open
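
Item 2 of the issue asks for a generic bucket lookup structure in place of the one specific to `PactRecord`. Here is a minimal sketch, assuming the range boundaries are already sorted and the keys implement `Comparable`. `RangeBucketLookup` and `partitionFor` are hypothetical names, and the binary search over upper bounds is one possible design, not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical generic bucket lookup for range partitioning (not Flink's code).
// Works for any Comparable key type rather than one specific record type.
final class RangeBucketLookup<K extends Comparable<K>> {

    // Sorted upper bounds: partition i receives keys <= boundaries.get(i);
    // the last partition receives every key greater than the last boundary.
    private final List<K> boundaries;

    RangeBucketLookup(List<K> sortedBoundaries) {
        this.boundaries = Collections.unmodifiableList(new ArrayList<>(sortedBoundaries));
    }

    int numPartitions() {
        return boundaries.size() + 1;
    }

    /** Returns the index of the partition that the given key falls into. */
    int partitionFor(K key) {
        int pos = Collections.binarySearch(boundaries, key);
        // An exact match belongs to the bucket whose upper bound equals the key;
        // otherwise binarySearch returns -(insertionPoint) - 1.
        return pos >= 0 ? pos : -(pos + 1);
    }
}
```

Binary search keeps each lookup at O(log p) for p partitions. If the boundary list contains duplicate keys, `Collections.binarySearch` may return any of the matching indices, so a production version would need an explicit rule for splitting ties.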



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
