flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [38/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package
Date Wed, 21 Oct 2015 09:03:54 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
deleted file mode 100644
index e9a739f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-/**
- * A stream data source that is executed in parallel. Upon execution, the runtime will
- * execute as many parallel instances of this function function as configured parallelism
- * of the source.
- *
- * <p>This interface acts only as a marker to tell the system that this source may
- * be executed in parallel. When different parallel instances are required to perform
- * different tasks, use the {@link RichParallelSourceFunction} to get access to the runtime
- * context, which reveals information like the number of parallel tasks, and which parallel
- * task the current instance is.
- *
- * @param <OUT> The type of the records produced by this source.
- */
-public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichEventTimeSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichEventTimeSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichEventTimeSourceFunction.java
deleted file mode 100644
index 6e0086d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichEventTimeSourceFunction.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-
-/**
- * Base class for implementing a parallel event-time data source that has access to context information
- * (via {@link #getRuntimeContext()}) and additional life-cycle methods
- * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}.
- *
- * <p>
- * This class is useful when implementing parallel sources where different parallel subtasks
- * need to perform different work. Typical patterns for that are:
- * <ul>
- *     <li>Use {@link #getRuntimeContext()} to obtain the runtime context.</li>
- *     <li>Use {@link org.apache.flink.api.common.functions.RuntimeContext#getNumberOfParallelSubtasks()}
- *         to determine the current parallelism. It is strongly encouraged to use this method, rather than
- *         hard-wiring the parallelism, because the configured parallelism may change depending on
- *         program configuration. The parallelism may also change after recovering failures, when fewer than
- *         desired parallel worker as available.</li>
- *     <li>Use {@link org.apache.flink.api.common.functions.RuntimeContext#getIndexOfThisSubtask()} to
- *         determine which subtask the current instance of the function executes.</li>
- * </ul>
- *
- *
- * @param <OUT> The type of the records produced by this source.
- */
-public abstract class RichEventTimeSourceFunction<OUT> extends AbstractRichFunction implements EventTimeSourceFunction<OUT> {
-
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
deleted file mode 100644
index 7cbf674..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-
-/**
- * Base class for implementing a parallel data source. Upon execution, the runtime will
- * execute as many parallel instances of this function function as configured parallelism
- * of the source.
- * 
- * <p>The data source has access to context information (such as the number of parallel
- * instances of the source, and which parallel instance the current instance is)
- * via {@link #getRuntimeContext()}. It also provides additional life-cycle methods
- * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}.</p>
- *
- * @param <OUT> The type of the records produced by this source.
- */
-public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction
-		implements ParallelSourceFunction<OUT> {
-
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichSourceFunction.java
deleted file mode 100644
index dd08b2a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichSourceFunction.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-
-/**
- * Base class for implementing a parallel data source that has access to context information
- * (via {@link #getRuntimeContext()}) and additional life-cycle methods
- * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}.
- *
- * <p>This class is useful when implementing parallel sources where different parallel subtasks
- * need to perform different work. Typical patterns for that are:
- * <ul>
- *     <li>Use {@link #getRuntimeContext()} to obtain the runtime context.</li>
- *     <li>Use {@link org.apache.flink.api.common.functions.RuntimeContext#getNumberOfParallelSubtasks()}
- *         to determine the current parallelism. It is strongly encouraged to use this method, rather than
- *         hard-wiring the parallelism, because the configured parallelism may change depending on
- *         program configuration. The parallelism may also change after recovering failures, when fewer than
- *         desired parallel worker as available.</li>
- *     <li>Use {@link org.apache.flink.api.common.functions.RuntimeContext#getIndexOfThisSubtask()} to
- *         determine which subtask the current instance of the function executes.</li>
- * </ul>
- * </p>
- *
- * @param <OUT> The type of the records produced by this source.
- */
-public abstract class RichSourceFunction<OUT> extends AbstractRichFunction implements SourceFunction<OUT> {
-
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
deleted file mode 100644
index 9310b71..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.runtime.util.IOUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * A source function that reads strings from a socket. The source will read bytes from the socket stream
- * and convert them to characters, each byte individually. When the delimiter character is received,
- * the function will output the current string, and begin a new string.
- * <p>
- * The function strips trailing <i>carriage return</i> characters (\r) when the delimiter is the
- * newline character (\n).
- * <p>
- * The function can be set to reconnect to the server socket in case that the stream is closed on the server side.
- */
-public class SocketTextStreamFunction implements SourceFunction<String> {
-
-	private static final long serialVersionUID = 1L;
-	
-	private static final Logger LOG = LoggerFactory.getLogger(SocketTextStreamFunction.class);
-
-	/** Default delay between successive connection attempts */
-	private static final int DEFAULT_CONNECTION_RETRY_SLEEP = 500;
-
-	/** Default connection timeout when connecting to the server socket (infinite) */
-	private static final int CONNECTION_TIMEOUT_TIME = 0;
-	
-	
-	private final String hostname;
-	private final int port;
-	private final char delimiter;
-	private final long maxNumRetries;
-	private final long delayBetweenRetries;
-	
-	private transient Socket currentSocket;
-	
-	private volatile boolean isRunning = true;
-
-	
-	public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxNumRetries) {
-		this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP);
-	}
-	
-	public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxNumRetries, long delayBetweenRetries) {
-		checkArgument(port > 0 && port < 65536, "port is out of range");
-		checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
-		checkArgument(delayBetweenRetries >= 0, "delayBetweenRetries must be zero or positive");
-		
-		this.hostname = checkNotNull(hostname, "hostname must not be null");
-		this.port = port;
-		this.delimiter = delimiter;
-		this.maxNumRetries = maxNumRetries;
-		this.delayBetweenRetries = delayBetweenRetries;
-	}
-
-	@Override
-	public void run(SourceContext<String> ctx) throws Exception {
-		final StringBuilder buffer = new StringBuilder();
-		long attempt = 0;
-		
-		while (isRunning) {
-			
-			try (Socket socket = new Socket()) {
-				currentSocket = socket;
-				
-				LOG.info("Connecting to server socket " + hostname + ':' + port);
-				socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
-				BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
-
-				int data;
-				while (isRunning && (data = reader.read()) != -1) {
-					// check if the string is complete
-					if (data != delimiter) {
-						buffer.append((char) data);
-					}
-					else {
-						// truncate trailing carriage return
-						if (delimiter == '\n' && buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\r') {
-							buffer.setLength(buffer.length() - 1);
-						}
-						ctx.collect(buffer.toString());
-						buffer.setLength(0);
-					}
-				}
-			}
-
-			// if we dropped out of this loop due to an EOF, sleep and retry
-			if (isRunning) {
-				attempt++;
-				if (maxNumRetries == -1 || attempt < maxNumRetries) {
-					LOG.warn("Lost connection to server socket. Retrying in " + delayBetweenRetries + " msecs...");
-					Thread.sleep(delayBetweenRetries);
-				}
-				else {
-					// this should probably be here, but some examples expect simple exists of the stream source
-					// throw new EOFException("Reached end of stream and reconnects are not enabled.");
-					break;
-				}
-			}
-		}
-
-		// collect trailing data
-		if (buffer.length() > 0) {
-			ctx.collect(buffer.toString());
-		}
-	}
-
-	@Override
-	public void cancel() {
-		isRunning = false;
-		
-		// we need to close the socket as well, because the Thread.interrupt() function will
-		// not wake the thread in the socketStream.read() method when blocked.
-		Socket theSocket = this.currentSocket;
-		if (theSocket != null) {
-			IOUtils.closeSocket(theSocket);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
deleted file mode 100644
index 886d6e7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.watermark.Watermark;
-
-import java.io.Serializable;
-
-/**
- * Base interface for all stream data sources in Flink. The contract of a stream source
- * is the following: When the source should start emitting elements the {@link #run} method
- * is called with a {@link org.apache.flink.util.Collector} that can be used for emitting elements.
- * The run method can run for as long as necessary. The source must, however, react to an
- * invocation of {@link #cancel} by breaking out of its main loop.
- *
- * <p>
- * <b>Note about checkpointed sources</b> <br>
- *
- * Sources that also implement the {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}
- * interface must ensure that state checkpointing, updating of internal state and emission of
- * elements are not done concurrently. This is achieved by using the provided checkpointing lock
- * object to protect update of state and emission of elements in a synchronized block.
- * </p>
- *
- * <p>
- * This is the basic pattern one should follow when implementing a (checkpointed) source:
- * </p>
- *
- * {@code
- *  public class ExampleSource<T> implements SourceFunction<T>, Checkpointed<Long> {
- *      private long count = 0L;
- *      private volatile boolean isRunning = true;
- *
- *      @Override
- *      public void run(SourceContext<T> ctx) {
- *          while (isRunning && count < 1000) {
- *              synchronized (ctx.getCheckpointLock()) {
- *                  ctx.collect(count);
- *                  count++;
- *              }
- *          }
- *      }
- *
- *      @Override
- *      public void cancel() {
- *          isRunning = false;
- *      }
- *
- *      @Override
- *      public Long snapshotState(long checkpointId, long checkpointTimestamp) { return count; }
- *
- *      @Override
- *      public void restoreState(Long state) { this.count = state; }
- * }
- * </pre>
- *
- *
- * <p>
- * <b>Note about element timestamps and watermarks:</b> <br>
- * Sources must only manually emit watermarks when they implement
- * {@link EventTimeSourceFunction }.
- * Otherwise, elements automatically get the current timestamp assigned at ingress
- * and the system automatically emits watermarks.
- *
- * @param <T> The type of the elements produced by this source.
- */
-public interface SourceFunction<T> extends Function, Serializable {
-
-	/**
-	 * Starts the source. You can use the {@link org.apache.flink.util.Collector} parameter to emit
-	 * elements. Sources that implement
-	 * {@link org.apache.flink.streaming.api.checkpoint.Checkpointed} must lock on the
-	 * checkpoint lock (using a synchronized block) before updating internal state and/or emitting
-	 * elements. Also, the update of state and emission of elements must happen in the same
-	 * synchronized block.
-	 *
-	 * @param ctx The context for interaction with the outside world.
-	 */
-	void run(SourceContext<T> ctx) throws Exception;
-
-	/**
-	 * Cancels the source. Most sources will have a while loop inside the
-	 * {@link #run} method. You need to ensure that the source will break out of this loop. This
-	 * can be achieved by having a volatile field "isRunning" that is checked in the loop and that
-	 * is set to false in this method.
-	 */
-	void cancel();
-
-	/**
-	 * Interface that source functions use to communicate with the outside world. Normally
-	 * sources would just emit elements in a loop using {@link #collect}. If the source is a
-	 * {@link org.apache.flink.streaming.api.checkpoint.Checkpointed} source it must retrieve
-	 * the checkpoint lock object and use it to protect state updates and element emission as
-	 * described in {@link org.apache.flink.streaming.api.functions.source.SourceFunction}.
-	 *
-	 * @param <T> The type of the elements produced by the source.
-	 */
-	public static interface SourceContext<T> {
-
-		/**
-		 * Emits one element from the source. The result of {@link System#currentTimeMillis()} is set as
-		 * the timestamp of the emitted element.
-		 *
-		 * @param element The element to emit
-		 */
-		void collect(T element);
-
-		/**
-		 * Emits one element from the source with the given timestamp.
-		 *
-		 * @param element The element to emit
-		 * @param timestamp The timestamp in milliseconds
-		 */
-		public void collectWithTimestamp(T element, long timestamp);
-
-		/**
-		 * Emits the given {@link org.apache.flink.streaming.api.watermark.Watermark}.
-		 *
-		 * <p>
-		 * <b>Important:</b>
-		 * Sources must only manually emit watermarks when they implement
-		 * {@link EventTimeSourceFunction}.
-		 * Otherwise, elements automatically get the current timestamp assigned at ingress
-		 * and the system automatically emits watermarks.
-		 *
-		 * @param mark The {@link Watermark} to emit
-		 */
-		void emitWatermark(Watermark mark);
-
-
-		/**
-		 * Returns the checkpoint lock. Please refer to the explanation about checkpointed sources
-		 * in {@link org.apache.flink.streaming.api.functions.source.SourceFunction}.
-		 * 
-		 * @return The object to use as the lock. 
-		 */
-		Object getCheckpointLock();
-
-		/**
-		 * This must be called when closing the source operator to allow the {@link SourceContext}
-		 * to clean up internal state.
-		 */
-		void close();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
deleted file mode 100644
index 14badf1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-
-/**
- * A stateful streaming source that emits each number from a given interval exactly once,
- * possibly in parallel.
- */
-public class StatefulSequenceSource extends RichParallelSourceFunction<Long> implements Checkpointed<Long> {
-	
-	private static final long serialVersionUID = 1L;
-
-	private final long start;
-	private final long end;
-
-	private long collected;
-
-	private volatile boolean isRunning = true;
-
-	/**
-	 * Creates a source that emits all numbers from the given interval exactly once.
-	 *
-	 * @param start Start of the range of numbers to emit.
-	 * @param end End of the range of numbers to emit.
-	 */
-	public StatefulSequenceSource(long start, long end) {
-		this.start = start;
-		this.end = end;
-	}
-
-	@Override
-	public void run(SourceContext<Long> ctx) throws Exception {
-		final Object checkpointLock = ctx.getCheckpointLock();
-
-		RuntimeContext context = getRuntimeContext();
-
-		final long stepSize = context.getNumberOfParallelSubtasks();
-		final long congruence = start + context.getIndexOfThisSubtask();
-
-		final long toCollect =
-				((end - start + 1) % stepSize > (congruence - start)) ?
-					((end - start + 1) / stepSize + 1) :
-					((end - start + 1) / stepSize);
-		
-
-		while (isRunning && collected < toCollect) {
-			synchronized (checkpointLock) {
-				ctx.collect(collected * stepSize + congruence);
-				collected++;
-			}
-		}
-	}
-
-	@Override
-	public void cancel() {
-		isRunning = false;
-	}
-
-	@Override
-	public Long snapshotState(long checkpointId, long checkpointTimestamp) {
-		return collected;
-	}
-
-	@Override
-	public void restoreState(Long state) {
-		collected = state;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
deleted file mode 100644
index 1d54436..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-import java.io.Serializable;
-
-/**
- * Base interface for functions that are evaluated over non-keyed windows.
- *
- * @param <IN> The type of the input value.
- * @param <OUT> The type of the output value.
- */
-public interface AllWindowFunction<IN, OUT,  W extends Window> extends Function, Serializable {
-
-	/**
-	 * Evaluates the window and outputs none or several elements.
-	 *
-	 * @param window The window that is being evaluated.
-	 * @param values The elements in the window being evaluated.
-	 * @param out A collector for emitting elements.
-	 *
-	 * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
-	 */
-	void apply(W window, Iterable<IN> values, Collector<OUT> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java
deleted file mode 100644
index 69f24fe..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.operators.translation.WrappingFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-public class FoldAllWindowFunction<W extends Window, T, R>
-		extends WrappingFunction<FoldFunction<T, R>>
-		implements AllWindowFunction<T, R, W>, OutputTypeConfigurable<R> {
-	private static final long serialVersionUID = 1L;
-
-	private byte[] serializedInitialValue;
-	private TypeSerializer<R> outSerializer;
-	private transient R initialValue;
-
-	public FoldAllWindowFunction(R initialValue, FoldFunction<T, R> reduceFunction) {
-		super(reduceFunction);
-		this.initialValue = initialValue;
-	}
-
-	@Override
-	public void open(Configuration configuration) throws Exception {
-		super.open(configuration);
-
-		if (serializedInitialValue == null) {
-			throw new RuntimeException("No initial value was serialized for the fold " +
-					"window function. Probably the setOutputType method was not called.");
-		}
-
-		ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
-		InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper(
-				new DataInputStream(bais)
-		);
-		initialValue = outSerializer.deserialize(in);
-	}
-
-	@Override
-	public void apply(W window, Iterable<T> values, Collector<R> out) throws Exception {
-		R result = outSerializer.copy(initialValue);
-
-		for (T val: values) {
-			result = wrappedFunction.fold(result, val);
-		}
-
-		out.collect(result);
-	}
-
-	@Override
-	public void setOutputType(TypeInformation<R> outTypeInfo, ExecutionConfig executionConfig) {
-		outSerializer = outTypeInfo.createSerializer(executionConfig);
-
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		OutputViewDataOutputStreamWrapper out = new OutputViewDataOutputStreamWrapper(
-				new DataOutputStream(baos)
-		);
-
-		try {
-			outSerializer.serialize(initialValue, out);
-		} catch (IOException ioe) {
-			throw new RuntimeException("Unable to serialize initial value of type " +
-					initialValue.getClass().getSimpleName() + " of fold window function.", ioe);
-		}
-
-		serializedInitialValue = baos.toByteArray();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
deleted file mode 100644
index 04d2ac7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.operators.translation.WrappingFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-public class FoldWindowFunction<K, W extends Window, T, R>
-		extends WrappingFunction<FoldFunction<T, R>>
-		implements WindowFunction<T, R, K, W>, OutputTypeConfigurable<R> {
-	private static final long serialVersionUID = 1L;
-
-	private byte[] serializedInitialValue;
-	private TypeSerializer<R> outSerializer;
-	private transient R initialValue;
-
-	public FoldWindowFunction(R initialValue, FoldFunction<T, R> reduceFunction) {
-		super(reduceFunction);
-		this.initialValue = initialValue;
-	}
-
-	@Override
-	public void open(Configuration configuration) throws Exception {
-		super.open(configuration);
-
-		if (serializedInitialValue == null) {
-			throw new RuntimeException("No initial value was serialized for the fold " +
-					"window function. Probably the setOutputType method was not called.");
-		}
-
-		ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
-		InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper(
-				new DataInputStream(bais)
-		);
-		initialValue = outSerializer.deserialize(in);
-	}
-
-	@Override
-	public void apply(K k, W window, Iterable<T> values, Collector<R> out) throws Exception {
-		R result = outSerializer.copy(initialValue);
-
-		for (T val: values) {
-			result = wrappedFunction.fold(result, val);
-		}
-
-		out.collect(result);
-	}
-
-	@Override
-	public void setOutputType(TypeInformation<R> outTypeInfo, ExecutionConfig executionConfig) {
-		outSerializer = outTypeInfo.createSerializer(executionConfig);
-
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		OutputViewDataOutputStreamWrapper out = new OutputViewDataOutputStreamWrapper(
-				new DataOutputStream(baos)
-		);
-
-		try {
-			outSerializer.serialize(initialValue, out);
-		} catch (IOException ioe) {
-			throw new RuntimeException("Unable to serialize initial value of type " +
-					initialValue.getClass().getSimpleName() + " of fold window function.", ioe);
-		}
-
-		serializedInitialValue = baos.toByteArray();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java
deleted file mode 100644
index 24855a5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceAllWindowFunction.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-public class ReduceAllWindowFunction<W extends Window, T> extends RichAllWindowFunction<T, T, W> {
-	private static final long serialVersionUID = 1L;
-
-	private final ReduceFunction<T> reduceFunction;
-
-	public ReduceAllWindowFunction(ReduceFunction<T> reduceFunction) {
-		this.reduceFunction = reduceFunction;
-	}
-
-	@Override
-	public void setRuntimeContext(RuntimeContext ctx) {
-		super.setRuntimeContext(ctx);
-		FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
-	}
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-		FunctionUtils.openFunction(reduceFunction, parameters);
-	}
-
-	@Override
-	public void close() throws Exception {
-		super.close();
-		FunctionUtils.closeFunction(reduceFunction);
-	}
-
-	@Override
-	public void apply(W window, Iterable<T> values, Collector<T> out) throws Exception {
-		T result = null;
-
-		for (T v: values) {
-			if (result == null) {
-				result = v;
-			} else {
-				result = reduceFunction.reduce(result, v);
-			}
-		}
-
-		if (result != null) {
-			out.collect(result);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
deleted file mode 100644
index edd8a34..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.operators.translation.WrappingFunction;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-public class ReduceWindowFunction<K, W extends Window, T>
-		extends WrappingFunction<ReduceFunction<T>>
-		implements WindowFunction<T, T, K, W> {
-	private static final long serialVersionUID = 1L;
-
-	public ReduceWindowFunction(ReduceFunction<T> reduceFunction) {
-		super(reduceFunction);
-	}
-
-	@Override
-	public void apply(K k, W window, Iterable<T> values, Collector<T> out) throws Exception {
-		T result = null;
-
-		for (T v: values) {
-			if (result == null) {
-				result = v;
-			} else {
-				result = wrappedFunction.reduce(result, v);
-			}
-		}
-
-		if (result != null) {
-			out.collect(result);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
deleted file mode 100644
index 6a472b1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunctionWithWindow.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-public class ReduceWindowFunctionWithWindow<K, W extends Window, T> extends RichWindowFunction<T, Tuple2<W, T>, K, W> {
-	private static final long serialVersionUID = 1L;
-
-	private final ReduceFunction<T> reduceFunction;
-
-	public ReduceWindowFunctionWithWindow(ReduceFunction<T> reduceFunction) {
-		this.reduceFunction = reduceFunction;
-	}
-
-	@Override
-	public void setRuntimeContext(RuntimeContext ctx) {
-		super.setRuntimeContext(ctx);
-		FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
-	}
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-		FunctionUtils.openFunction(reduceFunction, parameters);
-	}
-
-	@Override
-	public void close() throws Exception {
-		super.close();
-		FunctionUtils.closeFunction(reduceFunction);
-	}
-
-	@Override
-	public void apply(K k, W window, Iterable<T> values, Collector<Tuple2<W, T>> out) throws Exception {
-		T result = null;
-
-		for (T v: values) {
-			if (result == null) {
-				result = v;
-			} else {
-				result = reduceFunction.reduce(result, v);
-			}
-		}
-
-		if (result != null) {
-			out.collect(Tuple2.of(window, result));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichAllWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichAllWindowFunction.java
deleted file mode 100644
index d78e2c3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichAllWindowFunction.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-
-public abstract class RichAllWindowFunction<IN, OUT, W extends Window> extends AbstractRichFunction implements AllWindowFunction<IN, OUT, W> {
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java
deleted file mode 100644
index 0d40bbd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-
-public abstract class RichWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction implements WindowFunction<IN, OUT, KEY, W> {
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
deleted file mode 100644
index eda12c0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-import java.io.Serializable;
-
-/**
- * Base interface for functions that are evaluated over keyed (grouped) windows.
- *
- * @param <IN> The type of the input value.
- * @param <OUT> The type of the output value.
- * @param <KEY> The type of the key.
- */
-public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
-
-	/**
-	 * Evaluates the window and outputs none or several elements.
-	 *
-	 * @param key The key for which this window is evaluated.
-	 * @param window The window that is being evaluated.
-	 * @param values The elements in the window being evaluated.
-	 * @param out A collector for emitting elements.
-	 * 
-	 * @throws Exception The function may throw exceptions to fail the program and trigger recovery. 
-	 */
-	void apply(KEY key, W window, Iterable<IN> values, Collector<OUT> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java
deleted file mode 100644
index 86a12e2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta;
-
-import org.apache.flink.streaming.api.functions.windowing.delta.extractor.Extractor;
-
-/**
- * This delta function calculates the cosine distance between two given vectors.
- * The cosine distance is defined as: cosineDistance=1-cosineSimilarity
- * 
- * Cosine similarity: http://en.wikipedia.org/wiki/Cosine_similarity
- * 
- * @param <DATA>
- *            The input data type. This delta function works with a double[],
- *            but can extract/convert to it from any other given object in case
- *            the respective extractor has been set. See
- *            {@link ExtractionAwareDeltaFunction} for more information.
- */
-public class CosineDistance<DATA> extends ExtractionAwareDeltaFunction<DATA, double[]> {
-
-	/**
-	 * auto-generated id
-	 */
-	private static final long serialVersionUID = -1217813582965151599L;
-
-	public CosineDistance() {
-		super(null);
-	}
-
-	public CosineDistance(Extractor<DATA, double[]> converter) {
-		super(converter);
-	}
-
-	@Override
-	public double getNestedDelta(double[] oldDataPoint, double[] newDataPoint) {
-		if (isNullvector(oldDataPoint, newDataPoint)) {
-			return 0;
-		}
-
-		if (oldDataPoint.length != newDataPoint.length) {
-			throw new IllegalArgumentException(
-					"The size of two input arrays are not same, can not compute cosine distance");
-		}
-
-		double sum1 = 0;
-		double sum2 = 0;
-		for (int i = 0; i < oldDataPoint.length; i++) {
-			sum1 += oldDataPoint[i] * oldDataPoint[i];
-			sum2 += newDataPoint[i] * newDataPoint[i];
-		}
-		sum1 = Math.sqrt(sum1);
-		sum2 = Math.sqrt(sum2);
-
-		return 1d - (dotProduct(oldDataPoint, newDataPoint) / (sum1 * sum2));
-	}
-
-	private double dotProduct(double[] a, double[] b) {
-		double result = 0;
-		for (int i = 0; i < a.length; i++) {
-			result += a[i] * b[i];
-		}
-		return result;
-	}
-
-	private boolean isNullvector(double[]... vectors) {
-		outer: for (double[] v : vectors) {
-			for (double field : v) {
-				if (field != 0) {
-					continue outer;
-				}
-			}
-			// This position is only reached in case all fields are 0.
-			return true;
-		}
-		return false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java
deleted file mode 100644
index 0ce2bf9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta;
-
-import java.io.Serializable;
-
-/**
- * This interface allows the implementation of a function which calculates the
- * delta between two data points. Delta functions might be used in delta
- * policies and allow flexible adaptive windowing based on the arriving data
- * points.
- *
- * @param <DATA>
- *            The type of input data which can be compared using this function.
- */
-public interface DeltaFunction<DATA> extends Serializable {
-
-	/**
-	 * Calculates the delta between two given data points.
-	 * 
-	 * @param oldDataPoint
-	 *            the old data point.
-	 * @param newDataPoint
-	 *            the new data point.
-	 * @return the delta between the two given points.
-	 */
-	public double getDelta(DATA oldDataPoint, DATA newDataPoint);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java
deleted file mode 100644
index 23efbf2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta;
-
-import org.apache.flink.streaming.api.functions.windowing.delta.extractor.Extractor;
-
-/**
- * This delta function calculates the euclidean distance between two given
- * points.
- * 
- * Euclidean distance: http://en.wikipedia.org/wiki/Euclidean_distance
- * 
- * @param <DATA>
- *            The input data type. This delta function works with a double[],
- *            but can extract/convert to it from any other given object in case
- *            the respective extractor has been set. See
- *            {@link ExtractionAwareDeltaFunction} for more information.
- */
-public class EuclideanDistance<DATA> extends ExtractionAwareDeltaFunction<DATA, double[]> {
-
-	public EuclideanDistance() {
-		super(null);
-	}
-
-	public EuclideanDistance(Extractor<DATA, double[]> converter) {
-		super(converter);
-	}
-
-	/**
-	 * auto-generated version id
-	 */
-	private static final long serialVersionUID = 3119432599634512359L;
-
-	@Override
-	public double getNestedDelta(double[] oldDataPoint, double[] newDataPoint) {
-		double result = 0;
-		for (int i = 0; i < oldDataPoint.length; i++) {
-			result += (oldDataPoint[i] - newDataPoint[i]) * (oldDataPoint[i] - newDataPoint[i]);
-		}
-		return Math.sqrt(result);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java
deleted file mode 100644
index 7a4e01a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta;
-
-import org.apache.flink.streaming.api.functions.windowing.delta.extractor.Extractor;
-
-/**
- * Extend this abstract class to implement a delta function which is aware of
- * extracting the data on which the delta is calculated from a more complex data
- * structure. For example in case you want to be able to run a delta only on one
- * field of a Tuple type or only on some fields from an array.
- * 
- * @param <DATA>
- *            The input data type. The input of this type will be passed to the
- *            extractor which will transform into a TO-object. The delta
- *            function then runs on this TO-object.
- * @param <TO>
- *            The type on which the delta function runs. (The type of the delta
- *            function)
- */
-public abstract class ExtractionAwareDeltaFunction<DATA, TO> implements DeltaFunction<DATA> {
-
-	/**
-	 * Generated Version ID
-	 */
-	private static final long serialVersionUID = 6927486219702689554L;
-	private Extractor<DATA, TO> converter;
-
-	public ExtractionAwareDeltaFunction(Extractor<DATA, TO> converter) {
-		this.converter = converter;
-	}
-
-	/**
-	 * This method takes the two data point and runs the set extractor on it.
-	 * The delta function implemented at {@link #getNestedDelta} is then called
-	 * with the extracted data. In case no extractor is set the input data gets
-	 * passes to {@link #getNestedDelta} as-is. The return value is just
-	 * forwarded from {@link #getNestedDelta}.
-	 * 
-	 * @param oldDataPoint
-	 *            the older data point as raw data (before extraction).
-	 * @param newDataPoint
-	 *            the new data point as raw data (before extraction).
-	 * @return the delta between the two points.
-	 */
-	@SuppressWarnings("unchecked")
-	@Override
-	public double getDelta(DATA oldDataPoint, DATA newDataPoint) {
-		if (converter == null) {
-			// In case no conversion/extraction is required, we can cast DATA to
-			// TO
-			// => Therefore, "unchecked" warning is suppressed for this method.
-			return getNestedDelta((TO) oldDataPoint, (TO) newDataPoint);
-		} else {
-			return getNestedDelta(converter.extract(oldDataPoint), converter.extract(newDataPoint));
-		}
-
-	}
-
-	/**
-	 * This method is exactly the same as
-	 * {@link DeltaFunction#getDelta(Object, Object)} except that it gets the
-	 * result of the previously done extractions as input. Therefore, this
-	 * method only does the actual calculation of the delta but no data
-	 * extraction or conversion.
-	 * 
-	 * @param oldDataPoint
-	 *            the older data point.
-	 * @param newDataPoint
-	 *            the new data point.
-	 * @return the delta between the two points.
-	 */
-	public abstract double getNestedDelta(TO oldDataPoint, TO newDataPoint);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTuple.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTuple.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTuple.java
deleted file mode 100644
index baceba4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTuple.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Converts a Tuple to an Object-Array. The field which should be included in
- * the array can selected and reordered as needed.
- */
-public class ArrayFromTuple implements Extractor<Tuple, Object[]> {
-
-	/**
-	 * Auto generated version id
-	 */
-	private static final long serialVersionUID = -6076121226427616818L;
-	int[] order = null;
-
-	/**
-	 * Using this constructor the extractor will convert the whole tuple (all
-	 * fields in the original order) to an array.
-	 */
-	public ArrayFromTuple() {
-		// noting to do
-	}
-
-	/**
-	 * Using this constructor the extractor will combine the fields as specified
-	 * in the indexes parameter in an object array.
-	 * 
-	 * @param indexes
-	 *            the field ids (enumerated from 0)
-	 */
-	public ArrayFromTuple(int... indexes) {
-		this.order = indexes;
-	}
-
-	@Override
-	public Object[] extract(Tuple in) {
-		Object[] output;
-
-		if (order == null) {
-			// copy the whole tuple
-			output = new Object[in.getArity()];
-			for (int i = 0; i < in.getArity(); i++) {
-				output[i] = in.getField(i);
-			}
-		} else {
-			// copy user specified order
-			output = new Object[order.length];
-			for (int i = 0; i < order.length; i++) {
-				output[i] = in.getField(order[i]);
-			}
-		}
-
-		return output;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtract.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtract.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtract.java
deleted file mode 100644
index 89c3a32..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtract.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-/**
- * Combines two extractors which will be executed one after each other.
- *
- * @param <FROM>
- *            The input type of the first extractor.
- * @param <OVER>
- *            The output type of the first and the input type of the second
- *            extractor.
- * @param <TO>
- *            The output type of the second extractor and the output type of the
- *            over all extraction.
- */
-public class ConcatenatedExtract<FROM, OVER, TO> implements Extractor<FROM, TO> {
-
-	/**
-	 * auto-generated id
-	 */
-	private static final long serialVersionUID = -7807197760725651752L;
-
-	private Extractor<FROM, OVER> e1;
-	private Extractor<OVER, TO> e2;
-
-	/**
-	 * Combines two extractors which will be executed one after each other.
-	 * 
-	 * @param e1
-	 *            First extractor: This extractor gets applied to the input data
-	 *            first. Its output as then passed as input to the second
-	 *            extractor.
-	 * @param e2
-	 *            Second extractor: This extractor gets the output of the first
-	 *            extractor as input. Its output is then the result of the over
-	 *            all extraction.
-	 */
-	public ConcatenatedExtract(Extractor<FROM, OVER> e1, Extractor<OVER, TO> e2) {
-		this.e1 = e1;
-		this.e2 = e2;
-	}
-
-	@Override
-	public TO extract(FROM in) {
-		return e2.extract(e1.extract(in));
-	}
-
-	public <OUT> ConcatenatedExtract<FROM, TO, OUT> add(Extractor<TO, OUT> e3) {
-		return new ConcatenatedExtract<FROM, TO, OUT>(this, e3);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/Extractor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/Extractor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/Extractor.java
deleted file mode 100644
index 8cd0014..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/Extractor.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-import java.io.Serializable;
-
-/**
- * Extractors allow to extract/convert one type to another. They are mostly used
- * to extract some fields out of a more complex structure (Tuple/Array) to run
- * further calculation on the extraction result.
- * 
- * @param <FROM>
- *            The input data type.
- * @param <TO>
- *            The output data type.
- */
-public interface Extractor<FROM, TO> extends Serializable {
-
-	/**
-	 * Extracts/Converts the given input to an object of the output type
-	 * 
-	 * @param in
-	 *            the input data
-	 * @return the extracted/converted data
-	 */
-	public TO extract(FROM in);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArray.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArray.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArray.java
deleted file mode 100644
index f9d0a2b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArray.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-import java.lang.reflect.Array;
-
-/**
- * Extracts a single field out of an array.
- * 
- * @param <OUT>
- *            The type of the extracted field.
- */
-public class FieldFromArray<OUT> implements Extractor<Object, OUT> {
-
-	/**
-	 * Auto-gernated version id
-	 */
-	private static final long serialVersionUID = -5161386546695574359L;
-	private int fieldId = 0;
-
-	/**
-	 * Extracts the first field (id 0) from the array
-	 */
-	public FieldFromArray() {
-		// noting to do => will use default 0
-	}
-
-	/**
-	 * Extracts the field with the given id from the array.
-	 * 
-	 * @param fieldId
-	 *            The id of the field which will be extracted from the array.
-	 */
-	public FieldFromArray(int fieldId) {
-		this.fieldId = fieldId;
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public OUT extract(Object in) {
-		return (OUT) Array.get(in, fieldId);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTuple.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTuple.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTuple.java
deleted file mode 100644
index 627afca..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTuple.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Extracts a single field out of a tuple.
- * 
- * @param <OUT>
- *            The type of the extracted field.
- */
-public class FieldFromTuple<OUT> implements Extractor<Tuple, OUT> {
-
-	/**
-	 * Auto-gernated version id
-	 */
-	private static final long serialVersionUID = -5161386546695574359L;
-	private int fieldId = 0;
-
-	/**
-	 * Extracts the first field (id 0) from the tuple
-	 */
-	public FieldFromTuple() {
-		// noting to do => will use default 0
-	}
-
-	/**
-	 * Extracts the field with the given id from the tuple.
-	 * 
-	 * @param fieldId
-	 *            The id of the field which will be extracted from the tuple.
-	 */
-	public FieldFromTuple(int fieldId) {
-		this.fieldId = fieldId;
-	}
-
-	@Override
-	public OUT extract(Tuple in) {
-		return in.getField(fieldId);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArray.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArray.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArray.java
deleted file mode 100644
index b1c080e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArray.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-import java.lang.reflect.Array;
-
-/**
- * Extracts multiple fields from an array and puts them into a new array of the
- * specified type.
- *
- * @param <OUT>
- *            The type of the output array. If out is set to String, the output
- *            of the extractor will be a String[]. If it is set to String[] the
- *            output will be String[][].
- */
-public class FieldsFromArray<OUT> implements Extractor<Object, OUT[]> {
-
-	/**
-	 * Auto-generated version id
-	 */
-	private static final long serialVersionUID = 8075055384516397670L;
-	private int[] order;
-	private Class<OUT> clazz;
-
-	/**
-	 * Extracts multiple fields from an array and puts them in the given order
-	 * into a new array of the specified type.
-	 * 
-	 * @param clazz
-	 *            the Class object representing the component type of the new
-	 *            array
-	 * @param indexes
-	 *            The indexes of the fields to be extracted. Any order is
-	 *            possible, but not more than 255 fields due to limitations in
-	 *            {@link Array#newInstance(Class, int...)}.
-	 */
-	public FieldsFromArray(Class<OUT> clazz, int... indexes) {
-		this.order = indexes;
-		this.clazz = clazz;
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public OUT[] extract(Object in) {
-		OUT[] output = (OUT[]) Array.newInstance(clazz, order.length);
-		for (int i = 0; i < order.length; i++) {
-			output[i] = (OUT) Array.get(in, this.order[i]);
-		}
-		return output;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTuple.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTuple.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTuple.java
deleted file mode 100644
index fc7f3ab..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromTuple.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Extracts one or more fields of the type Double from a tuple and puts them
- * into a new double[]
- */
-public class FieldsFromTuple implements Extractor<Tuple, double[]> {
-
-	/**
-	 * auto generated version id
-	 */
-	private static final long serialVersionUID = -2554079091050273761L;
-	int[] indexes;
-
-	/**
-	 * Extracts one or more fields of the the type Double from a tuple and puts
-	 * them into a new double[] (in the specified order).
-	 * 
-	 * @param indexes
-	 *            The indexes of the fields to be extracted.
-	 */
-	public FieldsFromTuple(int... indexes) {
-		this.indexes = indexes;
-	}
-
-	@Override
-	public double[] extract(Tuple in) {
-		double[] out = new double[indexes.length];
-		for (int i = 0; i < indexes.length; i++) {
-			out[i] = (Double) in.getField(indexes[i]);
-		}
-		return out;
-	}
-}


Mime
View raw message