flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [39/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package
Date Wed, 21 Oct 2015 09:03:55 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumFunction.java
deleted file mode 100644
index e5fa4c3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumFunction.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.aggregation;
-
-import java.io.Serializable;
-
-public abstract class SumFunction implements Serializable{
-
-	private static final long serialVersionUID = 1L;
-
-	public abstract Object add(Object o1, Object o2);
-
-	public static SumFunction getForClass(Class<?> clazz) {
-
-		if (clazz == Integer.class) {
-			return new IntSum();
-		} else if (clazz == Long.class) {
-			return new LongSum();
-		} else if (clazz == Short.class) {
-			return new ShortSum();
-		} else if (clazz == Double.class) {
-			return new DoubleSum();
-		} else if (clazz == Float.class) {
-			return new FloatSum();
-		} else if (clazz == Byte.class) {
-			return new ByteSum();
-		} else {
-			throw new RuntimeException("DataStream cannot be summed because the class "
-					+ clazz.getSimpleName() + " does not support the + operator.");
-		}
-	}
-
-	public static class IntSum extends SumFunction {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Object add(Object value1, Object value2) {
-			return (Integer) value1 + (Integer) value2;
-		}
-	}
-
-	public static class LongSum extends SumFunction {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Object add(Object value1, Object value2) {
-			return (Long) value1 + (Long) value2;
-		}
-	}
-
-	public static class DoubleSum extends SumFunction {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Object add(Object value1, Object value2) {
-			return (Double) value1 + (Double) value2;
-		}
-	}
-
-	public static class ShortSum extends SumFunction {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Object add(Object value1, Object value2) {
-			return (short) ((Short) value1 + (Short) value2);
-		}
-	}
-
-	public static class FloatSum extends SumFunction {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Object add(Object value1, Object value2) {
-			return (Float) value1 + (Float) value2;
-		}
-	}
-
-	public static class ByteSum extends SumFunction {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Object add(Object value1, Object value2) {
-			return (byte) ((Byte) value1 + (Byte) value2);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java
deleted file mode 100644
index ae11cd9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.util.Collector;
-
-/**
- * A CoFlatMapFunction represents a FlatMap transformation with two different
- * input types.
- *
- * @param <IN1>
- *            Type of the first input.
- * @param <IN2>
- *            Type of the second input.
- * @param <OUT>
- *            Output type.
- */
-public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable {
-
-	void flatMap1(IN1 value, Collector<OUT> out) throws Exception;
-
-	void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java
deleted file mode 100644
index a545282..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.Function;
-
-/**
- * A CoMapFunction represents a Map transformation with two different input
- * types.
- *
- * @param <IN1>
- *            Type of the first input.
- * @param <IN2>
- *            Type of the second input.
- * @param <OUT>
- *            Output type.
- */
-public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
-
-	OUT map1(IN1 value) throws Exception;
-
-	OUT map2(IN2 value) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoFlatMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoFlatMapFunction.java
deleted file mode 100644
index 6746140..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoFlatMapFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-
-/**
- * A RichCoFlatMapFunction represents a FlatMap transformation with two different input
- * types. In addition to that the user can use the features provided by the
- * {@link RichFunction} interface.
- *
- * @param <IN1>
- *            Type of the first input.
- * @param <IN2>
- *            Type of the second input.
- * @param <OUT>
- *            Output type.
- */
-public abstract class RichCoFlatMapFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
-		CoFlatMapFunction<IN1, IN2, OUT> {
-
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoMapFunction.java
deleted file mode 100644
index e561408..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoMapFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-
-/**
- * A RichCoMapFunction represents a Map transformation with two different input
- * types. In addition to that the user can use the features provided by the
- * {@link RichFunction} interface.
- *
- * @param <IN1>
- *            Type of the first input.
- * @param <IN2>
- *            Type of the second input.
- * @param <OUT>
- *            Output type.
- */
-public abstract class RichCoMapFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
-		CoMapFunction<IN1, IN2, OUT> {
-
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
deleted file mode 100644
index 504bc39..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.configuration.Configuration;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Simple implementation of the SinkFunction writing tuples in the specified
- * OutputFormat format. Tuples are collected to a list and written to the file
- * periodically. The target path and the overwrite mode are pre-packaged in
- * format.
- * 
- * @param <IN>
- *            Input type
- */
-public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private static final Logger LOG = LoggerFactory.getLogger(FileSinkFunction.class);
-	
-	protected ArrayList<IN> tupleList = new ArrayList<IN>();
-	protected volatile OutputFormat<IN> format;
-	protected volatile boolean cleanupCalled = false;
-	protected int indexInSubtaskGroup;
-	protected int currentNumberOfSubtasks;
-
-	public FileSinkFunction(OutputFormat<IN> format) {
-		this.format = format;
-	}
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		RuntimeContext context = getRuntimeContext();
-		format.configure(parameters);
-		indexInSubtaskGroup = context.getIndexOfThisSubtask();
-		currentNumberOfSubtasks = context.getNumberOfParallelSubtasks();
-		format.open(indexInSubtaskGroup, currentNumberOfSubtasks);
-	}
-
-	@Override
-	public void invoke(IN record) throws Exception {
-		tupleList.add(record);
-		if (updateCondition()) {
-			flush();
-		}
-	}
-
-	@Override
-	public void close() throws IOException {
-		if (!tupleList.isEmpty()) {
-			flush();
-		}
-		try {
-			format.close();
-		} catch (Exception ex) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Error while writing element.", ex);
-			}
-			try {
-				if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
-					cleanupCalled = true;
-					((CleanupWhenUnsuccessful) format).tryCleanupOnError();
-				}
-			} catch (Throwable t) {
-				LOG.error("Cleanup on error failed.", t);
-			}
-		}
-	}
-
-	protected void flush() {
-		try {
-			for (IN rec : tupleList) {
-				format.writeRecord(rec);
-			}
-		} catch (Exception ex) {
-			try {
-				if (LOG.isErrorEnabled()) {
-					LOG.error("Error while writing element.", ex);
-				}
-				if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
-					cleanupCalled = true;
-					((CleanupWhenUnsuccessful) format).tryCleanupOnError();
-				}
-			} catch (Throwable t) {
-				LOG.error("Cleanup on error failed.", t);
-			}
-			throw new RuntimeException(ex);
-		}
-		resetParameters();
-	}
-
-	/**
-	 * Condition for writing the contents of tupleList and clearing it.
-	 * 
-	 * @return value of the updating condition
-	 */
-	protected abstract boolean updateCondition();
-
-	/**
-	 * Statements to be executed after writing a batch go here.
-	 */
-	protected abstract void resetParameters();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunctionByMillis.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunctionByMillis.java
deleted file mode 100644
index 86bbb53..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunctionByMillis.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink;
-
-import org.apache.flink.api.common.io.OutputFormat;
-
-/**
- * Implementation of FileSinkFunction. Writes tuples to the file every
- * millis milliseconds.
- * 
- * @param <IN>
- *            Input type
- */
-public class FileSinkFunctionByMillis<IN> extends FileSinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-	private final long millis;
-	private long lastTime;
-
-	public FileSinkFunctionByMillis(OutputFormat<IN> format, long millis) {
-		super(format);
-		this.millis = millis;
-		lastTime = System.currentTimeMillis();
-	}
-
-	/**
-	 * Condition for writing the contents of tupleList and clearing it.
-	 * 
-	 * @return value of the updating condition
-	 */
-	@Override
-	protected boolean updateCondition() {
-		return System.currentTimeMillis() - lastTime >= millis;
-	}
-
-	/**
-	 * Statements to be executed after writing a batch go here.
-	 */
-	@Override
-	protected void resetParameters() {
-		tupleList.clear();
-		lastTime = System.currentTimeMillis();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
deleted file mode 100644
index 93a91cd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink;
-
-import java.io.PrintStream;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-
-/**
- * Implementation of the SinkFunction writing every tuple to the standard
- * output or standard error stream.
- * 
- * @param <IN>
- *            Input record type
- */
-public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-	private static final boolean STD_OUT = false;
-	private static final boolean STD_ERR = true;
-	
-	private boolean target; 
-	private transient PrintStream stream;
-	private transient String prefix;
-	
-	/**
-	 * Instantiates a print sink function that prints to standard out.
-	 */
-	public PrintSinkFunction() {}
-	
-	/**
-	 * Instantiates a print sink function that prints to standard out or standard error.
-	 * 
-	 * @param stdErr True, if the format should print to standard error instead of standard out.
-	 */
-	public PrintSinkFunction(boolean stdErr) {
-		target = stdErr;
-	}
-
-	public void setTargetToStandardOut() {
-		target = STD_OUT;
-	}
-	
-	public void setTargetToStandardErr() {
-		target = STD_ERR;
-	}
-	
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
-		// get the target stream
-		stream = target == STD_OUT ? System.out : System.err;
-		
-		// set the prefix if we have a >1 parallelism
-		prefix = (context.getNumberOfParallelSubtasks() > 1) ? 
-				((context.getIndexOfThisSubtask() + 1) + "> ") : null;
-	}
-
-	@Override
-	public void invoke(IN record) {
-		if (prefix != null) {
-			stream.println(prefix + record.toString());
-		}
-		else {
-			stream.println(record.toString());
-		}
-	}
-	
-	@Override
-	public void close() {
-		this.stream = null;
-		this.prefix = null;
-	}
-	
-	@Override
-	public String toString() {
-		return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
deleted file mode 100644
index 7853758..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-
-public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
-
-	private static final long serialVersionUID = 1L;
-
-	public abstract void invoke(IN value) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
deleted file mode 100644
index 21308ed..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.Function;
-
-/**
- * Interface for implementing user defined sink functionality.
- *
- * @param <IN> Input type parameter.
- */
-public interface SinkFunction<IN> extends Function, Serializable {
-
-	/**
-	 * Function for standard sink behaviour. This function is called for every record.
-	 *
-	 * @param value The input record.
-	 * @throws Exception
-	 */
-	void invoke(IN value) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
deleted file mode 100644
index 1356263..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *	http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.util.SerializableObject;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
- * <p>
- * The sink can be set to retry message sends after the sending failed.
- * <p>
- * The sink can be set to 'autoflush', in which case the socket stream is flushed after every message. This
- * significantly reduces throughput, but also decreases message latency.
- *
- * @param <IN> data to be written into the Socket.
- */
-public class SocketClientSink<IN> extends RichSinkFunction<IN> {
-
-	private static final long serialVersionUID = 1L;
-	
-	private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
-
-	private static final int CONNECTION_RETRY_DELAY = 500;
-	
-	
-	private final SerializableObject lock = new SerializableObject();
-	private final SerializationSchema<IN, byte[]> schema;
-	private final String hostName;
-	private final int port;
-	private final int maxNumRetries;
-	private final boolean autoFlush;
-	
-	private transient Socket client;
-	private transient OutputStream outputStream;
-	
-	private int retries;
-
-	private volatile boolean isRunning = true;
-	
-	/**
-	 * Creates a new SocketClientSink. The sink will not attempt to retry connections upon failure
-	 * and will not auto-flush the stream.
-	 *
-	 * @param hostName Hostname of the server to connect to.
-	 * @param port Port of the server.
-	 * @param schema Schema used to serialize the data into bytes.
-	 */
-	public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
-		this(hostName, port, schema, 0);
-	}
-
-	/**
-	 * Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
-	 * A value of -1 for the number of retries will cause the system to retry an infinite number of times.
-	 * The sink will not auto-flush the stream.
-	 *
-	 * @param hostName Hostname of the server to connect to.
-	 * @param port Port of the server.
-	 * @param schema Schema used to serialize the data into bytes.
-	 * @param maxNumRetries The maximum number of retries after a message send failed.
-	 */
-	public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema, int maxNumRetries) {
-		this(hostName, port, schema, maxNumRetries, false);
-	}
-
-	/**
-	 * Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
-	 * A value of -1 for the number of retries will cause the system to retry an infinite number of times.
-	 *
-	 * @param hostName Hostname of the server to connect to.
-	 * @param port Port of the server.
-	 * @param schema Schema used to serialize the data into bytes.
-	 * @param maxNumRetries The maximum number of retries after a message send failed.
-	 * @param autoflush Flag to indicate whether the socket stream should be flushed after each message.
-	 */
-	public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema,
-							int maxNumRetries, boolean autoflush)
-	{
-		checkArgument(port > 0 && port < 65536, "port is out of range");
-		checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
-
-		this.hostName = checkNotNull(hostName, "hostname must not be null");
-		this.port = port;
-		this.schema = checkNotNull(schema);
-		this.maxNumRetries = maxNumRetries;
-		this.autoFlush = autoflush;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Life cycle
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Initialize the connection with the Socket in the server.
-	 * @param parameters Configuration.
-	 */
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		try {
-			synchronized (lock) {
-				createConnection();
-			}
-		}
-		catch (IOException e) {
-			throw new IOException("Cannot connect to socket server at " + hostName + ":" + port, e);
-		}
-	}
-	
-	
-	/**
-	 * Called when new data arrives at the sink, and forwards it to the socket.
-	 *
-	 * @param value The value to write to the socket.
-	 */
-	@Override
-	public void invoke(IN value) throws Exception {
-		byte[] msg = schema.serialize(value);
-
-		try {
-			outputStream.write(msg);
-			if (autoFlush) {
-				outputStream.flush();
-			}
-		}
-		catch (IOException e) {
-			// if no re-tries are enabled, fail immediately
-			if (maxNumRetries == 0) {
-				throw new IOException("Failed to send message '" + value + "' to socket server at "
-						+ hostName + ":" + port + ". Connection re-tries are not enabled.", e);
-			}
-			
-			LOG.error("Failed to send message '" + value + "' to socket server at " + hostName + ":" + port + 
-					". Trying to reconnect..." , e);
-			
-			// do the retries in locked scope, to guard against concurrent close() calls
-			// note that the first re-try comes immediately, without a wait!
-		
-			synchronized (lock) {
-				IOException lastException = null;
-				retries = 0;
-				
-				while (isRunning && (maxNumRetries < 0 || retries < maxNumRetries)) {
-					
-					// first, clean up the old resources
-					try {
-						if (outputStream != null) {
-							outputStream.close();
-						}
-					}
-					catch (IOException ee) {
-						LOG.error("Could not close output stream from failed write attempt", ee);
-					}
-					try {
-						if (client != null) {
-							client.close();
-						}
-					}
-					catch (IOException ee) {
-						LOG.error("Could not close socket from failed write attempt", ee);
-					}
-					
-					// try again
-					retries++;
-					
-					try {
-						// initialize a new connection
-						createConnection();
-						
-						// re-try the write
-						outputStream.write(msg);
-						
-						// success!
-						return;
-					}
-					catch (IOException ee) {
-						lastException = ee;
-						LOG.error("Re-connect to socket server and send message failed. Retry time(s): " + retries, ee);
-					}
-
-					// wait before re-attempting to connect
-					lock.wait(CONNECTION_RETRY_DELAY);
-				}
-				
-				// throw an exception if the task is still running, otherwise simply leave the method
-				if (isRunning) {
-					throw new IOException("Failed to send message '" + value + "' to socket server at "
-							+ hostName + ":" + port + ". Failed after " + retries + " retries.", lastException);
-				}
-			}
-		}
-	}
-	
-	/**
-	 * Closes the connection with the Socket server.
-	 */
-	@Override
-	public void close() throws Exception {
-		// flag this as not running any more
-		isRunning = false;
-		
-		// clean up in locked scope, so there is no concurrent change to the stream and client
-		synchronized (lock) {
-			// we notify first (this statement cannot fail). The notified thread will not continue
-			// anyways before it can re-acquire the lock
-			lock.notifyAll();
-			
-			try {
-				if (outputStream != null) {
-					outputStream.close();
-				}
-			}
-			finally {
-				if (client != null) {
-					client.close();
-				}
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-	
-	private void createConnection() throws IOException {
-		client = new Socket(hostName, port);
-		client.setKeepAlive(true);
-		client.setTcpNoDelay(true);
-		
-		outputStream = client.getOutputStream();
-	}
-	
-	// ------------------------------------------------------------------------
-	//  For testing
-	// ------------------------------------------------------------------------
-	
-	int getCurrentNumberOfRetries() {
-		synchronized (lock) {
-			return retries;
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java
deleted file mode 100644
index 019d35f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-
-/**
- * Abstract class for formatting the output of the writeAsText and writeAsCsv
- * functions.
- *
- * @param <IN>
- *            Input tuple type
- */
-public abstract class WriteFormat<IN> implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * Writes the contents of tupleList to the file specified by path.
-	 * 
-	 * @param path
-	 *            is the path to the location where the tuples are written
-	 * @param tupleList
-	 *            is the list of tuples to be written
-	 */
-	protected abstract void write(String path, ArrayList<IN> tupleList);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
deleted file mode 100644
index bfae653..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink;
-
-import java.io.BufferedWriter;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-
-/**
- * Writes tuples in csv format.
- * 
- * @param <IN>
- *            Input tuple type
- */
-public class WriteFormatAsCsv<IN> extends WriteFormat<IN> {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	protected void write(String path, ArrayList<IN> tupleList) {
-		try {
-			PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
-			for (IN tupleToWrite : tupleList) {
-				String strTuple = tupleToWrite.toString();
-				outStream.println(strTuple.substring(1, strTuple.length() - 1));
-			}
-			outStream.close();
-		} catch (IOException e) {
-			throw new RuntimeException("Exception occured while writing file " + path, e);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
deleted file mode 100644
index 03fcb5c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink;
-
-import java.io.BufferedWriter;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-
-/**
- * Writes tuples in text format.
- *
- * @param <IN>
- *            Input tuple type
- */
-public class WriteFormatAsText<IN> extends WriteFormat<IN> {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void write(String path, ArrayList<IN> tupleList) {
-		try {
-			PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
-			for (IN tupleToWrite : tupleList) {
-				outStream.println(tupleToWrite);
-			}
-			outStream.close();
-		} catch (IOException e) {
-			throw new RuntimeException("Exception occured while writing file " + path, e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunction.java
deleted file mode 100644
index 27c352f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunction.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink;
-
-import java.io.FileNotFoundException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-
-/**
- * Simple implementation of the SinkFunction writing tuples as simple text to
- * the file specified by path. Tuples are collected to a list and written to the
- * file periodically. The file specified by path is created if it does not
- * exist, cleared if it exists before the writing.
- * 
- * @param <IN>
- *            Input tuple type
- */
-public abstract class WriteSinkFunction<IN> implements SinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-	protected final String path;
-	protected ArrayList<IN> tupleList = new ArrayList<IN>();
-	protected WriteFormat<IN> format;
-
-	public WriteSinkFunction(String path, WriteFormat<IN> format) {
-		this.path = path;
-		this.format = format;
-		cleanFile(path);
-	}
-
-	/**
-	 * Creates target file if it does not exist, cleans it if it exists.
-	 * 
-	 * @param path
-	 *            is the path to the location where the tuples are written
-	 */
-	protected void cleanFile(String path) {
-		try {
-			PrintWriter writer;
-			writer = new PrintWriter(path);
-			writer.print("");
-			writer.close();
-		} catch (FileNotFoundException e) {
-			throw new RuntimeException("An error occurred while cleaning the file: " + e.getMessage(), e);
-		}
-	}
-
-	/**
-	 * Condition for writing the contents of tupleList and clearing it.
-	 * 
-	 * @return value of the updating condition
-	 */
-	protected abstract boolean updateCondition();
-
-	/**
-	 * Statements to be executed after writing a batch go here.
-	 */
-	protected abstract void resetParameters();
-
-	/**
-	 * Implementation of the invoke method of the SinkFunction class. Collects
-	 * the incoming tuples in tupleList and appends the list to the end of the
-	 * target file whenever updateCondition() returns true, and then calls
-	 * resetParameters().
-	 */
-	@Override
-	public void invoke(IN tuple) {
-
-		tupleList.add(tuple);
-		if (updateCondition()) {
-			format.write(path, tupleList);
-			resetParameters();
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunctionByMillis.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunctionByMillis.java
deleted file mode 100644
index 0364174..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunctionByMillis.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink;
-
-/**
- * Implementation of WriteSinkFunction. Writes the collected tuples to the file
- * once at least millis milliseconds have passed since the last write.
- * 
- * @param <IN>
- *            Input tuple type
- */
-public class WriteSinkFunctionByMillis<IN> extends WriteSinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-	private final long millis;
-	private long lastTime;
-
-	public WriteSinkFunctionByMillis(String path, WriteFormat<IN> format, long millis) {
-		super(path, format);
-		this.millis = millis;
-		lastTime = System.currentTimeMillis();
-	}
-
-	@Override
-	protected boolean updateCondition() {
-		return System.currentTimeMillis() - lastTime >= millis;
-	}
-
-	@Override
-	protected void resetParameters() {
-		tupleList.clear();
-		lastTime = System.currentTimeMillis();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ConnectorSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ConnectorSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ConnectorSource.java
deleted file mode 100644
index 0d107f6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ConnectorSource.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OUT> implements ResultTypeQueryable<OUT> {
-
-	private static final long serialVersionUID = 1L;
-	
-	protected DeserializationSchema<OUT> schema;
-
-	public ConnectorSource(DeserializationSchema<OUT> schema) {
-		this.schema = schema;
-	}
-
-	@Override
-	public TypeInformation<OUT> getProducedType() {
-		return schema.getProducedType();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/EventTimeSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/EventTimeSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/EventTimeSourceFunction.java
deleted file mode 100644
index ab380d7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/EventTimeSourceFunction.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.source;
-
-
-/**
- * A marker interface that must be implemented by {@link SourceFunction}s that emit elements with
- * timestamps. The {@link SourceFunction} can extract the timestamp from the data and attach it to
- * the element upon emission.
- *
- * <p>
- * Event-time sources must manually emit
- * {@link org.apache.flink.streaming.api.watermark.Watermark watermarks} to keep track of progress.
- * Automatic emission of watermarks will be suppressed if a source implements this interface.
- *
- * <p>
- * Elements must be emitted using
- * {@link SourceFunction.SourceContext#collectWithTimestamp(Object, long)}
- * and watermarks can be emitted using
- * {@link SourceFunction.SourceContext#emitWatermark(org.apache.flink.streaming.api.watermark.Watermark)}.
- *
- * @param <T> Type of the elements emitted by this source.
- */
-public interface EventTimeSourceFunction<T> extends SourceFunction<T> { }

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
deleted file mode 100644
index a217923..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Long, Long>> {
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(FileMonitoringFunction.class);
-
-	public enum WatchType {
-		ONLY_NEW_FILES, // Only new files will be processed.
-		REPROCESS_WITH_APPENDED, // When some files are appended, all contents
-									// of the files will be processed.
-		PROCESS_ONLY_APPENDED // When some files are appended, only appended
-								// contents will be processed.
-	}
-
-	private String path;
-	private long interval;
-	private WatchType watchType;
-
-	private Map<String, Long> offsetOfFiles;
-	private Map<String, Long> modificationTimes;
-
-	private volatile boolean isRunning = true;
-
-	public FileMonitoringFunction(String path, long interval, WatchType watchType) {
-		this.path = path;
-		this.interval = interval;
-		this.watchType = watchType;
-		this.modificationTimes = new HashMap<String, Long>();
-		this.offsetOfFiles = new HashMap<String, Long>();
-	}
-
-	@Override
-	public void run(SourceContext<Tuple3<String, Long, Long>> ctx) throws Exception {
-		FileSystem fileSystem = FileSystem.get(new URI(path));
-
-		while (isRunning) {
-			List<String> files = listNewFiles(fileSystem);
-			for (String filePath : files) {
-				if (watchType == WatchType.ONLY_NEW_FILES
-						|| watchType == WatchType.REPROCESS_WITH_APPENDED) {
-					ctx.collect(new Tuple3<String, Long, Long>(filePath, 0L, -1L));
-					offsetOfFiles.put(filePath, -1L);
-				} else if (watchType == WatchType.PROCESS_ONLY_APPENDED) {
-					long offset = 0;
-					long fileSize = fileSystem.getFileStatus(new Path(filePath)).getLen();
-					if (offsetOfFiles.containsKey(filePath)) {
-						offset = offsetOfFiles.get(filePath);
-					}
-
-					ctx.collect(new Tuple3<String, Long, Long>(filePath, offset, fileSize));
-					offsetOfFiles.put(filePath, fileSize);
-
-					LOG.info("File processed: {}, {}, {}", filePath, offset, fileSize);
-				}
-			}
-
-			Thread.sleep(interval);
-		}
-	}
-
-	private List<String> listNewFiles(FileSystem fileSystem) throws IOException {
-		List<String> files = new ArrayList<String>();
-
-		FileStatus[] statuses = fileSystem.listStatus(new Path(path));
-
-		if (statuses == null) {
-			LOG.warn("Path does not exist: {}", path);
-		} else {
-			for (FileStatus status : statuses) {
-				Path filePath = status.getPath();
-				String fileName = filePath.getName();
-				long modificationTime = status.getModificationTime();
-
-				if (!isFiltered(fileName, modificationTime)) {
-					files.add(filePath.toString());
-					modificationTimes.put(fileName, modificationTime);
-				}
-			}
-		}
-
-		return files;
-	}
-
-	private boolean isFiltered(String fileName, long modificationTime) {
-
-		if ((watchType == WatchType.ONLY_NEW_FILES && modificationTimes.containsKey(fileName))
-				|| fileName.startsWith(".") || fileName.contains("_COPYING_")) {
-			return true;
-		} else {
-			Long lastModification = modificationTimes.get(fileName);
-			return lastModification != null && lastModification >= modificationTime;
-		}
-	}
-
-	@Override
-	public void cancel() {
-		isRunning = false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
deleted file mode 100644
index 4f859e8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.URI;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.Collector;
-
-/**
- * Flat-map function that reads a region of a file line by line and emits every line.
- *
- * <p>The input tuple is interpreted as follows: {@code f0} is the file location (used both as
- * the file system URI and as the path), {@code f1} is the offset the stream is positioned at
- * before reading, and {@code f2} is the upper bound for the stream position, where
- * {@code -1} disables the bound.</p>
- */
-public class FileReadFunction implements FlatMapFunction<Tuple3<String, Long, Long>, String> {
-
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * Opens the file named by {@code value.f0}, seeks to {@code value.f1} and emits lines until
-	 * the end of the file or until the stream position exceeds {@code value.f2}.
-	 *
-	 * @param value The (location, start offset, end position) triple describing what to read.
-	 * @param out The collector receiving one record per line.
-	 * @throws Exception If the location is not a valid URI or the file cannot be opened,
-	 *                   positioned or read.
-	 */
-	@Override
-	public void flatMap(Tuple3<String, Long, Long> value, Collector<String> out) throws Exception {
-		FSDataInputStream stream = FileSystem.get(new URI(value.f0)).open(new Path(value.f0));
-
-		// The seek happens inside the try block so that the stream is released even when
-		// positioning fails; previously a failing seek leaked the open stream.
-		try {
-			stream.seek(value.f1);
-
-			BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
-			String line;
-
-			// NOTE(review): BufferedReader reads ahead of the line it returns, so getPos() may
-			// already be beyond the end of that line - confirm the bound behaves as intended.
-			while ((line = reader.readLine()) != null && (value.f2 == -1L || stream.getPos() <= value.f2)) {
-				out.collect(line);
-			}
-		} finally {
-			// the reader owns nothing besides the stream, so closing the stream is sufficient
-			stream.close();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
deleted file mode 100644
index cc3925c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-/**
- * Parallel source that emits the records produced by an {@code InputFormat}. The input splits
- * are pulled one at a time from the {@code InputSplitProvider} of the runtime context.
- *
- * @param <OUT> The type of the records emitted by this source.
- */
-public class FileSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
-	private static final long serialVersionUID = 1L;
-
-	/** Type information used to create the serializer in {@link #open(Configuration)}. */
-	private TypeInformation<OUT> typeInfo;
-
-	/** Serializer that supplies the reuse instance handed to the format for each record. */
-	private transient TypeSerializer<OUT> serializer;
-
-	/** The format the records are read from. */
-	private InputFormat<OUT, InputSplit> format;
-
-	/** Source of input splits, obtained from the runtime context. */
-	private transient InputSplitProvider provider;
-
-	/** Lazy view over the splits handed out by {@link #provider}. */
-	private transient Iterator<InputSplit> splitIterator;
-
-	/** Cleared by {@link #cancel()} to end the read loop. */
-	private volatile boolean isRunning = true;
-
-	/**
-	 * Creates a source reading from the given format.
-	 *
-	 * @param format The input format to read from.
-	 * @param typeInfo The type information of the produced records.
-	 */
-	@SuppressWarnings("unchecked")
-	public FileSourceFunction(InputFormat<OUT, ?> format, TypeInformation<OUT> typeInfo) {
-		this.typeInfo = typeInfo;
-		this.format = (InputFormat<OUT, InputSplit>) format;
-	}
-
-	/**
-	 * Configures the format, creates the serializer and opens the first input split, if the
-	 * provider hands one out.
-	 */
-	@Override
-	@SuppressWarnings("unchecked")
-	public void open(Configuration parameters) throws Exception {
-		StreamingRuntimeContext streamingContext = (StreamingRuntimeContext) getRuntimeContext();
-		provider = streamingContext.getInputSplitProvider();
-
-		format.configure(parameters);
-		serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
-
-		splitIterator = getInputSplits();
-		boolean hasFirstSplit = splitIterator.hasNext();
-		if (hasFirstSplit) {
-			format.open(splitIterator.next());
-		}
-
-		isRunning = true;
-	}
-
-	/** Closes the underlying format. */
-	@Override
-	public void close() throws Exception {
-		format.close();
-	}
-
-	/**
-	 * Wraps the split provider into an iterator. A split is requested from the provider only
-	 * when none is buffered, and the provider is not asked again once it returned {@code null}.
-	 */
-	private Iterator<InputSplit> getInputSplits() {
-
-		return new Iterator<InputSplit>() {
-
-			/** Split fetched by {@code hasNext()} and not yet handed out. */
-			private InputSplit nextSplit;
-
-			/** Set once the provider returned {@code null}. */
-			private boolean exhausted;
-
-			@Override
-			public boolean hasNext() {
-				if (exhausted) {
-					return false;
-				}
-
-				if (nextSplit == null) {
-					nextSplit = provider.getNextInputSplit();
-					exhausted = (nextSplit == null);
-				}
-
-				return !exhausted;
-			}
-
-			@Override
-			public InputSplit next() {
-				// hasNext() buffers a split when one is available
-				if (!hasNext()) {
-					throw new NoSuchElementException();
-				}
-
-				final InputSplit handedOut = nextSplit;
-				nextSplit = null;
-				return handedOut;
-			}
-
-			@Override
-			public void remove() {
-				throw new UnsupportedOperationException();
-			}
-		};
-	}
-
-	/**
-	 * Emits records until the source is canceled or the format returns {@code null} while no
-	 * further split is available. When the format returns {@code null} and another split
-	 * exists, that split is opened and reading continues.
-	 */
-	@Override
-	public void run(SourceContext<OUT> ctx) throws Exception {
-		while (isRunning) {
-			OUT record = format.nextRecord(serializer.createInstance());
-
-			if (record != null) {
-				ctx.collect(record);
-				continue;
-			}
-
-			if (!splitIterator.hasNext()) {
-				break;
-			}
-			format.open(splitIterator.next());
-		}
-	}
-
-	@Override
-	public void cancel() {
-		isRunning = false;
-	}
-
-
-	/**
-	 * Returns the {@code InputFormat}. This is only needed because we need to set the input
-	 * split assigner on the {@code StreamGraph}.
-	 */
-	public InputFormat<OUT, InputSplit> getFormat() {
-		return format;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
deleted file mode 100644
index af47f59..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-
-/**
- * A stream source function that returns a sequence of elements.
- * 
- * <p>Upon construction, this source function serializes the elements using Flink's type information.
- * That way, any object transport using Java serialization will not be affected by the serializability
- * of the elements.</p>
- * 
- * @param <T> The type of elements returned by this function.
- */
-public class FromElementsFunction<T> implements SourceFunction<T>, CheckpointedAsynchronously<Integer> {
-	
-	private static final long serialVersionUID = 1L;
-
-	/** The (de)serializer to be used for the data elements */
-	private final TypeSerializer<T> serializer;
-	
-	/** The actual data elements, in serialized form */
-	private final byte[] elementsSerialized;
-	
-	/** The number of serialized elements */
-	private final int numElements;
-
-	/** The number of elements emitted already */
-	private volatile int numElementsEmitted;
-
-	/** The number of elements to skip initially */
-	private volatile int numElementsToSkip;
-	
-	/** Flag to make the source cancelable */
-	private volatile boolean isRunning = true;
-
-	
-	/**
-	 * Creates the source from an array of elements.
-	 * 
-	 * @param serializer The serializer used to serialize and later deserialize the elements.
-	 * @param elements The elements to emit.
-	 * @throws IOException If serializing one of the elements fails.
-	 */
-	public FromElementsFunction(TypeSerializer<T> serializer, T... elements) throws IOException {
-		this(serializer, Arrays.asList(elements));
-	}
-	
-	/**
-	 * Creates the source from an iterable of elements. All elements are serialized eagerly
-	 * into a byte array and counted.
-	 * 
-	 * @param serializer The serializer used to serialize and later deserialize the elements.
-	 * @param elements The elements to emit.
-	 * @throws IOException If serializing one of the elements fails.
-	 */
-	public FromElementsFunction(TypeSerializer<T> serializer, Iterable<T> elements) throws IOException {
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		OutputViewDataOutputStreamWrapper wrapper = new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos));
-
-		int count = 0;
-		try {
-			for (T element : elements) {
-				serializer.serialize(element, wrapper);
-				count++;
-			}
-		}
-		catch (Exception e) {
-			throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
-		}
-
-		this.serializer = serializer;
-		this.elementsSerialized = baos.toByteArray();
-		this.numElements = count;
-	}
-
-	/**
-	 * Deserializes the stored elements one by one and emits them. Elements counted by a
-	 * restored state are deserialized and dropped first. Emitting an element and increasing
-	 * the emitted counter happen together under the checkpoint lock.
-	 * 
-	 * @param ctx The context used to emit elements and to obtain the checkpoint lock.
-	 * @throws Exception If an element cannot be deserialized or emitting fails.
-	 */
-	@Override
-	public void run(SourceContext<T> ctx) throws Exception {
-		ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized);
-		final DataInputView input = new InputViewDataInputStreamWrapper(new DataInputStream(bais));
-		
-		// if we are restored from a checkpoint and need to skip elements, skip them now.
-		int toSkip = numElementsToSkip;
-		if (toSkip > 0) {
-			try {
-				while (toSkip > 0) {
-					serializer.deserialize(input);
-					toSkip--;
-				}
-			}
-			catch (Exception e) {
-				throw deserializationFailure(e);
-			}
-			
-			this.numElementsEmitted = this.numElementsToSkip;
-		}
-		
-		final Object lock = ctx.getCheckpointLock();
-		
-		while (isRunning && numElementsEmitted < numElements) {
-			T next;
-			try {
-				next = serializer.deserialize(input);
-			}
-			catch (Exception e) {
-				throw deserializationFailure(e);
-			}
-			
-			synchronized (lock) {
-				ctx.collect(next);
-				numElementsEmitted++;
-			}
-		}
-	}
-
-	/**
-	 * Builds the exception reported when an element cannot be deserialized. The original
-	 * failure is attached as the cause so that its stack trace is not lost.
-	 * 
-	 * @param cause The exception raised by the serializer.
-	 * @return The exception to throw.
-	 */
-	private IOException deserializationFailure(Exception cause) {
-		return new IOException("Failed to deserialize an element from the source. " +
-				"If you are using user-defined serialization (Value and Writable types), check the " +
-				"serialization functions.\nSerializer is " + serializer, cause);
-	}
-
-	@Override
-	public void cancel() {
-		isRunning = false;
-	}
-
-
-	/**
-	 * Gets the number of elements produced in total by this function.
-	 * 
-	 * @return The number of elements produced in total.
-	 */
-	public int getNumElements() {
-		return numElements;
-	}
-
-	/**
-	 * Gets the number of elements emitted so far.
-	 * 
-	 * @return The number of elements emitted so far.
-	 */
-	public int getNumElementsEmitted() {
-		return numElementsEmitted;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Checkpointing
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Returns the number of elements emitted so far as the state of this source.
-	 */
-	@Override
-	public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-		return this.numElementsEmitted;
-	}
-
-	/**
-	 * Stores the restored count as the number of elements that {@code run} skips before it
-	 * starts emitting.
-	 */
-	@Override
-	public void restoreState(Integer state) {
-		this.numElementsToSkip = state;
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Verifies that all elements in the collection are non-null, and are of the given class, or
-	 * a subclass thereof.
-	 * 
-	 * @param elements The collection to check.
-	 * @param viewedAs The class to which the elements must be assignable to.
-	 * 
-	 * @param <OUT> The generic type of the collection to be checked.
-	 */
-	public static <OUT> void checkCollection(Collection<OUT> elements, Class<OUT> viewedAs) {
-		for (OUT elem : elements) {
-			if (elem == null) {
-				throw new IllegalArgumentException("The collection contains a null element");
-			}
-
-			if (!viewedAs.isAssignableFrom(elem.getClass())) {
-				throw new IllegalArgumentException("The elements in the collection are not all subclasses of " +
-						viewedAs.getCanonicalName());
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
deleted file mode 100644
index 655710e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import java.util.Iterator;
-
-/**
- * Source function that emits every element returned by an iterator.
- *
- * @param <T> The type of the emitted elements.
- */
-public class FromIteratorFunction<T> implements SourceFunction<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	/** Supplies the elements to emit. */
-	private final Iterator<T> iterator;
-
-	/** Cleared by {@link #cancel()} to end the emit loop. */
-	private volatile boolean isRunning = true;
-
-	/**
-	 * @param iterator The iterator whose elements are emitted.
-	 */
-	public FromIteratorFunction(Iterator<T> iterator) {
-		this.iterator = iterator;
-	}
-
-	/**
-	 * Emits elements until the iterator has no more elements or the source is canceled.
-	 */
-	@Override
-	public void run(SourceContext<T> ctx) throws Exception {
-		for (;;) {
-			if (!isRunning || !iterator.hasNext()) {
-				return;
-			}
-			ctx.collect(iterator.next());
-		}
-	}
-
-	@Override
-	public void cancel() {
-		isRunning = false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
deleted file mode 100644
index bc78e4d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.SplittableIterator;
-
-import java.util.Iterator;
-
-/**
- * Parallel source that splits a {@code SplittableIterator} into one part per parallel subtask
- * and emits the elements of the part belonging to this subtask.
- *
- * @param <T> The type of the emitted elements.
- */
-public class FromSplittableIteratorFunction<T> extends RichParallelSourceFunction<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	/** The complete iterator, split in {@link #open(Configuration)}. */
-	private SplittableIterator<T> fullIterator;
-
-	/** The part of {@link #fullIterator} that this subtask emits. */
-	private transient Iterator<T> iterator;
-
-	/** Cleared by {@link #cancel()} to end the emit loop. */
-	private volatile boolean isRunning = true;
-
-	/**
-	 * @param iterator The iterator that is split across the parallel subtasks.
-	 */
-	public FromSplittableIteratorFunction(SplittableIterator<T> iterator) {
-		this.fullIterator = iterator;
-	}
-
-	/**
-	 * Splits the full iterator by the number of parallel subtasks and keeps the part at the
-	 * index of this subtask.
-	 */
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
-		final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
-
-		this.iterator = fullIterator.split(parallelism)[subtaskIndex];
-		this.isRunning = true;
-	}
-
-	/**
-	 * Emits elements until this subtask's part is used up or the source is canceled.
-	 */
-	@Override
-	public void run(SourceContext<T> ctx) throws Exception {
-		for (;;) {
-			if (!isRunning || !iterator.hasNext()) {
-				return;
-			}
-			ctx.collect(iterator.next());
-		}
-	}
-
-	@Override
-	public void cancel() {
-		isRunning = false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
deleted file mode 100644
index 3ac63af..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.runtime.state.SerializedCheckpointData;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Abstract base class for data sources that receive elements from a message queue and
- * acknowledge them back by IDs.
- * <p>
- * The mechanism for this source assumes that messages are identified by a unique ID.
- * When messages are taken from the message queue, the message must not be dropped immediately,
- * but must be retained until acknowledged. Messages that are not acknowledged within a certain
- * time interval will be served again (to a different connection, established by the recovered source).
- * <p>
- * Note that this source can give no guarantees about message order in the case of failures,
- * because messages that were retrieved but not yet acknowledged will be returned later again, after
- * a set of messages that was not retrieved before the failure.
- * <p>
- * Internally, this source gathers the IDs of elements it emits. Per checkpoint, the IDs are stored and
- * acknowledged when the checkpoint is complete. That way, no message is acknowledged unless it is certain
- * that it has been successfully processed throughout the topology and the updates to any state caused by
- * that message are persistent.
- * <p>
- * All messages that are emitted and successfully processed by the streaming program will eventually be
- * acknowledged. In corner cases, the source may acknowledge certain IDs multiple times, if a
- * failure occurs while acknowledging.
- * <p>
- * A typical way to use this base in a source function is by implementing a run() method as follows:
- * <pre>{@code
- * public void run(SourceContext<Type> ctx) throws Exception {
- *     while (running) {
- *         Message msg = queue.retrieve();
- *         synchronized (ctx.getCheckpointLock()) {
- *             ctx.collect(msg.getMessageData());
- *             addId(msg.getMessageId());
- *         }
- *     }
- * }
- * }</pre>
- * 
- * @param <Type> The type of the messages created by the source.
- * @param <Id> The type of the IDs that are used for acknowledging elements.
- */
-public abstract class MessageAcknowledingSourceBase<Type, Id> extends RichSourceFunction<Type> 
-	implements Checkpointed<SerializedCheckpointData[]>, CheckpointNotifier {
-	
-	private static final long serialVersionUID = -8689291992192955579L;
-	
-	/** Serializer used to serialize the IDs for checkpoints */
-	private final TypeSerializer<Id> idSerializer;
-	
-	/** The list gathering the IDs of messages emitted during the current checkpoint */
-	private transient List<Id> idsForCurrentCheckpoint;
-
-	/** The list with IDs from checkpoints that were triggered, but not yet completed or notified of completion */
-	private transient ArrayDeque<Tuple2<Long, List<Id>>> pendingCheckpoints;
-
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a new MessageAcknowledingSourceBase for IDs of the given type.
-	 * 
-	 * @param idClass The class of the message ID type, used to create a serializer for the message IDs.
-	 */
-	protected MessageAcknowledingSourceBase(Class<Id> idClass) {
-		this(TypeExtractor.getForClass(idClass));
-	}
-
-	/**
-	 * Creates a new MessageAcknowledingSourceBase for IDs of the given type.
-	 * 
-	 * @param idTypeInfo The type information of the message ID type, used to create a serializer for the message IDs.
-	 */
-	protected MessageAcknowledingSourceBase(TypeInformation<Id> idTypeInfo) {
-		this.idSerializer = idTypeInfo.createSerializer(new ExecutionConfig());
-	}
-
-	/**
-	 * Creates the list for the IDs of the current checkpoint and, if none exists yet, the
-	 * queue of pending checkpoints.
-	 */
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		idsForCurrentCheckpoint = new ArrayList<>(64);
-
-		// Keep a queue that restoreState() has already installed: replacing it here would
-		// drop the restored IDs, which then could never be acknowledged.
-		// NOTE(review): whether the runtime restores state before or after open() is not
-		// visible in this file - the guard is correct for both orders.
-		if (pendingCheckpoints == null) {
-			pendingCheckpoints = new ArrayDeque<>();
-		}
-	}
-
-	/**
-	 * Discards all gathered IDs. Both collections are transient and only created in
-	 * {@code open} or {@code restoreState}, so they may still be {@code null} here.
-	 */
-	@Override
-	public void close() throws Exception {
-		if (idsForCurrentCheckpoint != null) {
-			idsForCurrentCheckpoint.clear();
-		}
-		if (pendingCheckpoints != null) {
-			pendingCheckpoints.clear();
-		}
-	}
-
-
-	// ------------------------------------------------------------------------
-	//  ID Checkpointing
-	// ------------------------------------------------------------------------
-
-	/**
-	 * This method must be implemented to acknowledge the given set of IDs back to the message queue.
-	 * @param ids The list of IDs to acknowledge.
-	 */
-	protected abstract void acknowledgeIDs(List<Id> ids);
-
-	/**
-	 * Adds an ID to be stored with the current checkpoint.
-	 * @param id The ID to add.
-	 */
-	protected void addId(Id id) {
-		idsForCurrentCheckpoint.add(id);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Checkpointing the data
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Moves the IDs gathered since the previous snapshot into the queue of pending checkpoints
-	 * under the given checkpoint ID, starts a fresh list, and returns the serialized queue.
-	 */
-	@Override
-	public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-		pendingCheckpoints.addLast(new Tuple2<Long, List<Id>>(checkpointId, idsForCurrentCheckpoint));
-		idsForCurrentCheckpoint = new ArrayList<>(64);
-		
-		return SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer);
-	}
-
-	/**
-	 * Replaces the queue of pending checkpoints with the one deserialized from the given state.
-	 */
-	@Override
-	public void restoreState(SerializedCheckpointData[] state) throws Exception {
-		pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer);
-	}
-
-	/**
-	 * Acknowledges and removes, from the head of the queue, every pending checkpoint whose ID
-	 * is not greater than the completed one. The scan stops at the first entry with a larger ID.
-	 */
-	@Override
-	public void notifyCheckpointComplete(long checkpointId) throws Exception {
-		for (Iterator<Tuple2<Long, List<Id>>> iter = pendingCheckpoints.iterator(); iter.hasNext();) {
-			Tuple2<Long, List<Id>> checkpoint = iter.next();
-			long id = checkpoint.f0;
-			
-			if (id <= checkpointId) {
-				acknowledgeIDs(checkpoint.f1);
-				iter.remove();
-			}
-			else {
-				break;
-			}
-		}
-	}
-}


Mime
View raw message