flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [07/23] git commit: [FLINK-1110] Fix MapOperator execution and added simple test
Date Fri, 03 Oct 2014 16:25:03 GMT
[FLINK-1110] Fix MapOperator execution and added simple test


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/bb71b239
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/bb71b239
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/bb71b239

Branch: refs/heads/master
Commit: bb71b2398ec4ac7e0d75b70f3eba7186281f272f
Parents: d10d59e
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Sep 22 18:12:58 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Oct 3 16:22:33 2014 +0200

----------------------------------------------------------------------
 .../common/operators/base/MapOperatorBase.java  |   2 +-
 .../common/operators/base/MapOperatorTest.java  | 110 +++++++++++++++++++
 2 files changed, 111 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bb71b239/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
index 81d1fed..d545676 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
@@ -58,8 +58,8 @@ public class MapOperatorBase<IN, OUT, FT extends MapFunction<IN, OUT>>
extends S
 	protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext
ctx) throws Exception {
 		MapFunction<IN, OUT> function = this.userFunction.getUserCodeObject();
 		
-		FunctionUtils.openFunction(function, this.parameters);
 		FunctionUtils.setFunctionRuntimeContext(function, ctx);
+		FunctionUtils.openFunction(function, this.parameters);
 		
 		ArrayList<OUT> result = new ArrayList<OUT>(inputData.size());
 		for (IN element : inputData) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bb71b239/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
new file mode 100644
index 0000000..bb263ad
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+import static org.junit.Assert.*;
+import static java.util.Arrays.asList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class MapOperatorTest implements java.io.Serializable {
+
+	@Test
+	public void testMapPlain() {
+		try {
+			final MapFunction<String, Integer> parser = new MapFunction<String, Integer>()
{
+				@Override
+				public Integer map(String value) {
+					return Integer.parseInt(value);
+				}
+			};
+			
+			MapOperatorBase<String, Integer, MapFunction<String, Integer>> op = new MapOperatorBase<String,
Integer, MapFunction<String,Integer>>(
+					parser, new UnaryOperatorInformation<String, Integer>(BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO), "TestMapper");
+			
+			List<String> input = new ArrayList<String>(asList("1", "2", "3", "4", "5",
"6"));
+			List<Integer> result = op.executeOnCollections(input, null);
+			
+			assertEquals(asList(1, 2, 3, 4, 5, 6), result);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testMapWithRuntimeContext() {
+		try {
+			final String taskName = "Test Task";
+			final AtomicBoolean opened = new AtomicBoolean();
+			final AtomicBoolean closed = new AtomicBoolean();
+			
+			final MapFunction<String, Integer> parser = new RichMapFunction<String, Integer>()
{
+				
+				@Override
+				public void open(Configuration parameters) throws Exception {
+					opened.set(true);
+					RuntimeContext ctx = getRuntimeContext();
+					assertEquals(0, ctx.getIndexOfThisSubtask());
+					assertEquals(1, ctx.getNumberOfParallelSubtasks());
+					assertEquals(taskName, ctx.getTaskName());
+				}
+				
+				@Override
+				public Integer map(String value) {
+					return Integer.parseInt(value);
+				}
+				
+				@Override
+				public void close() throws Exception {
+					closed.set(true);
+				}
+			};
+			
+			MapOperatorBase<String, Integer, MapFunction<String, Integer>> op = new MapOperatorBase<String,
Integer, MapFunction<String,Integer>>(
+					parser, new UnaryOperatorInformation<String, Integer>(BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO), taskName);
+			
+			List<String> input = new ArrayList<String>(asList("1", "2", "3", "4", "5",
"6"));
+			List<Integer> result = op.executeOnCollections(input, new RuntimeUDFContext(taskName,
1, 0));
+			
+			assertEquals(asList(1, 2, 3, 4, 5, 6), result);
+			
+			assertTrue(opened.get());
+			assertTrue(closed.get());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}


Mime
View raw message