flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3702) DataStream API PojoFieldAccessor doesn't support nested POJOs
Date Tue, 23 Aug 2016 17:01:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15433202#comment-15433202 ]

ASF GitHub Bot commented on FLINK-3702:
---------------------------------------

Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2094#discussion_r75908195
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java
---
    @@ -0,0 +1,280 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils;
    +
    +import static org.junit.Assert.*;
    +
    +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.junit.Test;
    +
    +public class FieldAccessorTest {
    +
    +	// Note, that AggregationFunctionTest indirectly also tests FieldAccessors.
    +	// ProductFieldAccessor is tested in CaseClassTypeInfoTest.
    +
    +	@Test
    +	public void testFlatTuple() {
    +		Tuple2<String, Integer> t = Tuple2.of("aa", 5);
    +		TupleTypeInfo<Tuple2<String, Integer>> tpeInfo =
    +				(TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(t);
    +
    +		FieldAccessor<Tuple2<String, Integer>, String> f0 = tpeInfo.getFieldAccessor("f0",
null);
    +		assertEquals("aa", f0.get(t));
    +		assertEquals("aa", t.f0);
    +		t = f0.set(t, "b");
    +		assertEquals("b", f0.get(t));
    +		assertEquals("b", t.f0);
    +
    +		FieldAccessor<Tuple2<String, Integer>, Integer> f1 = tpeInfo.getFieldAccessor("f1",
null);
    +		assertEquals(5, (int) f1.get(t));
    +		assertEquals(5, (int) t.f1);
    +		t = f1.set(t, 7);
    +		assertEquals(7, (int) f1.get(t));
    +		assertEquals(7, (int) t.f1);
    +		assertEquals("b", f0.get(t));
    +		assertEquals("b", t.f0);
    +
    +
    +		FieldAccessor<Tuple2<String, Integer>, Integer> f1n = tpeInfo.getFieldAccessor(1,
null);
    +		assertEquals(7, (int) f1n.get(t));
    +		assertEquals(7, (int) t.f1);
    +		t = f1n.set(t, 10);
    +		assertEquals(10, (int) f1n.get(t));
    +		assertEquals(10, (int) f1.get(t));
    +		assertEquals(10, (int) t.f1);
    +		assertEquals("b", f0.get(t));
    +		assertEquals("b", t.f0);
    +	}
    +
    +	@Test
    +	public void testTupleInTuple() {
    +		Tuple2<String, Tuple3<Integer, Long, Double>> t = Tuple2.of("aa", Tuple3.of(5,
9L, 2.0));
    +		TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>> tpeInfo
=
    +				(TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>>)TypeExtractor.getForObject(t);
    +
    +		FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, String>
f0 = tpeInfo.getFieldAccessor("f0", null);
    +		assertEquals("aa", f0.get(t));
    +		assertEquals("aa", t.f0);
    +
    +		FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Double>
f1f2 = tpeInfo.getFieldAccessor("f1.f2", null);
    +		assertEquals(2.0, f1f2.get(t), 0);
    +		assertEquals(2.0, t.f1.f2, 0);
    +		t = f1f2.set(t, 3.0);
    +		assertEquals(3.0, f1f2.get(t), 0);
    +		assertEquals(3.0, t.f1.f2, 0);
    +		assertEquals("aa", f0.get(t));
    +		assertEquals("aa", t.f0);
    +
    +		FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Tuple3<Integer,
Long, Double>> f1 = tpeInfo.getFieldAccessor("f1", null);
    +		assertEquals(Tuple3.of(5, 9L, 3.0), f1.get(t));
    +		assertEquals(Tuple3.of(5, 9L, 3.0), t.f1);
    +		t = f1.set(t, Tuple3.of(8, 12L, 4.0));
    +		assertEquals(Tuple3.of(8, 12L, 4.0), f1.get(t));
    +		assertEquals(Tuple3.of(8, 12L, 4.0), t.f1);
    +		assertEquals("aa", f0.get(t));
    +		assertEquals("aa", t.f0);
    +
    +		FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Tuple3<Integer,
Long, Double>> f1n = tpeInfo.getFieldAccessor(1, null);
    +		assertEquals(Tuple3.of(8, 12L, 4.0), f1n.get(t));
    +		assertEquals(Tuple3.of(8, 12L, 4.0), t.f1);
    +		t = f1n.set(t, Tuple3.of(10, 13L, 5.0));
    +		assertEquals(Tuple3.of(10, 13L, 5.0), f1n.get(t));
    +		assertEquals(Tuple3.of(10, 13L, 5.0), f1.get(t));
    +		assertEquals(Tuple3.of(10, 13L, 5.0), t.f1);
    +		assertEquals("aa", f0.get(t));
    +		assertEquals("aa", t.f0);
    +	}
    +
    +	@Test
    +	@SuppressWarnings("unchecked")
    +	public void tupleFieldAccessorOutOfBoundsTest() {
    +		try {
    +			FieldAccessor<Tuple2<Integer, Integer>, Integer> fieldAccessor =
    +					(FieldAccessor<Tuple2<Integer, Integer>, Integer>) (Object)
    +							TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class)
    +									.getFieldAccessor(2, null);
    +			fail();
    +		} catch (IndexOutOfBoundsException e) {
    +			// Nothing to do here
    +		}
    +	}
    +
    +	public static class Foo {
    +		public int x;
    +		public Tuple2<String, Long> t;
    +		public Short y;
    +
    +		public Foo() {}
    +
    +		public Foo(int x, Tuple2<String, Long> t, Short y) {
    +			this.x = x;
    +			this.t = t;
    +			this.y = y;
    +		}
    +	}
    +
    +	@Test
    +	public void testTupleInPojoInTuple() {
    +		Tuple2<String, Foo> t = Tuple2.of("aa", new Foo(8, Tuple2.of("ddd", 9L), (short)
2));
    +		TupleTypeInfo<Tuple2<String, Foo>> tpeInfo =
    +				(TupleTypeInfo<Tuple2<String, Foo>>)TypeExtractor.getForObject(t);
    +
    +		FieldAccessor<Tuple2<String, Foo>, Long> f1tf1 = tpeInfo.getFieldAccessor("f1.t.f1",
null);
    +		assertEquals(9L, (long) f1tf1.get(t));
    +		assertEquals(9L, (long) t.f1.t.f1);
    +		t = f1tf1.set(t, 12L);
    +		assertEquals(12L, (long) f1tf1.get(t));
    +		assertEquals(12L, (long) t.f1.t.f1);
    +
    +		FieldAccessor<Tuple2<String, Foo>, String> f1tf0 = tpeInfo.getFieldAccessor("f1.t.f0",
null);
    +		assertEquals("ddd", f1tf0.get(t));
    +		assertEquals("ddd", t.f1.t.f0);
    +		t = f1tf0.set(t, "alma");
    +		assertEquals("alma", f1tf0.get(t));
    +		assertEquals("alma", t.f1.t.f0);
    +
    +		FieldAccessor<Tuple2<String, Foo>, Foo> f1 = tpeInfo.getFieldAccessor("f1",
null);
    +		FieldAccessor<Tuple2<String, Foo>, Foo> f1n = tpeInfo.getFieldAccessor(1,
null);
    +		assertEquals(Tuple2.of("alma", 12L), f1.get(t).t);
    +		assertEquals(Tuple2.of("alma", 12L), f1n.get(t).t);
    +		assertEquals(Tuple2.of("alma", 12L), t.f1.t);
    +		Foo newFoo = new Foo(8, Tuple2.of("ddd", 9L), (short) 2);
    +		f1.set(t, newFoo);
    +		assertEquals(newFoo, f1.get(t));
    +		assertEquals(newFoo, f1n.get(t));
    +		assertEquals(newFoo, t.f1);
    +	}
    +
    +
    +	public static class Inner {
    +		public long x;
    +		public boolean b;
    +
    +		public Inner(){}
    +
    +		public Inner(long x) {
    +			this.x = x;
    +		}
    +
    +		public Inner(long x, boolean b) {
    +			this.x = x;
    +			this.b = b;
    +		}
    +
    +		@Override
    +		public String toString() {
    +			return ((Long)x).toString() + ", " + b;
    +		}
    +	}
    +
    +	public static class Outer {
    +		public int a;
    +		public Inner i;
    +		public short b;
    +
    +		public Outer(){}
    +
    +		public Outer(int a, Inner i, short b) {
    +			this.a = a;
    +			this.i = i;
    +			this.b = b;
    +		}
    +
    +		@Override
    +		public String toString() {
    +			return a+", "+i.toString()+", "+b;
    +		}
    +	}
    +
    +	@Test
    +	public void testPojoInPojo() {
    +		Outer o = new Outer(10, new Inner(4L), (short)12);
    +		PojoTypeInfo<Outer> tpeInfo = (PojoTypeInfo<Outer>)TypeInformation.of(Outer.class);
    +
    +		FieldAccessor<Outer, Long> fix = tpeInfo.getFieldAccessor("i.x", null);
    --- End diff --
    
    Unfortunately, we do need it (the `ExecutionConfig` parameter). `ProductFieldAccessor` uses a serializer for copying (which is needed because of the immutability), and `createSerializer` expects the `ExecutionConfig`.


> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -------------------------------------------------------------
>
>                 Key: FLINK-3702
>                 URL: https://issues.apache.org/jira/browse/FLINK-3702
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>    Affects Versions: 1.0.0
>            Reporter: Robert Metzger
>            Assignee: Gabor Gevay
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an exception.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message