flink-commits mailing list archives

From rmetz...@apache.org
Subject [18/52] [partial] flink git commit: [FLINK-1452] Rename 'flink-addons' to 'flink-staging'
Date Mon, 02 Feb 2015 18:41:56 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
new file mode 100755
index 0000000..7ce864e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import java.io.IOException;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.ConnectorSource;
+import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.QueueingConsumer;
+
+public class RMQSource<OUT> extends ConnectorSource<OUT> {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(RMQSource.class);
+
+	private final String QUEUE_NAME;
+	private final String HOST_NAME;
+
+	private transient ConnectionFactory factory;
+	private transient Connection connection;
+	private transient Channel channel;
+	private transient QueueingConsumer consumer;
+	private transient QueueingConsumer.Delivery delivery;
+
+	OUT out;
+
+	public RMQSource(String HOST_NAME, String QUEUE_NAME,
+			DeserializationSchema<OUT> deserializationSchema) {
+		super(deserializationSchema);
+		this.HOST_NAME = HOST_NAME;
+		this.QUEUE_NAME = QUEUE_NAME;
+	}
+
+	/**
+	 * Initializes the connection to RMQ.
+	 */
+	private void initializeConnection() {
+		factory = new ConnectionFactory();
+		factory.setHost(HOST_NAME);
+		try {
+			connection = factory.newConnection();
+			channel = connection.createChannel();
+			channel.queueDeclare(QUEUE_NAME, false, false, false, null);
+			consumer = new QueueingConsumer(channel);
+			channel.basicConsume(QUEUE_NAME, true, consumer);
+		} catch (IOException e) {
+			throw new RuntimeException("Cannot create RMQ connection with " + QUEUE_NAME + " at "
+					+ HOST_NAME, e);
+		}
+	}
+
+	/**
+	 * Called to forward the data from the source to the {@link DataStream}.
+	 * 
+	 * @param collector
+	 *            The Collector for sending data to the dataStream
+	 */
+	@Override
+	public void invoke(Collector<OUT> collector) throws Exception {
+
+		while (true) {
+
+			try {
+				delivery = consumer.nextDelivery();
+			} catch (Exception e) {
+				if (LOG.isErrorEnabled()) {
+					LOG.error("Cannot receive RMQ message from {} at {}", QUEUE_NAME, HOST_NAME, e);
+				}
+				// Without a fresh delivery there is nothing to deserialize; retry
+				continue;
+			}
+
+			out = schema.deserialize(delivery.getBody());
+			if (schema.isEndOfStream(out)) {
+				break;
+			} else {
+				collector.collect(out);
+			}
+		}
+
+	}
+
+	@Override
+	public void open(Configuration config) {
+		initializeConnection();
+	}
+
+	@Override
+	public void close() {
+		try {
+			connection.close();
+		} catch (IOException e) {
+			throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME
+					+ " at " + HOST_NAME, e);
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
new file mode 100755
index 0000000..a6ca9ae
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.apache.flink.streaming.connectors.util.SimpleStringSchema;
+
+public class RMQTopology {
+
+	public static void main(String[] args) throws Exception {
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+		@SuppressWarnings("unused")
+		DataStream<String> dataStream1 = env.addSource(
+				new RMQSource<String>("localhost", "hello", new SimpleStringSchema())).print();
+
+		@SuppressWarnings("unused")
+		DataStream<String> dataStream2 = env.fromElements("one", "two", "three", "four", "five",
+				"q").addSink(
+				new RMQSink<String>("localhost", "hello", new StringToByteSerializer()));
+
+		env.execute();
+	}
+
+	public static class StringToByteSerializer implements SerializationSchema<String, byte[]> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public byte[] serialize(String element) {
+			return element.getBytes();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
new file mode 100644
index 0000000..ddb2538
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.hbc.ClientBuilder;
+import com.twitter.hbc.core.Constants;
+import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
+import com.twitter.hbc.core.processor.StringDelimitedProcessor;
+import com.twitter.hbc.httpclient.BasicClient;
+import com.twitter.hbc.httpclient.auth.Authentication;
+import com.twitter.hbc.httpclient.auth.OAuth1;
+
+/**
+ * Implementation of {@link SourceFunction} specialized to emit tweets from
+ * Twitter. It can connect to the Twitter Streaming API and collect either an
+ * unbounded stream of tweets or a fixed number of them, emitting each tweet
+ * as a JSON string.
+ */
+public class TwitterSource extends RichParallelSourceFunction<String> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(TwitterSource.class);
+
+	private static final long serialVersionUID = 1L;
+	private String authPath;
+	private transient BlockingQueue<String> queue;
+	private int queueSize = 10000;
+	private transient BasicClient client;
+	private int waitSec = 5;
+
+	private boolean streaming;
+	private int numberOfTweets;
+
+	/**
+	 * Create {@link TwitterSource} for streaming
+	 * 
+	 * @param authPath
+	 *            Location of the properties file containing the required
+	 *            authentication information.
+	 */
+	public TwitterSource(String authPath) {
+		this.authPath = authPath;
+		streaming = true;
+	}
+
+	/**
+	 * Create {@link TwitterSource} to collect finite number of tweets
+	 * 
+	 * @param authPath
+	 *            Location of the properties file containing the required
+	 *            authentication information.
+	 * @param numberOfTweets
+	 *            The number of tweets to collect before closing the source.
+	 */
+	public TwitterSource(String authPath, int numberOfTweets) {
+		this.authPath = authPath;
+		streaming = false;
+		this.numberOfTweets = numberOfTweets;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		initializeConnection();
+	}
+
+	@Override
+	public void invoke(Collector<String> collector) throws Exception {
+
+		if (streaming) {
+			collectMessages(collector);
+		} else {
+			collectFiniteMessages(collector);
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		closeConnection();
+	}
+
+	/**
+	 * Initialize Hosebird Client to be able to consume Twitter's Streaming API
+	 */
+	private void initializeConnection() {
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Initializing Twitter Streaming API connection");
+		}
+
+		queue = new LinkedBlockingQueue<String>(queueSize);
+
+		StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint();
+		endpoint.stallWarnings(false);
+
+		Authentication auth = authenticate();
+
+		initializeClient(endpoint, auth);
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Twitter Streaming API connection established successfully");
+		}
+	}
+
+	private OAuth1 authenticate() {
+
+		Properties authenticationProperties = loadAuthenticationProperties();
+
+		return new OAuth1(authenticationProperties.getProperty("consumerKey"),
+				authenticationProperties.getProperty("consumerSecret"),
+				authenticationProperties.getProperty("token"),
+				authenticationProperties.getProperty("secret"));
+	}
+
+	/**
+	 * Reads the given properties file for the authentication data.
+	 * 
+	 * @return the authentication data.
+	 */
+	private Properties loadAuthenticationProperties() {
+		Properties properties = new Properties();
+		try {
+			InputStream input = new FileInputStream(authPath);
+			try {
+				properties.load(input);
+			} finally {
+				// Close the stream even if load() fails
+				input.close();
+			}
+		} catch (Exception e) {
+			throw new RuntimeException("Cannot open .properties file: " + authPath, e);
+		}
+		return properties;
+	}
+
+	private void initializeClient(StatusesSampleEndpoint endpoint, Authentication auth) {
+
+		client = new ClientBuilder().name("twitterSourceClient").hosts(Constants.STREAM_HOST)
+				.endpoint(endpoint).authentication(auth)
+				.processor(new StringDelimitedProcessor(queue)).build();
+
+		client.connect();
+	}
+
+	/**
+	 * Put tweets into collector
+	 * 
+	 * @param collector
+	 *            Collector in which the tweets are collected.
+	 */
+	protected void collectFiniteMessages(Collector<String> collector) {
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Collecting tweets");
+		}
+
+		for (int i = 0; i < numberOfTweets; i++) {
+			collectOneMessage(collector);
+		}
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Collecting tweets finished");
+		}
+	}
+
+	/**
+	 * Put tweets into collector
+	 * 
+	 * @param collector
+	 *            Collector in which the tweets are collected.
+	 */
+	protected void collectMessages(Collector<String> collector) {
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Tweet-stream begins");
+		}
+
+		while (true) {
+			collectOneMessage(collector);
+		}
+	}
+
+	/**
+	 * Put one tweet into the collector.
+	 * 
+	 * @param collector
+	 *            Collector in which the tweets are collected.
+	 */
+	protected void collectOneMessage(Collector<String> collector) {
+		if (client.isDone()) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Client connection closed unexpectedly: {}", client.getExitEvent()
+						.getMessage());
+			}
+		}
+
+		try {
+			String msg = queue.poll(waitSec, TimeUnit.SECONDS);
+			if (msg != null) {
+				collector.collect(msg);
+			} else {
+				if (LOG.isInfoEnabled()) {
+					LOG.info("Did not receive a message in {} seconds", waitSec);
+				}
+			}
+		} catch (InterruptedException e) {
+			throw new RuntimeException("'Waiting for tweet' thread is interrupted", e);
+		}
+
+	}
+
+	private void closeConnection() {
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Initiating connection close");
+		}
+
+		client.stop();
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Connection closed successfully");
+		}
+	}
+
+	/**
+	 * Get the size of the queue in which the tweets are contained temporarily.
+	 * 
+	 * @return the size of the queue in which the tweets are contained temporarily
+	 */
+	public int getQueueSize() {
+		return queueSize;
+	}
+
+	/**
+	 * Set the size of the queue in which the tweets are contained temporarily.
+	 * 
+	 * @param queueSize
+	 *            The desired value.
+	 */
+	public void setQueueSize(int queueSize) {
+		this.queueSize = queueSize;
+	}
+
+	/**
+	 * This function tells how long TwitterSource waits for each tweet.
+	 * 
+	 * @return Number of seconds.
+	 */
+	public int getWaitSec() {
+		return waitSec;
+	}
+
+	/**
+	 * This function sets how long TwitterSource should wait for each tweet.
+	 * 
+	 * @param waitSec
+	 *            The desired value.
+	 */
+	public void setWaitSec(int waitSec) {
+		this.waitSec = waitSec;
+	}
+}
\ No newline at end of file
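
The file read by loadAuthenticationProperties() is a standard Java properties
file, and authenticate() looks up the four keys below. A minimal sketch of
such a file (the values are placeholders, not real credentials):

consumerKey=YOUR_CONSUMER_KEY
consumerSecret=YOUR_CONSUMER_SECRET
token=YOUR_ACCESS_TOKEN
secret=YOUR_ACCESS_TOKEN_SECRET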

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
new file mode 100644
index 0000000..a32fe1b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
+import org.apache.flink.util.Collector;
+import org.apache.sling.commons.json.JSONException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TwitterStreaming {
+
+	private static final int PARALLELISM = 1;
+	private static final int SOURCE_PARALLELISM = 1;
+	private static final int NUMBEROFTWEETS = 100;
+
+	private static final Logger LOG = LoggerFactory.getLogger(TwitterStreaming.class);
+
+	public static class TwitterSink implements SinkFunction<Tuple5<Long, Integer, String, String, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void invoke(Tuple5<Long, Integer, String, String, String> tuple) {
+			System.out.println("ID: " + tuple.f0 + " int: " + tuple.f1 + " LANGUAGE: " + tuple.f2);
+			System.out.println("NAME: " + tuple.f4);
+			System.out.println("TEXT: " + tuple.f3);
+			System.out.println();
+		}
+
+	}
+
+	public static class SelectDataFlatMap extends
+			JSONParseFlatMap<String, Tuple5<Long, Integer, String, String, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(String value, Collector<Tuple5<Long, Integer, String, String, String>> out)
+				throws Exception {
+			try {
+				out.collect(new Tuple5<Long, Integer, String, String, String>(
+						getLong(value, "id"),
+						getInt(value, "entities.hashtags[0].indices[1]"),
+						getString(value, "lang"),
+						getString(value, "text"),
+						getString(value, "user.name")));
+			} catch (JSONException e) {
+				if (LOG.isErrorEnabled()) {
+					LOG.error("Field not found", e);
+				}
+			} 
+		}
+	}
+
+	public static void main(String[] args) throws Exception {
+
+		String path = "";
+
+		if (args != null && args.length == 1) {
+			path = args[0];
+		} else {
+			System.err.println("USAGE:\nTwitterStreaming <pathToPropertiesFile>");
+			return;
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(PARALLELISM);
+
+		DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS))
+				.setParallelism(SOURCE_PARALLELISM);
+
+		DataStream<Tuple5<Long, Integer, String, String, String>> selectedDataStream = streamSource
+				.flatMap(new SelectDataFlatMap());
+
+		selectedDataStream.addSink(new TwitterSink());
+
+		env.execute();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
new file mode 100644
index 0000000..4bc6df0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
+import org.apache.flink.util.Collector;
+import org.apache.sling.commons.json.JSONException;
+
+/**
+ * This program demonstrates the use of TwitterSource.
+ * Its aim is to count the frequency of tweet languages.
+ */
+public class TwitterTopology {
+
+	private static final int NUMBEROFTWEETS = 100;
+	
+	/**
+	 * FlatMapFunction to determine the language of tweets if possible 
+	 */
+	public static class SelectLanguageFlatMap extends
+			JSONParseFlatMap<String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		/**
+		 * Select the language from the incoming JSON text
+		 */
+		@Override
+		public void flatMap(String value, Collector<String> out) throws Exception {
+			try{
+				out.collect(getString(value, "lang"));
+			}
+			catch (JSONException e){
+				out.collect("");
+			}
+		}
+
+	}
+
+	public static void main(String[] args) throws Exception {
+
+		String path = "";
+
+		if (args != null && args.length == 1) {
+			path = args[0];
+		} else {
+			System.err.println("USAGE:\nTwitterTopology <pathToPropertiesFile>");
+			return;
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS));
+
+
+		DataStream<Tuple2<String, Integer>> dataStream = streamSource
+				.flatMap(new SelectLanguageFlatMap())
+				.map(new MapFunction<String, Tuple2<String, Integer>>() {
+					private static final long serialVersionUID = 1L;
+					
+					@Override
+					public Tuple2<String, Integer> map(String value) throws Exception {
+						return new Tuple2<String, Integer>(value, 1);
+					}
+				})
+				.groupBy(0)
+				.sum(1);
+
+		dataStream.print();
+
+		env.execute();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java
new file mode 100644
index 0000000..4507a1d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.util;
+
+import java.io.Serializable;
+
+public interface DeserializationSchema<T> extends Serializable {
+
+	/**
+	 * Deserializes the incoming data.
+	 * 
+	 * @param message
+	 *            The incoming message in a byte array
+	 * @return The deserialized message in the required format.
+	 */
+	public T deserialize(byte[] message);
+
+	/**
+	 * Method to decide whether the element signals the end of the stream. If
+	 * true is returned, the element won't be emitted.
+	 * 
+	 * @param nextElement
+	 *            The element to test for the end-of-stream signal
+	 * @return True if the element signals the end of the stream; the stream
+	 *         then shuts down and the element is not emitted
+	 */
+	public boolean isEndOfStream(T nextElement);
+}
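
As RMQSource.invoke() above illustrates, a source stops consuming (without
emitting the final element) once isEndOfStream() returns true. Below is a
minimal sketch of a schema that ends the stream on a sentinel element; the
class name is illustrative, and the choice of "q" as sentinel (echoing the
last element RMQTopology sends) is an assumption, not part of the commit:

package org.apache.flink.streaming.connectors.util;

public class SentinelStringSchema implements DeserializationSchema<String> {

	private static final long serialVersionUID = 1L;

	@Override
	public String deserialize(byte[] message) {
		return new String(message);
	}

	@Override
	public boolean isEndOfStream(String nextElement) {
		// The consuming source breaks its read loop when this returns true;
		// the sentinel itself is not forwarded downstream.
		return "q".equals(nextElement);
	}
}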

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java
new file mode 100644
index 0000000..29c749a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.util;
+
+public class RawSchema implements DeserializationSchema<byte[]>,
+		SerializationSchema<byte[], byte[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public byte[] deserialize(byte[] message) {
+		return message;
+	}
+
+	@Override
+	public boolean isEndOfStream(byte[] nextElement) {
+		return false;
+	}
+
+	@Override
+	public byte[] serialize(byte[] element) {
+		return element;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java
new file mode 100644
index 0000000..7c32312
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.util;
+
+import java.io.Serializable;
+
+public interface SerializationSchema<T,R> extends Serializable {
+
+	/**
+	 * Serializes the incoming element to a specified type.
+	 * 
+	 * @param element
+	 *            The incoming element to be serialized
+	 * @return The serialized element.
+	 */
+	public R serialize(T element);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
new file mode 100644
index 0000000..4b21580
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.util;
+
+public class SimpleStringSchema implements DeserializationSchema<String>,
+		SerializationSchema<String, String> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public String deserialize(byte[] message) {
+		return new String(message);
+	}
+
+	@Override
+	public boolean isEndOfStream(String nextElement) {
+		return false;
+	}
+
+	@Override
+	public String serialize(String element) {
+		return element;
+	}
+
+}
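
Note that new String(message) above decodes with the platform default
charset. Where a fixed encoding is required, a charset-explicit variant can
be used; the sketch below (class name, UTF-8 choice, and byte[]-producing
serializer) is an editor's assumption rather than part of the commit:

package org.apache.flink.streaming.connectors.util;

import java.nio.charset.Charset;

public class Utf8StringSchema implements DeserializationSchema<String>,
		SerializationSchema<String, byte[]> {

	private static final long serialVersionUID = 1L;

	// Static so the non-serializable Charset is not part of instance state
	private static final Charset UTF8 = Charset.forName("UTF-8");

	@Override
	public String deserialize(byte[] message) {
		// Decode with a fixed charset instead of the platform default
		return new String(message, UTF8);
	}

	@Override
	public boolean isEndOfStream(String nextElement) {
		return false;
	}

	@Override
	public byte[] serialize(String element) {
		return element.getBytes(UTF8);
	}
}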

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/DBStateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/DBStateTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/DBStateTest.java
new file mode 100644
index 0000000..fe3783b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/db/DBStateTest.java
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.db;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class DBStateTest {
+	
+	public void stateTest(DBState<String, Integer> state) {
+		state.put("k1", 1);
+		assertEquals(Integer.valueOf(1), state.get("k1"));
+
+		state.put("k2", 2);
+		state.put("k3", 3);
+		assertEquals(Integer.valueOf(2), state.get("k2"));
+		assertEquals(Integer.valueOf(3), state.get("k3"));
+		state.remove("k2");
+		
+		try {
+			state.get("k2");
+			fail();
+		} catch (Exception e) {
+			// expected: "k2" was removed above
+		}
+	}
+	
+	private void iteratorTest(DBStateWithIterator<String, Integer> state) {
+		HashMap<String, Integer> expected = new HashMap<String, Integer>();
+		HashMap<String, Integer> result = new HashMap<String, Integer>();
+		
+		state.put("10", 10);
+		state.put("20", 20);
+		state.put("30", 30);
+
+		expected.put("10", 10);
+		expected.put("20", 20);
+		expected.put("30", 30);
+		
+		DBStateIterator<String, Integer> iterator = state.getIterator();
+		while (iterator.hasNext()) {
+			String key = iterator.getNextKey();
+			Integer value = iterator.getNextValue();
+			result.put(key, value);
+			iterator.next();
+		}
+		state.close();
+		
+		assertEquals(expected, result);
+	}
+	
+	// TODO
+	@Ignore("Creates files with no licenses")
+	@Test
+	public void levelDBTest() {
+		LevelDBState<String, Integer> state = new LevelDBState<String, Integer>("test");
+		stateTest(state);
+		state.close();
+		
+		state = new LevelDBState<String, Integer>("test");
+		iteratorTest(state);
+		state.close();
+	}
+	
+	@Ignore("Needs running Memcached")
+	@Test
+	public void memcachedTest() {
+		MemcachedState<Integer> state = new MemcachedState<Integer>();
+		stateTest(state);
+		state.close();
+	}
+
+	@Ignore("Needs running Redis")
+	@Test
+	public void redisTest() {
+		RedisState<String, Integer> state = new RedisState<String, Integer>();
+		stateTest(state);
+		state.close();
+		
+		state = new RedisState<String, Integer>();
+		iteratorTest(state);
+		state.close();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
new file mode 100644
index 0000000..b1d4115
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.flink.streaming.connectors.json.JSONParser;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class JSONParserTest {
+
+	private String jsonText;
+	private String searchedField;
+
+	public JSONParserTest(String text, String field) {
+		jsonText = text;
+		searchedField = field;
+	}
+
+	@Parameters
+	public static Collection<Object[]> initParameterList() {
+
+		Object[][] parameterList = new Object[][] { 
+				{ "{\"key\":\"value\"}", 							"key" },
+				{ "{\"key\":[\"value\"]}", 							"key[0]" },
+				{ "{\"key\":[{\"key\":\"value\"}]}", 				"key[0].key" },
+				{ "{\"key\":[{\"key\":[{\"key\":\"value\"}]}]}", 	"key[0].key[0].key"},
+				{ "{\"key\":[1,[{\"key\":\"value\"}]]}", 			"key[1][0].key" },
+				{ "{\"key\":[1,[[\"key\",2,\"value\"]]]}", 			"key[1][0][2]" },
+				{ "{\"key\":{\"key\":{\"otherKey\":\"wrongValue\",\"key\":\"value\"},\"otherKey\":\"wrongValue\"},\"otherKey\":\"wrongValue\"}" , "key.key.key"}
+				};
+
+		return Arrays.asList(parameterList);
+	}
+
+	@Test
+	public void test() {
+		try {
+			JSONParser parser = new JSONParser(jsonText);
+			JSONObject jo = parser.parse(searchedField);
+			String expected = "{\"retValue\":\"value\"}";
+
+			assertEquals(expected, jo.toString());
+		} 
+		catch (JSONException e) {
+			fail();
+		}
+	}
+}
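
As the parameterized cases above show, JSONParser.parse() wraps the value
located by a dotted/indexed field path in a JSONObject under the key
"retValue". A minimal usage sketch (the JSON text and field path are
illustrative):

import org.apache.flink.streaming.connectors.json.JSONParser;
import org.apache.sling.commons.json.JSONException;
import org.apache.sling.commons.json.JSONObject;

public class JSONParserExample {

	public static void main(String[] args) throws JSONException {
		JSONParser parser = new JSONParser("{\"user\":{\"name\":\"flink\"}}");
		JSONObject jo = parser.parse("user.name");
		// The located value is available under "retValue"
		System.out.println(jo.getString("retValue")); // prints: flink
	}
}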

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
new file mode 100644
index 0000000..8851086
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.streaming.connectors.json.JSONParser;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+import org.junit.Test;
+
+
+public class JSONParserTest2 {
+	
+	@Test
+	public void testGetBooleanFunction() {
+		String jsonText = "{\"key\":true}";
+		String searchedField = "key";
+		try {
+			JSONParser parser = new JSONParser(jsonText);
+			JSONObject jo = parser.parse(searchedField);
+
+			assertTrue(jo.getBoolean("retValue"));
+		} 
+		catch (JSONException e) {
+			fail();
+		}
+	}
+	
+	@Test
+	public void testGetDoubleFunction() {
+		double expected = 12345.12345;
+		String jsonText = "{\"key\":" + expected + "}";
+		String searchedField = "key";
+		try {
+			JSONParser parser = new JSONParser(jsonText);
+			JSONObject jo = parser.parse(searchedField);
+
+			assertEquals(expected,jo.getDouble("retValue"),0.000001);
+		} 
+		catch (JSONException e) {
+			fail();
+		}
+	}
+	
+	@Test
+	public void testGetIntFunction() {
+		int expected = 15;
+		String jsonText = "{\"key\":" + expected + "}";
+		String searchedField = "key";
+		try {
+			JSONParser parser = new JSONParser(jsonText);
+			JSONObject jo = parser.parse(searchedField);
+
+			assertEquals(expected,jo.getInt("retValue"));
+		} 
+		catch (JSONException e) {
+			fail();
+		}
+	}
+
+	@Test
+	public void testGetLongFunction() {
+		long expected = 111111111111L;
+		String jsonText = "{\"key\":" + expected + "}";
+		String searchedField = "key";
+		try {
+			JSONParser parser = new JSONParser(jsonText);
+			JSONObject jo = parser.parse(searchedField);
+
+			assertEquals(expected,jo.getLong("retValue"));
+		} 
+		catch (JSONException e) {
+			fail();
+		}
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2fb9345
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
@@ -0,0 +1,19 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
#  limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/logback-test.xml b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/pom.xml b/flink-staging/flink-streaming/flink-streaming-core/pom.xml
new file mode 100644
index 0000000..a60bd01
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-streaming-parent</artifactId>
+		<version>0.9-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-streaming-core</artifactId>
+	<name>flink-streaming-core</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.commons</groupId>
+			<artifactId>commons-math</artifactId>
+			<version>2.2</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.sling</groupId>
+			<artifactId>org.apache.sling.commons.json</artifactId>
+			<version>2.0.6</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
new file mode 100644
index 0000000..1d51216
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.api.streamvertex.StreamVertexException;
+import org.apache.flink.streaming.partitioner.ShufflePartitioner;
+import org.apache.flink.streaming.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.state.OperatorState;
+import org.apache.flink.util.InstantiationUtil;
+
+public class StreamConfig implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final String INPUT_TYPE = "inputType_";
+	private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
+	private static final String NUMBER_OF_INPUTS = "numberOfInputs";
+	private static final String CHAINED_OUTPUTS = "chainedOutputs";
+	private static final String CHAINED_TASK_CONFIG = "chainedTaskConfig_";
+	private static final String IS_CHAINED_VERTEX = "isChainedSubtask";
+	private static final String OUTPUT_NAME = "outputName_";
+	private static final String PARTITIONER_OBJECT = "partitionerObject_";
+	private static final String VERTEX_NAME = "vertexName";
+	private static final String ITERATION_ID = "iteration-id";
+	private static final String OUTPUT_SELECTOR = "outputSelector";
+	private static final String DIRECTED_EMIT = "directedEmit";
+	private static final String SERIALIZEDUDF = "serializedudf";
+	private static final String USER_FUNCTION = "userfunction";
+	private static final String BUFFER_TIMEOUT = "bufferTimeout";
+	private static final String OPERATOR_STATES = "operatorStates";
+	private static final String TYPE_SERIALIZER_IN_1 = "typeSerializer_in_1";
+	private static final String TYPE_SERIALIZER_IN_2 = "typeSerializer_in_2";
+	private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out_1";
+	private static final String TYPE_SERIALIZER_OUT_2 = "typeSerializer_out_2";
+	private static final String ITERATION_WAIT = "iterationWait";
+	private static final String OUTPUTS = "outVertexNames";
+	private static final String EDGES_IN_ORDER = "rwOrder";
+
+	// DEFAULT VALUES
+
+	private static final long DEFAULT_TIMEOUT = 100;
+
+	// CONFIG METHODS
+
+	private Configuration config;
+
+	public StreamConfig(Configuration config) {
+		this.config = config;
+	}
+
+	public Configuration getConfiguration() {
+		return config;
+	}
+
+	public void setVertexName(String vertexName) {
+		config.setString(VERTEX_NAME, vertexName);
+	}
+
+	public String getTaskName() {
+		return config.getString(VERTEX_NAME, "Missing");
+	}
+
+	public void setTypeSerializerIn1(StreamRecordSerializer<?> serializer) {
+		setTypeSerializer(TYPE_SERIALIZER_IN_1, serializer);
+	}
+
+	public void setTypeSerializerIn2(StreamRecordSerializer<?> serializer) {
+		setTypeSerializer(TYPE_SERIALIZER_IN_2, serializer);
+	}
+
+	public void setTypeSerializerOut1(StreamRecordSerializer<?> serializer) {
+		setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer);
+	}
+
+	public void setTypeSerializerOut2(StreamRecordSerializer<?> serializer) {
+		setTypeSerializer(TYPE_SERIALIZER_OUT_2, serializer);
+	}
+
+	@SuppressWarnings("unchecked")
+	public <T> StreamRecordSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
+		try {
+			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+					TYPE_SERIALIZER_IN_1, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate serializer.", e);
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	public <T> StreamRecordSerializer<T> getTypeSerializerIn2(ClassLoader cl) {
+		try {
+			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+					TYPE_SERIALIZER_IN_2, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate serializer.", e);
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	public <T> StreamRecordSerializer<T> getTypeSerializerOut1(ClassLoader cl) {
+		try {
+			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+					TYPE_SERIALIZER_OUT_1, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate serializer.", e);
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	public <T> StreamRecordSerializer<T> getTypeSerializerOut2(ClassLoader cl) {
+		try {
+			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+					TYPE_SERIALIZER_OUT_2, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate serializer.", e);
+		}
+	}
+
+	private void setTypeSerializer(String key, StreamRecordSerializer<?> typeWrapper) {
+		config.setBytes(key, SerializationUtils.serialize(typeWrapper));
+	}
+
+	public void setBufferTimeout(long timeout) {
+		config.setLong(BUFFER_TIMEOUT, timeout);
+	}
+
+	public long getBufferTimeout() {
+		return config.getLong(BUFFER_TIMEOUT, DEFAULT_TIMEOUT);
+	}
+
+	public void setUserInvokable(StreamInvokable<?, ?> invokableObject) {
+		if (invokableObject != null) {
+			config.setClass(USER_FUNCTION, invokableObject.getClass());
+
+			try {
+				config.setBytes(SERIALIZEDUDF, SerializationUtils.serialize(invokableObject));
+			} catch (SerializationException e) {
+				throw new RuntimeException("Cannot serialize invokable object "
+						+ invokableObject.getClass(), e);
+			}
+		}
+	}
+
+	@SuppressWarnings({ "unchecked" })
+	public <T> T getUserInvokable(ClassLoader cl) {
+		try {
+			return (T) InstantiationUtil.readObjectFromConfig(this.config, SERIALIZEDUDF, cl);
+		} catch (Exception e) {
+			throw new StreamVertexException("Cannot instantiate user function", e);
+		}
+	}
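+
+	/*
+	 * Usage sketch: the invokable is stored twice -- its class under
+	 * USER_FUNCTION and its serialized form under SERIALIZEDUDF -- and read
+	 * back reflectively. The myInvokable name below is a placeholder for any
+	 * serializable StreamInvokable:
+	 *
+	 *   StreamConfig cfg = new StreamConfig(new Configuration());
+	 *   cfg.setUserInvokable(myInvokable);
+	 *   StreamInvokable<?, ?> copy = cfg.getUserInvokable(cl);
+	 */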
+
+	public void setDirectedEmit(boolean directedEmit) {
+		config.setBoolean(DIRECTED_EMIT, directedEmit);
+	}
+
+	public boolean isDirectedEmit() {
+		return config.getBoolean(DIRECTED_EMIT, false);
+	}
+
+	public void setOutputSelectors(List<OutputSelector<?>> outputSelector) {
+		try {
+			if (outputSelector != null) {
+				setDirectedEmit(true);
+				config.setBytes(OUTPUT_SELECTOR, SerializationUtils.serialize((Serializable) outputSelector));
+			}
+		} catch (SerializationException e) {
+			throw new RuntimeException("Cannot serialize OutputSelector");
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	public <T> List<OutputSelector<T>> getOutputSelectors(ClassLoader cl) {
+		try {
+			return (List<OutputSelector<T>>) InstantiationUtil.readObjectFromConfig(this.config,
+					OUTPUT_SELECTOR, cl);
+		} catch (Exception e) {
+			throw new StreamVertexException("Cannot deserialize and instantiate OutputSelector", e);
+		}
+	}
+
+	public void setIterationId(Integer iterationId) {
+		config.setInteger(ITERATION_ID, iterationId);
+	}
+
+	public Integer getIterationId() {
+		return config.getInteger(ITERATION_ID, 0);
+	}
+
+	public void setIterationWaitTime(long time) {
+		config.setLong(ITERATON_WAIT, time);
+	}
+
+	public long getIterationWaitTime() {
+		return config.getLong(ITERATON_WAIT, 0);
+	}
+
+	public <T> void setPartitioner(String output, StreamPartitioner<T> partitionerObject) {
+		config.setBytes(PARTITIONER_OBJECT + output,
+				SerializationUtils.serialize(partitionerObject));
+	}
+
+	@SuppressWarnings("unchecked")
+	public <T> StreamPartitioner<T> getPartitioner(ClassLoader cl, String output) {
+		StreamPartitioner<T> partitioner = null;
+		try {
+			partitioner = (StreamPartitioner<T>) InstantiationUtil.readObjectFromConfig(
+					this.config, PARTITIONER_OBJECT + output, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Partitioner could not be instantiated.");
+		}
+		if (partitioner != null) {
+			return partitioner;
+		} else {
+			return new ShufflePartitioner<T>();
+		}
+	}
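+
+	/*
+	 * Usage sketch: when no partitioner has been stored for the given output,
+	 * the getter falls back to shuffle partitioning. The somePartitioner name
+	 * is a placeholder for any StreamPartitioner implementation:
+	 *
+	 *   cfg.setPartitioner("0", somePartitioner);
+	 *   StreamPartitioner<T> p = cfg.getPartitioner(cl, "0"); // somePartitioner
+	 *   StreamPartitioner<T> q = cfg.getPartitioner(cl, "1"); // ShufflePartitioner
+	 */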
+
+	public void setSelectedNames(String output, List<String> selected) {
+		if (selected != null) {
+			config.setBytes(OUTPUT_NAME + output,
+					SerializationUtils.serialize((Serializable) selected));
+		} else {
+			config.setBytes(OUTPUT_NAME + output,
+					SerializationUtils.serialize((Serializable) new ArrayList<String>()));
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	public List<String> getSelectedNames(String output) {
+		byte[] serializedNames = config.getBytes(OUTPUT_NAME + output, null);
+		return serializedNames == null ? new ArrayList<String>()
+				: (List<String>) SerializationUtils.deserialize(serializedNames);
+	}
+
+	public void setNumberOfInputs(int numberOfInputs) {
+		config.setInteger(NUMBER_OF_INPUTS, numberOfInputs);
+	}
+
+	public int getNumberOfInputs() {
+		return config.getInteger(NUMBER_OF_INPUTS, 0);
+	}
+
+	public void setNumberOfOutputs(int numberOfOutputs) {
+		config.setInteger(NUMBER_OF_OUTPUTS, numberOfOutputs);
+	}
+
+	public int getNumberOfOutputs() {
+		return config.getInteger(NUMBER_OF_OUTPUTS, 0);
+	}
+
+	public void setOutputs(List<String> outputVertexNames) {
+		config.setBytes(OUTPUTS, SerializationUtils.serialize((Serializable) outputVertexNames));
+	}
+
+	@SuppressWarnings("unchecked")
+	public List<String> getOutputs(ClassLoader cl) {
+		try {
+			return (List<String>) InstantiationUtil.readObjectFromConfig(this.config, OUTPUTS, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate outputs.");
+		}
+	}
+
+	public void setOutEdgesInOrder(List<Tuple2<String, String>> outEdgeList) {
+		config.setBytes(EDGES_IN_ORDER, SerializationUtils.serialize((Serializable) outEdgeList));
+	}
+
+	@SuppressWarnings("unchecked")
+	public List<Tuple2<String, String>> getOutEdgesInOrder(ClassLoader cl) {
+		try {
+			return (List<Tuple2<String, String>>) InstantiationUtil.readObjectFromConfig(
+					this.config, EDGES_IN_ORDER, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate outputs.");
+		}
+	}
+
+	public void setInputIndex(int inputNumber, Integer inputTypeNumber) {
+		config.setInteger(INPUT_TYPE + inputNumber, inputTypeNumber);
+	}
+
+	public int getInputIndex(int inputNumber) {
+		return config.getInteger(INPUT_TYPE + inputNumber, 0);
+	}
+
+	public void setOperatorStates(Map<String, OperatorState<?>> states) {
+		config.setBytes(OPERATOR_STATES, SerializationUtils.serialize((Serializable) states));
+	}
+
+	@SuppressWarnings("unchecked")
+	public Map<String, OperatorState<?>> getOperatorStates(ClassLoader cl) {
+		try {
+			return (Map<String, OperatorState<?>>) InstantiationUtil.readObjectFromConfig(
+					this.config, OPERATOR_STATES, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not load operator state");
+		}
+	}
+
+	public void setChainedOutputs(List<String> chainedOutputs) {
+		config.setBytes(CHAINED_OUTPUTS,
+				SerializationUtils.serialize((Serializable) chainedOutputs));
+	}
+
+	@SuppressWarnings("unchecked")
+	public List<String> getChainedOutputs(ClassLoader cl) {
+		try {
+			return (List<String>) InstantiationUtil.readObjectFromConfig(this.config,
+					CHAINED_OUTPUTS, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate chained outputs.");
+		}
+	}
+
+	public void setTransitiveChainedTaskConfigs(Map<String, StreamConfig> chainedTaskConfigs) {
+		config.setBytes(CHAINED_TASK_CONFIG,
+				SerializationUtils.serialize((Serializable) chainedTaskConfigs));
+	}
+
+	@SuppressWarnings("unchecked")
+	public Map<String, StreamConfig> getTransitiveChainedTaskConfigs(ClassLoader cl) {
+		try {
+
+			Map<String, StreamConfig> confs = (Map<String, StreamConfig>) InstantiationUtil
+					.readObjectFromConfig(this.config, CHAINED_TASK_CONFIG, cl);
+
+			return confs == null ? new HashMap<String, StreamConfig>() : confs;
+		} catch (Exception e) {
+			throw new RuntimeException("Could not instantiate configuration.");
+		}
+	}
+
+	public void setChainStart() {
+		config.setBoolean(IS_CHAINED_VERTEX, true);
+	}
+
+	public boolean isChainStart() {
+		return config.getBoolean(IS_CHAINED_VERTEX, false);
+	}
+
+	@Override
+	public String toString() {
+
+		ClassLoader cl = getClass().getClassLoader();
+
+		StringBuilder builder = new StringBuilder();
+		builder.append("\n=======================");
+		builder.append("Stream Config");
+		builder.append("=======================");
+		builder.append("\nTask name: " + getTaskName());
+		builder.append("\nNumber of non-chained inputs: " + getNumberOfInputs());
+		builder.append("\nNumber of non-chained outputs: " + getNumberOfOutputs());
+		builder.append("\nOutput names: " + getOutputs(cl));
+		builder.append("\nPartitioning:");
+		for (String outputname : getOutputs(cl)) {
+			builder.append("\n\t" + outputname + ": "
+					+ getPartitioner(cl, outputname).getClass().getSimpleName());
+		}
+
+		builder.append("\nChained subtasks: " + getChainedOutputs(cl));
+
+		try {
+			builder.append("\nInvokable: " + getUserInvokable(cl).getClass().getSimpleName());
+		} catch (Exception e) {
+			builder.append("\nInvokable: Missing");
+		}
+		builder.append("\nBuffer timeout: " + getBufferTimeout());
+		if (isChainStart() && getChainedOutputs(cl).size() > 0) {
+			builder.append("\n\n\n---------------------\nChained task configs\n---------------------\n");
+			builder.append(getTransitiveChainedTaskConfigs(cl));
+		}
+
+		return builder.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
new file mode 100644
index 0000000..b5e43af
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.compiler.plan.StreamingPlan;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.api.streamvertex.CoStreamVertex;
+import org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
+import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
+import org.apache.flink.streaming.api.streamvertex.StreamVertex;
+import org.apache.flink.streaming.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.state.OperatorState;
+import org.apache.sling.commons.json.JSONArray;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Object for building Apache Flink stream processing graphs
+ */
+public class StreamGraph extends StreamingPlan {
+
+	private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);
+	private static final String DEFAULT_JOB_NAME = "Flink Streaming Job";
+
+	protected boolean chaining = true;
+	private String jobName = DEFAULT_JOB_NAME;
+
+	// Graph attributes
+	private Map<String, Integer> operatorParallelisms;
+	private Map<String, Long> bufferTimeouts;
+	private Map<String, List<String>> outEdgeLists;
+	private Map<String, List<Integer>> outEdgeTypes;
+	private Map<String, List<List<String>>> selectedNames;
+	private Map<String, List<String>> inEdgeLists;
+	private Map<String, List<StreamPartitioner<?>>> outputPartitioners;
+	private Map<String, String> operatorNames;
+	private Map<String, StreamInvokable<?, ?>> invokableObjects;
+	private Map<String, StreamRecordSerializer<?>> typeSerializersIn1;
+	private Map<String, StreamRecordSerializer<?>> typeSerializersIn2;
+	private Map<String, StreamRecordSerializer<?>> typeSerializersOut1;
+	private Map<String, StreamRecordSerializer<?>> typeSerializersOut2;
+	private Map<String, Class<? extends AbstractInvokable>> jobVertexClasses;
+	private Map<String, List<OutputSelector<?>>> outputSelectors;
+	private Map<String, Integer> iterationIds;
+	private Map<Integer, String> iterationIDtoHeadName;
+	private Map<Integer, String> iterationIDtoTailName;
+	private Map<String, Integer> iterationTailCount;
+	private Map<String, Long> iterationTimeouts;
+	private Map<String, Map<String, OperatorState<?>>> operatorStates;
+	private Map<String, InputFormat<String, ?>> inputFormatLists;
+
+	private Set<String> sources;
+
+	public StreamGraph() {
+
+		initGraph();
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("StreamGraph created");
+		}
+	}
+
+	public void initGraph() {
+		operatorParallelisms = new HashMap<String, Integer>();
+		bufferTimeouts = new HashMap<String, Long>();
+		outEdgeLists = new HashMap<String, List<String>>();
+		outEdgeTypes = new HashMap<String, List<Integer>>();
+		selectedNames = new HashMap<String, List<List<String>>>();
+		inEdgeLists = new HashMap<String, List<String>>();
+		outputPartitioners = new HashMap<String, List<StreamPartitioner<?>>>();
+		operatorNames = new HashMap<String, String>();
+		invokableObjects = new HashMap<String, StreamInvokable<?, ?>>();
+		typeSerializersIn1 = new HashMap<String, StreamRecordSerializer<?>>();
+		typeSerializersIn2 = new HashMap<String, StreamRecordSerializer<?>>();
+		typeSerializersOut1 = new HashMap<String, StreamRecordSerializer<?>>();
+		typeSerializersOut2 = new HashMap<String, StreamRecordSerializer<?>>();
+		outputSelectors = new HashMap<String, List<OutputSelector<?>>>();
+		jobVertexClasses = new HashMap<String, Class<? extends AbstractInvokable>>();
+		iterationIds = new HashMap<String, Integer>();
+		iterationIDtoHeadName = new HashMap<Integer, String>();
+		iterationIDtoTailName = new HashMap<Integer, String>();
+		iterationTailCount = new HashMap<String, Integer>();
+		iterationTimeouts = new HashMap<String, Long>();
+		operatorStates = new HashMap<String, Map<String, OperatorState<?>>>();
+		inputFormatLists = new HashMap<String, InputFormat<String, ?>>();
+		sources = new HashSet<String>();
+	}
+
+	/**
+	 * Adds a vertex to the streaming graph with the given parameters
+	 * 
+	 * @param vertexName
+	 *            Name of the vertex
+	 * @param invokableObject
+	 *            User defined operator
+	 * @param inTypeInfo
+	 *            Input type for serialization
+	 * @param outTypeInfo
+	 *            Output type for serialization
+	 * @param operatorName
+	 *            Operator type
+	 * @param parallelism
+	 *            Number of parallel instances created
+	 */
+	public <IN, OUT> void addStreamVertex(String vertexName,
+			StreamInvokable<IN, OUT> invokableObject, TypeInformation<IN> inTypeInfo,
+			TypeInformation<OUT> outTypeInfo, String operatorName, int parallelism) {
+
+		addVertex(vertexName, StreamVertex.class, invokableObject, operatorName, parallelism);
+
+		StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? new StreamRecordSerializer<IN>(
+				inTypeInfo) : null;
+		StreamRecordSerializer<OUT> outSerializer = outTypeInfo != null ? new StreamRecordSerializer<OUT>(
+				outTypeInfo) : null;
+
+		addTypeSerializers(vertexName, inSerializer, null, outSerializer, null);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Vertex: {}", vertexName);
+		}
+	}
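+
+	/*
+	 * Usage sketch: registering a mapper vertex with parallelism 4. The
+	 * vertex name, invokable and type information objects below are
+	 * placeholders:
+	 *
+	 *   StreamGraph graph = new StreamGraph();
+	 *   graph.addStreamVertex("1", mapInvokable, inTypeInfo, outTypeInfo, "Map", 4);
+	 *
+	 * Sources additionally register themselves via addSourceVertex below.
+	 */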
+
+	public <IN, OUT> void addSourceVertex(String vertexName,
+			StreamInvokable<IN, OUT> invokableObject, TypeInformation<IN> inTypeInfo,
+			TypeInformation<OUT> outTypeInfo, String operatorName, int parallelism) {
+		addStreamVertex(vertexName, invokableObject, inTypeInfo, outTypeInfo, operatorName,
+				parallelism);
+		sources.add(vertexName);
+	}
+
+	/**
+	 * Adds a vertex for the iteration head to the {@link JobGraph}. The
+	 * iterated values will be fed from this vertex back to the graph.
+	 * 
+	 * @param vertexName
+	 *            Name of the vertex
+	 * @param iterationHead
+	 *            Id of the iteration head
+	 * @param iterationID
+	 *            ID of iteration for multiple iterations
+	 * @param parallelism
+	 *            Number of parallel instances created
+	 * @param waitTime
+	 *            Max wait time for next record
+	 */
+	public void addIterationHead(String vertexName, String iterationHead, Integer iterationID,
+			int parallelism, long waitTime) {
+
+		addVertex(vertexName, StreamIterationHead.class, null, null, parallelism);
+
+		chaining = false;
+
+		iterationIds.put(vertexName, iterationID);
+		iterationIDtoHeadName.put(iterationID, vertexName);
+
+		setSerializersFrom(iterationHead, vertexName);
+
+		setEdge(vertexName, iterationHead,
+				outputPartitioners.get(inEdgeLists.get(iterationHead).get(0)).get(0), 0,
+				new ArrayList<String>());
+
+		iterationTimeouts.put(iterationIDtoHeadName.get(iterationID), waitTime);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("ITERATION SOURCE: {}", vertexName);
+		}
+
+		sources.add(vertexName);
+	}
+
+	/**
+	 * Adds a vertex for the iteration tail to the {@link JobGraph}. The values
+	 * intended to be iterated will be sent to this sink from the iteration
+	 * head.
+	 * 
+	 * @param vertexName
+	 *            Name of the vertex
+	 * @param iterationTail
+	 *            Id of the iteration tail
+	 * @param iterationID
+	 *            ID of iteration for multiple iterations
+	 * @param parallelism
+	 *            Number of parallel instances created
+	 * @param waitTime
+	 *            Max waiting time for next record
+	 */
+	public void addIterationTail(String vertexName, String iterationTail, Integer iterationID,
+			long waitTime) {
+
+		if (bufferTimeouts.get(iterationTail) == 0) {
+			throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported.");
+		}
+
+		addVertex(vertexName, StreamIterationTail.class, null, null, getParallelism(iterationTail));
+
+		iterationIds.put(vertexName, iterationID);
+		iterationIDtoTailName.put(iterationID, vertexName);
+
+		setSerializersFrom(iterationTail, vertexName);
+		iterationTimeouts.put(iterationIDtoTailName.get(iterationID), waitTime);
+
+		setParallelism(iterationIDtoHeadName.get(iterationID), getParallelism(iterationTail));
+		setBufferTimeout(iterationIDtoHeadName.get(iterationID), bufferTimeouts.get(iterationTail));
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("ITERATION SINK: {}", vertexName);
+		}
+
+	}
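+
+	/*
+	 * Sketch of how head and tail pair up (vertex names are placeholders):
+	 * both sides are keyed by the same iterationID, and the head is
+	 * re-configured with the parallelism and buffer timeout of the iterated
+	 * vertex:
+	 *
+	 *   graph.addIterationHead("iterHead", "map1", 1, 4, 10000L);
+	 *   graph.addIterationTail("iterTail", "map2", 1, 10000L);
+	 */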
+
+	public <IN1, IN2, OUT> void addCoTask(String vertexName,
+			CoInvokable<IN1, IN2, OUT> taskInvokableObject, TypeInformation<IN1> in1TypeInfo,
+			TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> outTypeInfo,
+			String operatorName, int parallelism) {
+
+		addVertex(vertexName, CoStreamVertex.class, taskInvokableObject, operatorName, parallelism);
+
+		addTypeSerializers(vertexName, new StreamRecordSerializer<IN1>(in1TypeInfo),
+				new StreamRecordSerializer<IN2>(in2TypeInfo), new StreamRecordSerializer<OUT>(
+						outTypeInfo), null);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("CO-TASK: {}", vertexName);
+		}
+	}
+
+	/**
+	 * Registers a vertex and its parameters in the streaming graph
+	 * 
+	 * @param vertexName
+	 *            Name of the vertex
+	 * @param vertexClass
+	 *            The class of the vertex
+	 * @param invokableObject
+	 *            The user defined invokable object
+	 * @param operatorName
+	 *            Type of the user defined operator
+	 * @param parallelism
+	 *            Number of parallel instances created
+	 */
+	private void addVertex(String vertexName, Class<? extends AbstractInvokable> vertexClass,
+			StreamInvokable<?, ?> invokableObject, String operatorName, int parallelism) {
+
+		jobVertexClasses.put(vertexName, vertexClass);
+		setParallelism(vertexName, parallelism);
+		invokableObjects.put(vertexName, invokableObject);
+		operatorNames.put(vertexName, operatorName);
+		outEdgeLists.put(vertexName, new ArrayList<String>());
+		outEdgeTypes.put(vertexName, new ArrayList<Integer>());
+		selectedNames.put(vertexName, new ArrayList<List<String>>());
+		outputSelectors.put(vertexName, new ArrayList<OutputSelector<?>>());
+		inEdgeLists.put(vertexName, new ArrayList<String>());
+		outputPartitioners.put(vertexName, new ArrayList<StreamPartitioner<?>>());
+		iterationTailCount.put(vertexName, 0);
+	}
+
+	/**
+	 * Connects two vertices in the streaming graph using the selected
+	 * settings
+	 * 
+	 * @param upStreamVertexName
+	 *            Name of the upstream (output) vertex
+	 * @param downStreamVertexName
+	 *            Name of the downstream (input) vertex
+	 * @param partitionerObject
+	 *            Partitioner object
+	 * @param typeNumber
+	 *            Number of the type (used at co-functions)
+	 * @param outputNames
+	 *            User defined names of the out edge
+	 */
+	public void setEdge(String upStreamVertexName, String downStreamVertexName,
+			StreamPartitioner<?> partitionerObject, int typeNumber, List<String> outputNames) {
+		outEdgeLists.get(upStreamVertexName).add(downStreamVertexName);
+		outEdgeTypes.get(upStreamVertexName).add(typeNumber);
+		inEdgeLists.get(downStreamVertexName).add(upStreamVertexName);
+		outputPartitioners.get(upStreamVertexName).add(partitionerObject);
+		selectedNames.get(upStreamVertexName).add(outputNames);
+	}
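+
+	/*
+	 * Usage sketch: connecting vertex "1" to vertex "2" (names and the
+	 * partitioner are placeholders); the edge is recorded symmetrically on
+	 * both endpoints, so both vertices must already have been added:
+	 *
+	 *   graph.setEdge("1", "2", somePartitioner, 0, new ArrayList<String>());
+	 */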
+
+	private void addTypeSerializers(String vertexName, StreamRecordSerializer<?> in1,
+			StreamRecordSerializer<?> in2, StreamRecordSerializer<?> out1,
+			StreamRecordSerializer<?> out2) {
+		typeSerializersIn1.put(vertexName, in1);
+		typeSerializersIn2.put(vertexName, in2);
+		typeSerializersOut1.put(vertexName, out1);
+		typeSerializersOut2.put(vertexName, out2);
+	}
+
+	/**
+	 * Sets the number of parallel instances created for the given vertex.
+	 * 
+	 * @param vertexName
+	 *            Name of the vertex
+	 * @param parallelism
+	 *            Number of parallel instances created
+	 */
+	public void setParallelism(String vertexName, int parallelism) {
+		operatorParallelisms.put(vertexName, parallelism);
+	}
+
+	public int getParallelism(String vertexName) {
+		return operatorParallelisms.get(vertexName);
+	}
+
+	/**
+	 * Sets the input format for the given vertex.
+	 * 
+	 * @param vertexName
+	 *            Name of the vertex
+	 * @param inputFormat
+	 *            input format of the file source associated with the given
+	 *            vertex
+	 */
+	public void setInputFormat(String vertexName, InputFormat<String, ?> inputFormat) {
+		inputFormatLists.put(vertexName, inputFormat);
+	}
+
+	public void setBufferTimeout(String vertexName, long bufferTimeout) {
+		this.bufferTimeouts.put(vertexName, bufferTimeout);
+	}
+
+	public long getBufferTimeout(String vertexName) {
+		return this.bufferTimeouts.get(vertexName);
+	}
+
+	public void addOperatorState(String vertexName, String stateName, OperatorState<?> state) {
+		Map<String, OperatorState<?>> states = operatorStates.get(vertexName);
+		if (states == null) {
+			states = new HashMap<String, OperatorState<?>>();
+		} else if (states.containsKey(stateName)) {
+			throw new RuntimeException("State has already been registered with this name: "
+					+ stateName);
+		}
+		states.put(stateName, state);
+		operatorStates.put(vertexName, states);
+	}
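+
+	/*
+	 * Usage sketch (someState is a placeholder for any OperatorState<?>):
+	 *
+	 *   graph.addOperatorState("1", "counter", someState); // registers the state
+	 *   graph.addOperatorState("1", "counter", someState); // throws: name taken
+	 */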
+
+	/**
+	 * Sets a user defined {@link OutputSelector} for the given operator. Used
+	 * for directed emits.
+	 * 
+	 * @param vertexName
+	 *            Name of the vertex for which the output selector will be set
+	 * @param outputSelector
+	 *            The user defined output selector.
+	 */
+	public <T> void setOutputSelector(String vertexName, OutputSelector<T> outputSelector) {
+		outputSelectors.get(vertexName).add(outputSelector);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("OutputSelector set for {}", vertexName);
+		}
+	}
+
+	public <IN, OUT> void setInvokable(String vertexName, StreamInvokable<IN, OUT> invokableObject) {
+		invokableObjects.put(vertexName, invokableObject);
+	}
+
+	public <OUT> void setOutType(String id, TypeInformation<OUT> outType) {
+		StreamRecordSerializer<OUT> serializer = new StreamRecordSerializer<OUT>(outType);
+		typeSerializersOut1.put(id, serializer);
+	}
+
+	public StreamInvokable<?, ?> getInvokable(String vertexName) {
+		return invokableObjects.get(vertexName);
+	}
+
+	@SuppressWarnings("unchecked")
+	public <OUT> StreamRecordSerializer<OUT> getOutSerializer1(String vertexName) {
+		return (StreamRecordSerializer<OUT>) typeSerializersOut1.get(vertexName);
+	}
+
+	@SuppressWarnings("unchecked")
+	public <OUT> StreamRecordSerializer<OUT> getOutSerializer2(String vertexName) {
+		return (StreamRecordSerializer<OUT>) typeSerializersOut2.get(vertexName);
+	}
+
+	@SuppressWarnings("unchecked")
+	public <IN> StreamRecordSerializer<IN> getInSerializer1(String vertexName) {
+		return (StreamRecordSerializer<IN>) typeSerializersIn1.get(vertexName);
+	}
+
+	@SuppressWarnings("unchecked")
+	public <IN> StreamRecordSerializer<IN> getInSerializer2(String vertexName) {
+		return (StreamRecordSerializer<IN>) typeSerializersIn2.get(vertexName);
+	}
+
+	/**
+	 * Copies the serializer settings (and the operator name) from one vertex
+	 * to another; used for iteration heads and tails and with some sinks.
+	 * 
+	 * @param from
+	 *            Name of the vertex to copy the serializers from
+	 * @param to
+	 *            Name of the vertex to copy the serializers to
+	 */
+	public void setSerializersFrom(String from, String to) {
+		operatorNames.put(to, operatorNames.get(from));
+
+		typeSerializersIn1.put(to, typeSerializersOut1.get(from));
+		typeSerializersIn2.put(to, typeSerializersOut2.get(from));
+		typeSerializersOut1.put(to, typeSerializersOut1.get(from));
+		typeSerializersOut2.put(to, typeSerializersOut2.get(from));
+	}
+
+	/**
+	 * Gets the assembled {@link JobGraph} and adds a default name for it.
+	 */
+	public JobGraph getJobGraph() {
+		return getJobGraph(jobName);
+	}
+
+	/**
+	 * Gets the assembled {@link JobGraph} and adds a user specified name for
+	 * it.
+	 * 
+	 * @param jobGraphName
+	 *            name of the jobGraph
+	 */
+	public JobGraph getJobGraph(String jobGraphName) {
+		this.jobName = jobGraphName;
+		StreamingJobGraphGenerator optimizer = new StreamingJobGraphGenerator(this);
+
+		return optimizer.createJobGraph(jobGraphName);
+	}
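+
+	/*
+	 * Usage sketch: translating the stream graph into a runnable JobGraph
+	 * under a user-supplied name:
+	 *
+	 *   JobGraph jobGraph = graph.getJobGraph("My Streaming Job");
+	 */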
+
+	public void setJobName(String jobName) {
+		this.jobName = jobName;
+	}
+
+	public void setChaining(boolean chaining) {
+		this.chaining = chaining;
+	}
+
+	public Collection<String> getSources() {
+		return sources;
+	}
+
+	public List<String> getOutEdges(String vertexName) {
+		return outEdgeLists.get(vertexName);
+	}
+
+	public List<String> getInEdges(String vertexName) {
+		return inEdgeLists.get(vertexName);
+	}
+
+	public List<Integer> getOutEdgeTypes(String vertexName) {
+		return outEdgeTypes.get(vertexName);
+	}
+
+	public StreamPartitioner<?> getOutPartitioner(String upStreamVertex, String downStreamVertex) {
+		return outputPartitioners.get(upStreamVertex).get(
+				outEdgeLists.get(upStreamVertex).indexOf(downStreamVertex));
+	}
+
+	public List<String> getSelectedNames(String upStreamVertex, String downStreamVertex) {
+		return selectedNames.get(upStreamVertex).get(
+				outEdgeLists.get(upStreamVertex).indexOf(downStreamVertex));
+	}
+
+	public Collection<Integer> getIterationIDs() {
+		return new HashSet<Integer>(iterationIds.values());
+	}
+
+	public String getIterationTail(int iterID) {
+		return iterationIDtoTailName.get(iterID);
+	}
+
+	public String getIterationHead(int iterID) {
+		return iterationIDtoHeadName.get(iterID);
+	}
+
+	public Class<? extends AbstractInvokable> getJobVertexClass(String vertexName) {
+		return jobVertexClasses.get(vertexName);
+	}
+
+	public InputFormat<String, ?> getInputFormat(String vertexName) {
+		return inputFormatLists.get(vertexName);
+	}
+
+	public List<OutputSelector<?>> getOutputSelector(String vertexName) {
+		return outputSelectors.get(vertexName);
+	}
+
+	public Map<String, OperatorState<?>> getState(String vertexName) {
+		return operatorStates.get(vertexName);
+	}
+
+	public Integer getIterationID(String vertexName) {
+		return iterationIds.get(vertexName);
+	}
+
+	public long getIterationTimeout(String vertexName) {
+		return iterationTimeouts.get(vertexName);
+	}
+
+	public String getOperatorName(String vertexName) {
+		return operatorNames.get(vertexName);
+	}
+
+	@Override
+	public String getStreamingPlanAsJSON() {
+
+		try {
+			JSONObject json = new JSONObject();
+			JSONArray nodes = new JSONArray();
+
+			json.put("nodes", nodes);
+
+			for (String id : operatorNames.keySet()) {
+				JSONObject node = new JSONObject();
+				nodes.put(node);
+
+				node.put("id", Integer.valueOf(id));
+				node.put("type", getOperatorName(id));
+
+				if (sources.contains(id)) {
+					node.put("pact", "Data Source");
+				} else {
+					node.put("pact", "Data Stream");
+				}
+
+				node.put("contents", getOperatorName(id) + " at "
+						+ getInvokable(id).getUserFunction().getClass().getSimpleName());
+				node.put("parallelism", getParallelism(id));
+
+				int numIn = getInEdges(id).size();
+				if (numIn > 0) {
+
+					JSONArray inputs = new JSONArray();
+					node.put("predecessors", inputs);
+
+					for (int i = 0; i < numIn; i++) {
+
+						String inID = getInEdges(id).get(i);
+
+						JSONObject input = new JSONObject();
+						inputs.put(input);
+
+						input.put("id", Integer.valueOf(inID));
+						input.put("ship_strategy", getOutPartitioner(inID, id).getStrategy());
+						if (i == 0) {
+							input.put("side", "first");
+						} else if (i == 1) {
+							input.put("side", "second");
+						}
+					}
+				}
+
+			}
+			return json.toString();
+		} catch (JSONException e) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("JSON plan creation failed", e);
+			}
+			return "";
+		}
+
+	}
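+
+	/*
+	 * Shape of the produced plan, with illustrative values:
+	 *
+	 *   {"nodes":[{"id":1,"type":"Map","pact":"Data Stream",
+	 *       "contents":"Map at MyMapper","parallelism":4,
+	 *       "predecessors":[{"id":0,"ship_strategy":"SHUFFLE","side":"first"}]}]}
+	 */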
+
+	@Override
+	public void dumpStreamingPlanAsJSON(File file) throws IOException {
+		PrintWriter pw = null;
+		try {
+			pw = new PrintWriter(new FileOutputStream(file), false);
+			pw.write(getStreamingPlanAsJSON());
+			pw.flush();
+
+		} finally {
+			if (pw != null) {
+				pw.close();
+			}
+		}
+	}
+}

