flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2106) Add outer joins to Runtime
Date Wed, 26 Aug 2015 15:53:46 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14714226#comment-14714226 ]

ASF GitHub Bot commented on FLINK-2106:
---------------------------------------

Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1052#discussion_r37999088
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/AbstractOuterJoinTaskExternalITCase.java
---
    @@ -0,0 +1,121 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +package org.apache.flink.runtime.operators;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.functions.FlatJoinFunction;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.IntComparator;
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory;
    +import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
    +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
    +import org.apache.flink.runtime.operators.testutils.BinaryOperatorTestBase;
    +import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
    +import org.apache.flink.util.Collector;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +public abstract class AbstractOuterJoinTaskExternalITCase extends BinaryOperatorTestBase<FlatJoinFunction<Tuple2<Integer,
Integer>,
    +		Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>, Tuple2<Integer,
Integer>, Tuple2<Integer, Integer>> {
    +
    +	private static final long HASH_MEM = 4*1024*1024;
    +
    +	private static final long SORT_MEM = 3*1024*1024;
    +
    +	private static final long BNLJN_MEM = 10 * PAGE_SIZE;
    +
    +	private final double bnljn_frac;
    +
    +	@SuppressWarnings("unchecked")
    +	private final TypeComparator<Tuple2<Integer, Integer>> comparator1 = new
TupleComparator<>(
    +			new int[]{0},
    +			new TypeComparator<?>[] { new IntComparator(true) },
    +			new TypeSerializer<?>[] { IntSerializer.INSTANCE });
    +
    +	@SuppressWarnings("unchecked")
    +	private final TypeComparator<Tuple2<Integer, Integer>> comparator2 = new
TupleComparator<>(
    +			new int[]{0},
    +			new TypeComparator<?>[] { new IntComparator(true) },
    +			new TypeSerializer<?>[] { IntSerializer.INSTANCE });
    +
    +	private final TypeSerializer<Tuple2<Integer, Integer>> serializer = new
TupleSerializer<>(
    +			(Class<Tuple2<Integer, Integer>>) (Class<?>) Tuple2.class,
    +			new TypeSerializer<?>[] { IntSerializer.INSTANCE, IntSerializer.INSTANCE });
    +
    +	private final CountingOutputCollector output = new CountingOutputCollector();
    +
    +	private final DriverStrategy driverStrategy;
    +
    +	public AbstractOuterJoinTaskExternalITCase(ExecutionConfig config, DriverStrategy driverStrategy)
{
    +		super(config, HASH_MEM, 2, SORT_MEM);
    +		bnljn_frac = (double)BNLJN_MEM/this.getMemoryManager().getMemorySize();
    +		this.driverStrategy = driverStrategy;
    +	}
    +
    +	@Test
    +	public void testExternalSort1OuterJoinTask() {
    +		final int keyCnt1 = 16384*4;
    +		final int valCnt1 = 2;
    +		
    +		final int keyCnt2 = 8192;
    +		final int valCnt2 = 4*2;
    --- End diff --
    
    Need spaces around each operator (e.g. `4 * 2` instead of `4*2`).


> Add outer joins to Runtime
> --------------------------
>
>                 Key: FLINK-2106
>                 URL: https://issues.apache.org/jira/browse/FLINK-2106
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Local Runtime
>            Reporter: Fabian Hueske
>            Assignee: Ricky Pogalz
>            Priority: Minor
>             Fix For: pre-apache
>
>
> Add left/right/full outer join methods to the runtime of Flink.
> Initially, the execution strategy should be a sort-merge outer join (FLINK-2105) but can later be extended to hash joins for left/right outer joins.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message