flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject flink git commit: [FLINK-3309] [py] Resolve Maven warnings
Date Tue, 08 Mar 2016 10:52:55 GMT
Repository: flink
Updated Branches:
  refs/heads/master d3808c7bc -> 945fc023a


[FLINK-3309] [py] Resolve Maven warnings


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/945fc023
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/945fc023
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/945fc023

Branch: refs/heads/master
Commit: 945fc023acc724cceeac500cca05fa446f88adec
Parents: d3808c7
Author: zentol <s.motsu@web.de>
Authored: Mon Feb 1 12:07:50 2016 +0100
Committer: zentol <s.motsu@web.de>
Committed: Tue Mar 8 11:52:26 2016 +0100

----------------------------------------------------------------------
 .../flink/python/api/PythonPlanBinder.java      | 36 +++++++++++++++-----
 .../api/functions/util/NestedKeyDiscarder.java  |  1 +
 .../api/functions/util/SerializerMap.java       |  3 +-
 .../api/streaming/data/PythonReceiver.java      |  2 +-
 .../python/api/streaming/data/PythonSender.java |  2 ++
 .../api/streaming/plan/PythonPlanSender.java    |  3 +-
 .../api/streaming/util/SerializationUtils.java  |  3 +-
 7 files changed, 38 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/945fc023/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 289c84b..f91237f 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -91,7 +91,7 @@ public class PythonPlanBinder {
 	private static String FLINK_HDFS_PATH = "hdfs:/tmp";
 	public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + File.separator + "flink_data";
 
-	private HashMap<Integer, Object> sets = new HashMap();
+	private HashMap<Integer, Object> sets = new HashMap<>();
 	public ExecutionEnvironment env;
 	private PythonPlanStreamer streamer;
 
@@ -386,6 +386,7 @@ public class PythonPlanBinder {
 		return info.parallelism == -1 ? env.getParallelism() : info.parallelism;
 	}
 
+	@SuppressWarnings("unchecked")
 	private void createCsvSource(PythonOperationInfo info) throws IOException {
 		if (!(info.types instanceof TupleTypeInfo)) {
 			throw new RuntimeException("The output type of a csv source has to be a tuple. The derived type is " + info);
@@ -395,36 +396,39 @@ public class PythonPlanBinder {
 		String fieldD = info.fieldDelimiter;
 		TupleTypeInfo<?> types = (TupleTypeInfo) info.types;
 		sets.put(info.setID, env.createInput(new TupleCsvInputFormat(path, lineD, fieldD, types), info.types).setParallelism(getParallelism(info)).name("CsvSource")
-				.map(new SerializerMap()).setParallelism(getParallelism(info)).name("CsvSourcePostStep"));
+				.map(new SerializerMap<>()).setParallelism(getParallelism(info)).name("CsvSourcePostStep"));
 	}
 
 	private void createTextSource(PythonOperationInfo info) throws IOException {
 		sets.put(info.setID, env.readTextFile(info.path).setParallelism(getParallelism(info)).name("TextSource")
-				.map(new SerializerMap()).setParallelism(getParallelism(info)).name("TextSourcePostStep"));
+				.map(new SerializerMap<String>()).setParallelism(getParallelism(info)).name("TextSourcePostStep"));
 	}
 
 	private void createValueSource(PythonOperationInfo info) throws IOException {
 		sets.put(info.setID, env.fromElements(info.values).setParallelism(getParallelism(info)).name("ValueSource")
-				.map(new SerializerMap()).setParallelism(getParallelism(info)).name("ValueSourcePostStep"));
+				.map(new SerializerMap<>()).setParallelism(getParallelism(info)).name("ValueSourcePostStep"));
 	}
 
 	private void createSequenceSource(PythonOperationInfo info) throws IOException {
 		sets.put(info.setID, env.generateSequence(info.from, info.to).setParallelism(getParallelism(info)).name("SequenceSource")
-				.map(new SerializerMap()).setParallelism(getParallelism(info)).name("SequenceSourcePostStep"));
+				.map(new SerializerMap<Long>()).setParallelism(getParallelism(info)).name("SequenceSourcePostStep"));
 	}
 
+	@SuppressWarnings("unchecked")
 	private void createCsvSink(PythonOperationInfo info) throws IOException {
 		DataSet parent = (DataSet) sets.get(info.parentID);
 		parent.map(new StringTupleDeserializerMap()).setParallelism(getParallelism(info)).name("CsvSinkPreStep")
 				.writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode).setParallelism(getParallelism(info)).name("CsvSink");
 	}
 
+	@SuppressWarnings("unchecked")
 	private void createTextSink(PythonOperationInfo info) throws IOException {
 		DataSet parent = (DataSet) sets.get(info.parentID);
 		parent.map(new StringDeserializerMap()).setParallelism(getParallelism(info))
 			.writeAsText(info.path, info.writeMode).setParallelism(getParallelism(info)).name("TextSink");
 	}
 
+	@SuppressWarnings("unchecked")
 	private void createPrintSink(PythonOperationInfo info) throws IOException {
 		DataSet parent = (DataSet) sets.get(info.parentID);
 		parent.map(new StringDeserializerMap()).setParallelism(getParallelism(info)).name("PrintSinkPreStep")
@@ -432,11 +436,11 @@ public class PythonPlanBinder {
 	}
 
 	private void createBroadcastVariable(PythonOperationInfo info) throws IOException {
-		UdfOperator op1 = (UdfOperator) sets.get(info.parentID);
-		DataSet op2 = (DataSet) sets.get(info.otherID);
+		UdfOperator<?> op1 = (UdfOperator) sets.get(info.parentID);
+		DataSet<?> op2 = (DataSet) sets.get(info.otherID);
 
 		op1.withBroadcastSet(op2, info.name);
-		Configuration c = ((UdfOperator) op1).getParameters();
+		Configuration c = op1.getParameters();
 
 		if (c == null) {
 			c = new Configuration();
@@ -460,6 +464,7 @@ public class PythonPlanBinder {
 		sets.put(info.setID, ao.setParallelism(getParallelism(info)).name("Aggregation"));
 	}
 
+	@SuppressWarnings("unchecked")
 	private void createDistinctOperation(PythonOperationInfo info) throws IOException {
 		DataSet op = (DataSet) sets.get(info.parentID);
 		sets.put(info.setID, op.distinct(info.keys).setParallelism(getParallelism(info)).name("Distinct")
@@ -476,6 +481,7 @@ public class PythonPlanBinder {
 		sets.put(info.setID, op1.groupBy(info.keys));
 	}
 
+	@SuppressWarnings("unchecked")
 	private void createHashPartitionOperation(PythonOperationInfo info) throws IOException {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
 		sets.put(info.setID, op1.partitionByHash(info.keys).setParallelism(getParallelism(info))
@@ -498,12 +504,14 @@ public class PythonPlanBinder {
 		}
 	}
 
+	@SuppressWarnings("unchecked")
 	private void createUnionOperation(PythonOperationInfo info) throws IOException {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
 		DataSet op2 = (DataSet) sets.get(info.otherID);
 		sets.put(info.setID, op1.union(op2).setParallelism(getParallelism(info)).name("Union"));
 	}
 
+	@SuppressWarnings("unchecked")
 	private void createCoGroupOperation(PythonOperationInfo info) {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
 		DataSet op2 = (DataSet) sets.get(info.otherID);
@@ -513,6 +521,7 @@ public class PythonPlanBinder {
 		sets.put(info.setID, new CoGroupRawOperator(op1, op2, key1, key2, pcg, info.types, info.name).setParallelism(getParallelism(info)));
 	}
 
+	@SuppressWarnings("unchecked")
 	private void createCrossOperation(DatasizeHint mode, PythonOperationInfo info) {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
 		DataSet op2 = (DataSet) sets.get(info.otherID);
@@ -540,11 +549,13 @@ public class PythonPlanBinder {
 		}
 	}
 
+	@SuppressWarnings("unchecked")
 	private void createFilterOperation(PythonOperationInfo info) {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
 		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
 	}
 
+	@SuppressWarnings("unchecked")
 	private void createFlatMapOperation(PythonOperationInfo info) {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
 		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
@@ -565,21 +576,25 @@ public class PythonPlanBinder {
 		}
 	}
 
+	@SuppressWarnings("unchecked")
 	private DataSet applyGroupReduceOperation(DataSet op1, PythonOperationInfo info) {
 		return op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).name("PythonGroupReducePreStep").setParallelism(getParallelism(info))
 				.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
 	}
 
+	@SuppressWarnings("unchecked")
 	private DataSet applyGroupReduceOperation(UnsortedGrouping op1, PythonOperationInfo info) {
 		return op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonGroupReducePreStep")
 				.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
 	}
 
+	@SuppressWarnings("unchecked")
 	private DataSet applyGroupReduceOperation(SortedGrouping op1, PythonOperationInfo info) {
 		return op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonGroupReducePreStep")
 				.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
 	}
 
+	@SuppressWarnings("unchecked")
 	private void createJoinOperation(DatasizeHint mode, PythonOperationInfo info) {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
 		DataSet op2 = (DataSet) sets.get(info.otherID);
@@ -592,6 +607,7 @@ public class PythonPlanBinder {
 		}
 	}
 
+	@SuppressWarnings("unchecked")
 	private DataSet createDefaultJoin(DataSet op1, DataSet op2, String[] firstKeys, String[] secondKeys, DatasizeHint mode, int parallelism) {
 		switch (mode) {
 			case NONE:
@@ -608,11 +624,13 @@ public class PythonPlanBinder {
 		}
 	}
 
+	@SuppressWarnings("unchecked")
 	private void createMapOperation(PythonOperationInfo info) {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
 		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
 	}
 
+	@SuppressWarnings("unchecked")
 	private void createMapPartitionOperation(PythonOperationInfo info) {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
 		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
@@ -629,11 +647,13 @@ public class PythonPlanBinder {
 		}
 	}
 
+	@SuppressWarnings("unchecked")
 	private DataSet applyReduceOperation(DataSet op1, PythonOperationInfo info) {
 		return op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonReducePreStep")
 				.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
 	}
 
+	@SuppressWarnings("unchecked")
 	private DataSet applyReduceOperation(UnsortedGrouping op1, PythonOperationInfo info) {
 		return op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonReducePreStep")
 				.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);

http://git-wip-us.apache.org/repos/asf/flink/blob/945fc023/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/NestedKeyDiscarder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/NestedKeyDiscarder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/NestedKeyDiscarder.java
index d59eb73..4c8511e 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/NestedKeyDiscarder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/NestedKeyDiscarder.java
@@ -23,6 +23,7 @@ Utility function to extract values from 2 Key-Value Tuples after a DefaultJoin.
 @ForwardedFields("f0.f1->f0; f1.f1->f1")
 public class NestedKeyDiscarder<IN> implements MapFunction<IN, Tuple2<byte[], byte[]>> {
 	@Override
+	@SuppressWarnings("unchecked")
 	public Tuple2<byte[], byte[]> map(IN value) throws Exception {
 		Tuple2<Tuple2<Tuple, byte[]>, Tuple2<Tuple, byte[]>> x = (Tuple2<Tuple2<Tuple, byte[]>, Tuple2<Tuple, byte[]>>) value;
 		return new Tuple2<>(x.f0.f1, x.f1.f1);

http://git-wip-us.apache.org/repos/asf/flink/blob/945fc023/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java
index fba83f9..9c39e5f 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java
@@ -20,9 +20,10 @@ import org.apache.flink.python.api.streaming.util.SerializationUtils.Serializer;
 Utility function to serialize values, usually directly from data sources.
 */
 public class SerializerMap<IN> implements MapFunction<IN, byte[]> {
-	private Serializer serializer = null;
+	private Serializer<IN> serializer = null;
 
 	@Override
+	@SuppressWarnings("unchecked")
 	public byte[] map(IN value) throws Exception {
 		if (serializer == null) {
 			serializer = SerializationUtils.getSerializer(value);

http://git-wip-us.apache.org/repos/asf/flink/blob/945fc023/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
index 3ee0fde..83de746 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
@@ -124,7 +124,7 @@ public class PythonReceiver implements Serializable {
 			}
 			byte[] value = new byte[fileBuffer.getInt()];
 			fileBuffer.get(value);
-			return new Tuple2(keys, value);
+			return new Tuple2<>(keys, value);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/945fc023/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
index 3cd5f4d..c371e9d 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
@@ -95,6 +95,7 @@ public class PythonSender<IN> implements Serializable {
 	 * @return size of the written buffer
 	 * @throws IOException
 	 */
+	@SuppressWarnings("unchecked")
 	public int sendRecord(Object value) throws IOException {
 		fileBuffer.clear();
 		int group = 0;
@@ -126,6 +127,7 @@ public class PythonSender<IN> implements Serializable {
 	 * @return size of the written buffer
 	 * @throws IOException
 	 */
+	@SuppressWarnings("unchecked")
 	public int sendBuffer(Iterator i, int group) throws IOException {
 		fileBuffer.clear();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/945fc023/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
index 7b6b68a..2f95822 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
@@ -28,8 +28,9 @@ public class PythonPlanSender implements Serializable {
 		this.output = new DataOutputStream(output);
 	}
 
+	@SuppressWarnings("unchecked")
 	public void sendRecord(Object record) throws IOException {
 		byte[] data = SerializationUtils.getSerializer(record).serialize(record);
 		output.write(data);
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/945fc023/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
index 6c83a61..fce69fa 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
@@ -244,9 +244,10 @@ public class SerializationUtils {
 			}
 		}
 
+		@SuppressWarnings("unchecked")
 		@Override
 		public byte[] serializeWithoutTypeInfo(Tuple value) {
-			ArrayList<byte[]> bits = new ArrayList();
+			ArrayList<byte[]> bits = new ArrayList<>();
 
 			int totalSize = 0;
 			for (int x = 0; x < serializer.length; x++) {


Mime
View raw message