flink-commits mailing list archives

From se...@apache.org
Subject flink git commit: [FLINK-3933] [streaming API] Add AbstractDeserializationSchema that handles produced type extraction.
Date Fri, 20 May 2016 08:00:43 GMT
Repository: flink
Updated Branches:
  refs/heads/master 010265b64 -> 92e1c82cc


[FLINK-3933] [streaming API] Add AbstractDeserializationSchema that handles produced type extraction.

The AbstractDeserializationSchema creates the produced type information automatically from the generic parameters.
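The extraction can be illustrated with plain JDK reflection (a minimal standalone sketch of the underlying idea, not Flink's `TypeExtractor`; `TypedSchema` and `producedType` are illustrative names):

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class TypedSchemaDemo {

    // Illustrative base class: recovers the subclass's generic parameter,
    // similar in spirit to what AbstractDeserializationSchema does via
    // Flink's TypeExtractor.
    abstract static class TypedSchema<T> {
        Type producedType() {
            Type superType = getClass().getGenericSuperclass();
            if (superType instanceof ParameterizedType) {
                return ((ParameterizedType) superType).getActualTypeArguments()[0];
            }
            // A raw subclass carries no type argument to recover.
            throw new IllegalStateException("raw subclass: type argument not recoverable");
        }
    }

    // A concrete subclass fixes T, so the type argument is present in the
    // class file and visible to reflection.
    static class StringSchema extends TypedSchema<String> {}

    public static void main(String[] args) {
        System.out.println(new StringSchema().producedType().getTypeName()); // java.lang.String
    }
}
```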

This closes #2010


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/92e1c82c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/92e1c82c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/92e1c82c

Branch: refs/heads/master
Commit: 92e1c82cc545b80c3f82e01a97708aa8d70b3806
Parents: 010265b
Author: Stephan Ewen <sewen@apache.org>
Authored: Thu May 19 12:37:05 2016 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri May 20 09:56:31 2016 +0200

----------------------------------------------------------------------
 docs/apis/streaming/connectors/kafka.md         |  19 ++-
 .../flink/api/java/typeutils/TypeExtractor.java |   4 +-
 .../AbstractDeserializationSchema.java          |  68 +++++++++++
 .../serialization/DeserializationSchema.java    |   3 +
 .../util/serialization/SimpleStringSchema.java  |   3 +-
 .../util/AbstractDeserializationSchemaTest.java | 116 +++++++++++++++++++
 6 files changed, 203 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/92e1c82c/docs/apis/streaming/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/kafka.md b/docs/apis/streaming/connectors/kafka.md
index da3f86c..45ec6ad 100644
--- a/docs/apis/streaming/connectors/kafka.md
+++ b/docs/apis/streaming/connectors/kafka.md
@@ -87,13 +87,13 @@ Then, import the connector in your maven project:
 
 Note that the streaming connectors are currently not part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
 
-#### Installing Apache Kafka
+### Installing Apache Kafka
 
 * Follow the instructions from [Kafka's quickstart](https://kafka.apache.org/documentation.html#quickstart) to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).
 * On 32 bit computers [this](http://stackoverflow.com/questions/22325364/unrecognized-vm-option-usecompressedoops-when-running-kafka-from-my-ubuntu-in) problem may occur.
 * If the Kafka and Zookeeper servers are running on a remote machine, then the `advertised.host.name` setting in the `config/server.properties` file must be set to the machine's IP address.
 
-#### Kafka Consumer
+### Kafka Consumer
 
 Flink's Kafka consumer is called `FlinkKafkaConsumer08` (or `09` for Kafka 0.9.0.x versions). It provides access to one or more Kafka topics.
 
@@ -142,18 +142,25 @@ for querying the list of topics and partitions.
 For this to work, the consumer needs to be able to access the consumers from the machine submitting the job to the Flink cluster.
 If you experience any issues with the Kafka consumer on the client side, the client log might contain information about failed requests, etc.
 
-##### The `DeserializationSchema`
+#### The `DeserializationSchema`
 
-The `FlinkKafkaConsumer08` needs to know how to turn the data in Kafka into Java objects. The 
+The Flink Kafka Consumer needs to know how to turn the binary data in Kafka into Java/Scala objects. The 
 `DeserializationSchema` allows users to specify such a schema. The `T deserialize(byte[] message)`
 method gets called for each Kafka message, passing the value from Kafka.
 
+It is usually helpful to start from the `AbstractDeserializationSchema`, which takes care of describing the
+produced Java/Scala type to Flink's type system. Users that implement a vanilla `DeserializationSchema` need
+to implement the `getProducedType(...)` method themselves.
+
 For accessing both the key and value of the Kafka message, the `KeyedDeserializationSchema` has
 the following deserialize method ` T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)`.
 
 For convenience, Flink provides the following schemas:
+
 1. `TypeInformationSerializationSchema` (and `TypeInformationKeyValueSerializationSchema`) which creates 
-    a schema based on a Flink `TypeInformation`.
+    a schema based on Flink's `TypeInformation`. This is useful if the data is both written and read by Flink.
+    This schema is a performant Flink-specific alternative to other generic serialization approaches.
+ 
 2. `JsonDeserializationSchema` (and `JSONKeyValueDeserializationSchema`) which turns the serialized JSON 
     into an ObjectNode object, from which fields can be accessed using objectNode.get("field").as(Int/String/...)().
 
     The KeyValue objectNode contains a "key" and "value" field which contain all fields, as well as 
@@ -191,7 +198,7 @@ Flink on YARN supports automatic restart of lost YARN containers.
 
 If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.
 
-#### Kafka Producer
+### Kafka Producer
 
 The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
 records to partitions.
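The contract described in the docs above, one `T deserialize(byte[] message)` call per Kafka record value, can be illustrated standalone (`MiniDeserializationSchema` and `StringSchema` are hypothetical stand-ins that only mimic the shape of Flink's interfaces, not the real classes):

```java
import java.nio.charset.StandardCharsets;

public class MiniSchemaDemo {

    // Mimics the DeserializationSchema contract described in the docs:
    // turn one record's raw bytes into an object.
    interface MiniDeserializationSchema<T> {
        T deserialize(byte[] message);
    }

    // String deserialization in the spirit of a simple string schema.
    static class StringSchema implements MiniDeserializationSchema<String> {
        @Override
        public String deserialize(byte[] message) {
            // Decode the raw Kafka value bytes; pinning the charset avoids
            // platform-dependent results.
            return new String(message, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        byte[] record = "hello kafka".getBytes(StandardCharsets.UTF_8);
        System.out.println(new StringSchema().deserialize(record)); // hello kafka
    }
}
```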

http://git-wip-us.apache.org/repos/asf/flink/blob/92e1c82c/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index f2b9fd2..45420a2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -272,7 +272,7 @@ public class TypeExtractor {
 		}
 		return new TypeExtractor().privateCreateTypeInfo(InputFormat.class, inputFormatInterface.getClass(), 0, null, null);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Generic extraction methods
 	// --------------------------------------------------------------------------------------------
@@ -596,7 +596,7 @@ public class TypeExtractor {
 			}
 			
 			if(curT == Tuple0.class) {
-				return new TupleTypeInfo(Tuple0.class, new TypeInformation<?>[0]);
+				return new TupleTypeInfo(Tuple0.class);
 			}
 			
 			// check if immediate child of Tuple has generics

http://git-wip-us.apache.org/repos/asf/flink/blob/92e1c82c/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java
new file mode 100644
index 0000000..77e76e5
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import java.io.IOException;
+
+/**
+ * The deserialization schema describes how to turn the byte messages delivered by certain
+ * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
+ * processed by Flink.
+ * 
+ * <p>This base variant of the deserialization schema produces the type information
+ * automatically by extracting it from the generic class arguments.
+ * 
+ * @param <T> The type created by the deserialization schema.
+ */
+public abstract class AbstractDeserializationSchema<T> implements DeserializationSchema<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * De-serializes the byte message.
+	 *
+	 * @param message The message, as a byte array.
+	 * @return The de-serialized message as an object.
+	 */
+	@Override
+	public abstract T deserialize(byte[] message) throws IOException;
+
+	/**
+	 * Method to decide whether the element signals the end of the stream. If
+	 * true is returned the element won't be emitted.
+	 * 
+	 * <p>This default implementation always returns false, meaning the stream is interpreted
+	 * to be unbounded.
+	 *
+	 * @param nextElement The element to test for the end-of-stream signal.
+	 * @return True, if the element signals end of stream, false otherwise.
+	 */
+	@Override
+	public boolean isEndOfStream(T nextElement) {
+		return false;
+	}
+	
+	@Override
+	public TypeInformation<T> getProducedType() {
+		return TypeExtractor.createTypeInfo(AbstractDeserializationSchema.class, getClass(), 0, null, null);
+	}
+}
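The anonymous-subclass and raw-subclass cases that the new test class below exercises can be sketched the same way with plain JDK reflection (standalone illustration; `Schema` is a hypothetical stand-in for the extraction step, not the Flink class):

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class AnonymousVsRawDemo {

    // Illustrative stand-in for the type extraction performed above.
    abstract static class Schema<T> {
        Type producedType() {
            Type s = getClass().getGenericSuperclass();
            if (!(s instanceof ParameterizedType)) {
                // Mirrors the InvalidTypesException case covered by the new test.
                throw new IllegalStateException("raw subclass: no type argument");
            }
            return ((ParameterizedType) s).getActualTypeArguments()[0];
        }
    }

    public static void main(String[] args) {
        // Anonymous subclass: the full generic type survives in the class file.
        Type t = new Schema<List<String>>() {}.producedType();
        System.out.println(t.getTypeName()); // java.util.List<java.lang.String>

        // Raw subclass: there is no type argument to extract.
        @SuppressWarnings("rawtypes")
        Schema raw = new Schema() {};
        try {
            raw.producedType();
        } catch (IllegalStateException e) {
            System.out.println("raw failed as expected");
        }
    }
}
```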

http://git-wip-us.apache.org/repos/asf/flink/blob/92e1c82c/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
index b376d69..2e27ba6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
@@ -28,6 +28,9 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
  * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
  * processed by Flink.
  * 
+ * <p>Note: In most cases, one should start from {@link AbstractDeserializationSchema}, which
+ * takes care of producing the return type information automatically.
+ * 
  * @param <T> The type created by the deserialization schema.
  */
 @Public

http://git-wip-us.apache.org/repos/asf/flink/blob/92e1c82c/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
index a051d32..2de4c01 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
@@ -25,8 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
  * Very simple serialization schema for strings.
  */
 @PublicEvolving
-public class SimpleStringSchema implements DeserializationSchema<String>,
-		SerializationSchema<String> {
+public class SimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/92e1c82c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
new file mode 100644
index 0000000..220c1cd
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;
+
+import org.codehaus.jackson.map.util.JSONPObject;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class AbstractDeserializationSchemaTest {
+
+	@Test
+	public void testTypeExtractionTuple() {
+		TypeInformation<Tuple2<byte[], byte[]>> type = new TupleSchema().getProducedType();
+		TypeInformation<Tuple2<byte[], byte[]>> expected = TypeInformation.of(new TypeHint<Tuple2<byte[], byte[]>>(){});
+		assertEquals(expected, type);
+	}
+	
+	@Test
+	public void testTypeExtractionTupleAnonymous() {
+		TypeInformation<Tuple2<byte[], byte[]>> type = new AbstractDeserializationSchema<Tuple2<byte[], byte[]>>() {
+			@Override
+			public Tuple2<byte[], byte[]> deserialize(byte[] message) throws IOException {
+				throw new UnsupportedOperationException();
+			}
+		}.getProducedType();
+		
+		TypeInformation<Tuple2<byte[], byte[]>> expected = TypeInformation.of(new TypeHint<Tuple2<byte[], byte[]>>(){});
+		assertEquals(expected, type);
+	}
+
+	@Test
+	public void testTypeExtractionGeneric() {
+		TypeInformation<JSONPObject> type = new JsonSchema().getProducedType();
+		TypeInformation<JSONPObject> expected = TypeInformation.of(new TypeHint<JSONPObject>(){});
+		assertEquals(expected, type);
+	}
+
+	@Test
+	public void testTypeExtractionGenericAnonymous() {
+		TypeInformation<JSONPObject> type = new AbstractDeserializationSchema<JSONPObject>() {
+			@Override
+			public JSONPObject deserialize(byte[] message) throws IOException {
+				throw new UnsupportedOperationException();
+			}
+		}.getProducedType();
+
+		TypeInformation<JSONPObject> expected = TypeInformation.of(new TypeHint<JSONPObject>(){});
+		assertEquals(expected, type);
+	}
+
+	@Test
+	public void testTypeExtractionRawException() {
+		try {
+			new RawSchema().getProducedType();
+			fail();
+		} catch (InvalidTypesException e) {
+			// expected
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Test types
+	// ------------------------------------------------------------------------
+
+	private static class TupleSchema extends AbstractDeserializationSchema<Tuple2<byte[], byte[]>> {
+
+		@Override
+		public Tuple2<byte[], byte[]> deserialize(byte[] message) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+	}
+	
+	private static class JsonSchema extends AbstractDeserializationSchema<JSONPObject> {
+
+		@Override
+		public JSONPObject deserialize(byte[] message) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+	}
+
+	@SuppressWarnings("rawtypes")
+	private static class RawSchema extends AbstractDeserializationSchema {
+
+		@Override
+		public Object deserialize(byte[] message) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+	}
+}

