flink-commits mailing list archives

From rmetz...@apache.org
Subject [19/52] [partial] flink git commit: [FLINK-1452] Rename 'flink-addons' to 'flink-staging'
Date Mon, 02 Feb 2015 18:41:57 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
new file mode 100644
index 0000000..b31618c
--- /dev/null
+++ b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.spargel.java;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.DeltaIterationResultSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.TwoInputUdfOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+@SuppressWarnings("serial")
+public class SpargelTranslationTest {
+
+	@Test
+	public void testTranslationPlainEdges() {
+		try {
+			final String ITERATION_NAME = "Test Name";
+			
+			final String AGGREGATOR_NAME = "AggregatorName";
+			
+			final String BC_SET_MESSAGES_NAME = "borat messages";
+			
+			final String BC_SET_UPDATES_NAME = "borat updates";
+			
+			final int NUM_ITERATIONS = 13;
+			
+			final int ITERATION_DOP = 77;
+			
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Long> bcMessaging = env.fromElements(1L);
+			DataSet<Long> bcUpdate = env.fromElements(1L);
+			
+			DataSet<Tuple2<String, Double>> result;
+			
+			// ------------ construct the test program ------------------
+			{
+				
+				@SuppressWarnings("unchecked")
+				DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<String, Double>("abc", 3.44));
+	
+				@SuppressWarnings("unchecked")
+				DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<String, String>("a", "c"));
+				
+				
+				VertexCentricIteration<String, Double, Long, ?> vertexIteration = 
+						VertexCentricIteration.withPlainEdges(edges, new UpdateFunction(), new MessageFunctionNoEdgeValue(), NUM_ITERATIONS);
+				vertexIteration.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, bcMessaging);
+				vertexIteration.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcUpdate);
+				
+				vertexIteration.setName(ITERATION_NAME);
+				vertexIteration.setParallelism(ITERATION_DOP);
+				
+				vertexIteration.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
+				
+				result = initialVertices.runOperation(vertexIteration);
+			}
+			
+			
+			// ------------- validate the java program ----------------
+			
+			assertTrue(result instanceof DeltaIterationResultSet);
+			
+			DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
+			DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
+			
+			// check the basic iteration properties
+			assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
+			assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
+			assertEquals(ITERATION_DOP, iteration.getParallelism());
+			assertEquals(ITERATION_NAME, iteration.getName());
+			
+			assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
+			
+			// validate that the semantic properties are set as they should
+			TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
+			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
+			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
+			
+			TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
+			
+			// validate that the broadcast sets are forwarded
+			assertEquals(bcUpdate, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
+			assertEquals(bcMessaging, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testTranslationPlainEdgesWithForkedBroadcastVariable() {
+		try {
+			final String ITERATION_NAME = "Test Name";
+			
+			final String AGGREGATOR_NAME = "AggregatorName";
+			
+			final String BC_SET_MESSAGES_NAME = "borat messages";
+			
+			final String BC_SET_UPDATES_NAME = "borat updates";
+			
+			final int NUM_ITERATIONS = 13;
+			
+			final int ITERATION_DOP = 77;
+			
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Long> bcVar = env.fromElements(1L);
+			
+			DataSet<Tuple2<String, Double>> result;
+			
+			// ------------ construct the test program ------------------
+			{
+				
+				@SuppressWarnings("unchecked")
+				DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<String, Double>("abc", 3.44));
+	
+				@SuppressWarnings("unchecked")
+				DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<String, String>("a", "c"));
+				
+				
+				VertexCentricIteration<String, Double, Long, ?> vertexIteration = 
+						VertexCentricIteration.withPlainEdges(edges, new UpdateFunction(), new MessageFunctionNoEdgeValue(), NUM_ITERATIONS);
+				vertexIteration.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, bcVar);
+				vertexIteration.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcVar);
+				
+				vertexIteration.setName(ITERATION_NAME);
+				vertexIteration.setParallelism(ITERATION_DOP);
+				
+				vertexIteration.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
+				
+				result = initialVertices.runOperation(vertexIteration);
+			}
+			
+			
+			// ------------- validate the java program ----------------
+			
+			assertTrue(result instanceof DeltaIterationResultSet);
+			
+			DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
+			DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
+			
+			// check the basic iteration properties
+			assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
+			assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
+			assertEquals(ITERATION_DOP, iteration.getParallelism());
+			assertEquals(ITERATION_NAME, iteration.getName());
+			
+			assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
+			
+			// validate that the semantic properties are set as they should
+			TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
+			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
+			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
+			
+			TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
+			
+			// validate that the broadcast sets are forwarded
+			assertEquals(bcVar, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
+			assertEquals(bcVar, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public static class UpdateFunction extends VertexUpdateFunction<String, Double, Long> {
+
+		@Override
+		public void updateVertex(String vertexKey, Double vertexValue, MessageIterator<Long> inMessages) {}
+	}
+	
+	public static class MessageFunctionNoEdgeValue extends MessagingFunction<String, Double, Long, Object> {
+
+		@Override
+		public void sendMessages(String vertexKey, Double vertexValue) {}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java b/flink-staging/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
new file mode 100644
index 0000000..166f7a2
--- /dev/null
+++ b/flink-staging/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.test.spargel;
+
+import java.io.BufferedReader;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.spargel.java.VertexCentricIteration;
+import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCMessager;
+import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCUpdater;
+import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.IdAssigner;
+import org.apache.flink.test.testdata.ConnectedComponentsData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+@SuppressWarnings("serial")
+public class SpargelConnectedComponentsITCase extends JavaProgramTestBase {
+
+	private static final long SEED = 9487520347802987L;
+	
+	private static final int NUM_VERTICES = 1000;
+	
+	private static final int NUM_EDGES = 10000;
+
+	private String resultPath;
+	
+	
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempFilePath("results");
+	}
+	
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		DataSet<Long> vertexIds = env.generateSequence(1, NUM_VERTICES);
+		DataSet<String> edgeString = env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED).split("\n"));
+		
+		DataSet<Tuple2<Long, Long>> edges = edgeString.map(new EdgeParser());
+		
+		DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
+		DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100));
+		
+		result.writeAsCsv(resultPath, "\n", " ");
+		env.execute("Spargel Connected Components");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		for (BufferedReader reader : getResultReader(resultPath)) {
+			ConnectedComponentsData.checkOddEvenResult(reader);
+		}
+	}
+	
+	public static final class EdgeParser extends RichMapFunction<String, Tuple2<Long, Long>> {
+		public Tuple2<Long, Long> map(String value) {
+			String[] nums = value.split(" ");
+			return new Tuple2<Long, Long>(Long.parseLong(nums[0]), Long.parseLong(nums[1]));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-spargel/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/test/resources/log4j-test.properties b/flink-staging/flink-spargel/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2fb9345
--- /dev/null
+++ b/flink-staging/flink-spargel/src/test/resources/log4j-test.properties
@@ -0,0 +1,19 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-spargel/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/test/resources/logback-test.xml b/flink-staging/flink-spargel/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8b3bb27
--- /dev/null
+++ b/flink-staging/flink-spargel/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
new file mode 100644
index 0000000..df34b20
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
@@ -0,0 +1,243 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-streaming-parent</artifactId>
+		<version>0.9-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-streaming-connectors</artifactId>
+	<name>flink-streaming-connectors</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_2.10</artifactId>
+			<version>0.8.0</version>
+			<exclusions>
+				<exclusion>
+					<groupId>com.sun.jmx</groupId>
+					<artifactId>jmxri</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jdmk</groupId>
+					<artifactId>jmxtools</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>log4j</groupId>
+					<artifactId>log4j</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-simple</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>net.sf.jopt-simple</groupId>
+					<artifactId>jopt-simple</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.scala-lang</groupId>
+					<artifactId>scala-reflect</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.scala-lang</groupId>
+					<artifactId>scala-compiler</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.yammer.metrics</groupId>
+					<artifactId>metrics-annotation</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.xerial.snappy</groupId>
+					<artifactId>snappy-java</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<dependency>
+			<groupId>com.rabbitmq</groupId>
+			<artifactId>amqp-client</artifactId>
+			<version>3.3.1</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flume</groupId>
+			<artifactId>flume-ng-core</artifactId>
+			<version>1.5.0</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>log4j</groupId>
+					<artifactId>log4j</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-io</groupId>
+					<artifactId>commons-io</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-codec</groupId>
+					<artifactId>commons-codec</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-cli</groupId>
+					<artifactId>commons-cli</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-lang</groupId>
+					<artifactId>commons-lang</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.avro</groupId>
+					<artifactId>avro</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.codehaus.jackson</groupId>
+					<artifactId>jackson-core-asl</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.codehaus.jackson</groupId>
+					<artifactId>jackson-mapper-asl</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.thoughtworks.paranamer</groupId>
+					<artifactId>paranamer</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.xerial.snappy</groupId>
+					<artifactId>snappy-java</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.tukaani</groupId>
+					<artifactId>xz</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.velocity</groupId>
+					<artifactId>velocity</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-collections</groupId>
+					<artifactId>commons-collections</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>servlet-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty-util</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.google.code.gson</groupId>
+					<artifactId>gson</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.thrift</groupId>
+					<artifactId>libthrift</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<dependency>
+			<groupId>com.twitter</groupId>
+			<artifactId>hbc-core</artifactId>
+			<version>2.2.0</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.fusesource.leveldbjni</groupId>
+			<artifactId>leveldbjni-all</artifactId>
+			<version>1.8</version>
+		</dependency>
+
+		<dependency>
+			<groupId>redis.clients</groupId>
+			<artifactId>jedis</artifactId>
+			<version>2.4.2</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>com.google.code.simple-spring-memcached</groupId>
+			<artifactId>spymemcached</artifactId>
+			<version>2.8.4</version>
+		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.sling</groupId>
+			<artifactId>org.apache.sling.commons.json</artifactId>
+			<version>2.0.6</version>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<configuration>
+					<descriptorRefs>
+						<descriptorRef>jar-with-dependencies</descriptorRef>
+					</descriptorRefs>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
new file mode 100644
index 0000000..1623943
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.function.source.GenericSourceFunction;
+import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+
+public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OUT> implements
+		GenericSourceFunction<OUT> {
+
+	private static final long serialVersionUID = 1L;
+	protected DeserializationSchema<OUT> schema;
+
+	public ConnectorSource(DeserializationSchema<OUT> schema) {
+		this.schema = schema;
+	}
+
+	@Override
+	public TypeInformation<OUT> getType() {
+		return TypeExtractor.createTypeInfo(DeserializationSchema.class, schema.getClass(), 0,
+				null, null);
+	}
+
+}
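
As a usage sketch of the abstraction above: a subclass supplies a DeserializationSchema and emits deserialized records from invoke(Collector), the entry point used by the concrete sources later in this commit. SingleRecordSource and its payload are hypothetical names for illustration only.

import org.apache.flink.streaming.connectors.ConnectorSource;
import org.apache.flink.streaming.connectors.util.DeserializationSchema;
import org.apache.flink.util.Collector;

public class SingleRecordSource<OUT> extends ConnectorSource<OUT> {
	private static final long serialVersionUID = 1L;

	private final byte[] payload;

	public SingleRecordSource(DeserializationSchema<OUT> schema, byte[] payload) {
		super(schema);
		this.payload = payload;
	}

	@Override
	public void invoke(Collector<OUT> collector) throws Exception {
		// Deserialize one record with the schema held by ConnectorSource and emit it.
		collector.collect(schema.deserialize(payload));
	}
}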

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/CustomSerializationDBState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/CustomSerializationDBState.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/CustomSerializationDBState.java
new file mode 100644
index 0000000..9b1de36
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/CustomSerializationDBState.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.db;
+
+import java.io.Serializable;
+
+public abstract class CustomSerializationDBState<K extends Serializable, V extends Serializable> {
+
+	protected DBSerializer<K> keySerializer;
+	protected DBSerializer<V> valueSerializer;
+
+	public CustomSerializationDBState(DBSerializer<K> keySerializer, DBSerializer<V> valueSerializer) {
+		this.keySerializer = keySerializer;
+		this.valueSerializer = valueSerializer;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBSerializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBSerializer.java
new file mode 100644
index 0000000..a80e20c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBSerializer.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.db;
+
+import java.io.Serializable;
+
+/**
+ * Interface for custom serialization of keys and values used in external
+ * databases.
+ *
+ * @param <T>
+ *            Type of the key or value to serialize
+ */
+public interface DBSerializer<T extends Serializable> {
+	
+	byte[] write(T object);
+
+	T read(byte[] serializedObject);
+}
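
For reference, a custom DBSerializer need not rely on Java serialization at all; a minimal UTF-8 string serializer might look like the sketch below (StringSerializer is a hypothetical name):

import java.nio.charset.Charset;

public class StringSerializer implements DBSerializer<String> {

	private static final Charset UTF8 = Charset.forName("UTF-8");

	@Override
	public byte[] write(String object) {
		// Encode with a fixed charset so read() can reverse the mapping.
		return object.getBytes(UTF8);
	}

	@Override
	public String read(byte[] serializedObject) {
		return new String(serializedObject, UTF8);
	}
}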

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBState.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBState.java
new file mode 100644
index 0000000..7528182
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBState.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.db;
+
+public interface DBState<K, V> {
+	
+	public void put(K key, V value);
+	
+	public V get(K key);
+	
+	public void remove(K key);
+
+	public void close();
+}
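
The contract is plain key-value semantics. A hedged usage sketch, using the LevelDBState implementation that appears further down in this commit (the path is an arbitrary example):

DBState<String, Integer> state = new LevelDBState<String, Integer>("/tmp/flink-db-state");
state.put("counter", 1);
Integer value = state.get("counter");   // returns 1
state.remove("counter");                // a subsequent get("counter") would throw
state.close();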

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateIterator.java
new file mode 100644
index 0000000..890d933
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateIterator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.db;
+
+import java.io.Serializable;
+
+public abstract class DBStateIterator<K extends Serializable, V extends Serializable> {
+
+	public abstract boolean hasNext();
+
+	public abstract K getNextKey();
+
+	public abstract V getNextValue();
+
+	public abstract void next();
+}
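
Judging from the LevelDB implementation below, the intended traversal pattern reads the current entry via getNextKey()/getNextValue() and then advances with next(), roughly as in this sketch (state is any DBStateWithIterator, defined in the next file):

DBStateIterator<String, Integer> iter = state.getIterator();
while (iter.hasNext()) {
	String key = iter.getNextKey();
	Integer value = iter.getNextValue();
	// process (key, value) here, then move on to the following entry
	iter.next();
}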

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateWithIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateWithIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateWithIterator.java
new file mode 100644
index 0000000..3383330
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DBStateWithIterator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.db;
+
+import java.io.Serializable;
+
+public interface DBStateWithIterator<K extends Serializable, V extends Serializable> extends DBState<K, V> {
+	
+	public DBStateIterator<K, V> getIterator();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DefaultDBSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DefaultDBSerializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DefaultDBSerializer.java
new file mode 100644
index 0000000..6b770cb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/DefaultDBSerializer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.db;
+
+import java.io.Serializable;
+
+import org.apache.commons.lang3.SerializationUtils;
+
+public class DefaultDBSerializer<T extends Serializable> implements DBSerializer<T> {
+	
+	@Override
+	public byte[] write(T object) {
+		return SerializationUtils.serialize(object);
+	}
+
+	@Override
+	public T read(byte[] serializedObject) {
+		return SerializationUtils.deserialize(serializedObject);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LevelDBState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LevelDBState.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LevelDBState.java
new file mode 100644
index 0000000..2e8626d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/LevelDBState.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.db;
+
+import static org.fusesource.leveldbjni.JniDBFactory.factory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBIterator;
+import org.iq80.leveldb.Options;
+
+/**
+ * Interface to a LevelDB key-value store.
+ * 
+ * @see <a href="https://code.google.com/p/leveldb/">https://code.google.com/p/leveldb/</a>
+ * 
+ * @param <K>
+ *            Type of key
+ * @param <V>
+ *            Type of value
+ */
+public class LevelDBState<K extends Serializable, V extends Serializable> extends
+		CustomSerializationDBState<K, V> implements DBStateWithIterator<K, V> {
+
+	private DB database;
+
+	public LevelDBState(String dbName, DBSerializer<K> keySerializer,
+			DBSerializer<V> valueSerializer) {
+		super(keySerializer, valueSerializer);
+		Options options = new Options();
+		File file = new File(dbName);
+		options.createIfMissing(true);
+		try {
+			// Destroy any existing database at this path so the state starts empty.
+			factory.destroy(file, options);
+			database = factory.open(file, options);
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	public LevelDBState(String dbName) {
+		this(dbName, new DefaultDBSerializer<K>(), new DefaultDBSerializer<V>());
+	}
+	
+	@Override
+	public void close() {
+		try {
+			database.close();
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override
+	public void put(K key, V value) {
+		database.put(keySerializer.write(key), valueSerializer.write(value));
+	}
+
+	@Override
+	public V get(K key) {
+		byte[] serializedValue = database.get(keySerializer.write(key));
+		if (serializedValue != null) {
+			return valueSerializer.read(serializedValue);
+		} else {
+			throw new RuntimeException("No such entry at key " + key);
+		}
+	}
+
+	@Override
+	public void remove(K key) {
+		database.delete(keySerializer.write(key));
+	}
+
+	@Override
+	public DBStateIterator<K, V> getIterator() {
+		return new LevelDBStateIterator();
+	}
+
+	private class LevelDBStateIterator extends DBStateIterator<K, V> {
+		private DBIterator iterator;
+
+		public LevelDBStateIterator() {
+			this.iterator = database.iterator();
+			this.iterator.seekToFirst();
+		}
+
+		@Override
+		public boolean hasNext() {
+			return iterator.hasNext();
+		}
+
+		@Override
+		public K getNextKey() {
+			return keySerializer.read(iterator.peekNext().getKey());
+		}
+
+		@Override
+		public V getNextValue() {
+			return valueSerializer.read(iterator.peekNext().getValue());
+		}
+
+		@Override
+		public void next() {
+			iterator.next();
+		}
+	}
+}
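
Note that the constructor destroys any database already present at the given path, so each instance starts empty. The three-argument constructor plugs in custom serializers, e.g. the hypothetical StringSerializer sketched after DBSerializer above:

LevelDBState<String, String> state = new LevelDBState<String, String>(
		"/tmp/leveldb-custom",        // example path, wiped on construction
		new StringSerializer(),       // custom key serializer
		new StringSerializer());      // custom value serializer
state.put("hello", "world");
System.out.println(state.get("hello"));   // prints "world"
state.close();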

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/MemcachedState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/MemcachedState.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/MemcachedState.java
new file mode 100644
index 0000000..7f06a1f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/MemcachedState.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.db;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import net.spy.memcached.MemcachedClient;
+
+/**
+ * Interface to a Memcached key-value cache. It needs a running instance of Memcached.
+ * 
+ * @see <a href="http://memcached.org/">http://memcached.org</a>
+ * 
+ * @param <V>
+ *            Type of value
+ */
+public class MemcachedState<V> implements DBState<String, V> {
+
+	private MemcachedClient memcached;
+
+	public MemcachedState() {
+		try {
+			memcached = new MemcachedClient(new InetSocketAddress("localhost", 11211));
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	public MemcachedState(String hostname, int portNum) {
+		try {
+			memcached = new MemcachedClient(new InetSocketAddress(hostname, portNum));
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override
+	public void close() {
+		memcached.shutdown();
+	}
+
+	@Override
+	public void put(String key, V value) {
+		memcached.set(key, 0, value);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public V get(String key) {
+		return (V) memcached.get(key);
+	}
+
+	@Override
+	public void remove(String key) {
+		memcached.delete(key);
+	}
+}
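
Since MemcachedState is keyed by String only, usage reduces to the following sketch (host and port are illustrative; a memcached daemon must already be listening there):

MemcachedState<String> state = new MemcachedState<String>("localhost", 11211);
state.put("greeting", "hello");
String greeting = state.get("greeting");   // "hello"
state.remove("greeting");
state.close();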

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/RedisState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/RedisState.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/RedisState.java
new file mode 100644
index 0000000..b9244c5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/db/RedisState.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.db;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Set;
+
+import redis.clients.jedis.Jedis;
+
+/**
+ * Interface to a Redis key-value cache. It needs a running instance of Redis.
+ * 
+ * @see <a href="http://redis.io/">http://redis.io/</a>
+ * 
+ * @param <K>
+ *            Type of key
+ * @param <V>
+ *            Type of value
+ */
+public class RedisState<K extends Serializable, V extends Serializable> extends
+		CustomSerializationDBState<K, V> implements DBStateWithIterator<K, V> {
+
+	private Jedis jedis;
+
+	public RedisState(DBSerializer<K> keySerializer, DBSerializer<V> valueSerializer) {
+		super(keySerializer, valueSerializer);
+		jedis = new Jedis("localhost");
+	}
+
+	public RedisState() {
+		this(new DefaultDBSerializer<K>(), new DefaultDBSerializer<V>());
+	}
+
+	@Override
+	public void close() {
+		jedis.close();
+	}
+
+	@Override
+	public void put(K key, V value) {
+		jedis.set(keySerializer.write(key), valueSerializer.write(value));
+	}
+
+	@Override
+	public V get(K key) {
+		return valueSerializer.read(jedis.get(keySerializer.write(key)));
+	}
+
+	@Override
+	public void remove(K key) {
+		jedis.del(keySerializer.write(key));
+	}
+
+	@Override
+	public DBStateIterator<K, V> getIterator() {
+		return new RedisStateIterator();
+	}
+
+	private class RedisStateIterator extends DBStateIterator<K, V> {
+
+		private Set<byte[]> set;
+		private Iterator<byte[]> iterator;
+		private byte[] currentKey;
+
+		public RedisStateIterator() {
+			// Fetch all keys matching the wildcard pattern in a single call.
+			set = jedis.keys("*".getBytes());
+			iterator = set.iterator();
+			currentKey = iterator.hasNext() ? iterator.next() : null;
+		}
+
+		@Override
+		public boolean hasNext() {
+			// A current key remains to be read; mirrors the LevelDB iterator semantics.
+			return currentKey != null;
+		}
+
+		@Override
+		public K getNextKey() {
+			return keySerializer.read(currentKey);
+		}
+
+		@Override
+		public V getNextValue() {
+			// Deserialize the stored bytes with the configured value serializer.
+			return valueSerializer.read(jedis.get(currentKey));
+		}
+
+		@Override
+		public void next() {
+			currentKey = iterator.hasNext() ? iterator.next() : null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
new file mode 100644
index 0000000..8a2f2b8
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.flume;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.event.EventBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlumeSink<IN> extends RichSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class);
+
+	private transient FlinkRpcClientFacade client;
+	boolean initDone = false;
+	String host;
+	int port;
+	SerializationSchema<IN, byte[]> scheme;
+
+	public FlumeSink(String host, int port, SerializationSchema<IN, byte[]> schema) {
+		this.host = host;
+		this.port = port;
+		this.scheme = schema;
+	}
+
+	/**
+	 * Receives tuples from the Apache Flink {@link DataStream} and forwards
+	 * them to Apache Flume.
+	 * 
+	 * @param value
+	 *            The tuple arriving from the datastream
+	 */
+	@Override
+	public void invoke(IN value) {
+
+		byte[] data = scheme.serialize(value);
+		client.sendDataToFlume(data);
+
+	}
+
+	private class FlinkRpcClientFacade {
+		private RpcClient client;
+		private String hostname;
+		private int port;
+
+		/**
+		 * Initializes the connection to Apache Flume.
+		 * 
+		 * @param hostname
+		 *            The host
+		 * @param port
+		 *            The port.
+		 */
+		public void init(String hostname, int port) {
+			// Setup the RPC connection
+			this.hostname = hostname;
+			this.port = port;
+			int initCounter = 0;
+			while (true) {
+				if (initCounter >= 90) {
+					throw new RuntimeException("Cannot establish connection to port " + port
+							+ " at host " + host);
+				}
+				try {
+					this.client = RpcClientFactory.getDefaultInstance(hostname, port);
+				} catch (FlumeException e) {
+					// Wait one second if the connection failed before the next
+					// try
+					try {
+						Thread.sleep(1000);
+					} catch (InterruptedException e1) {
+						if (LOG.isErrorEnabled()) {
+							LOG.error("Interrupted while trying to connect to port {} at host {}", port, host);
+						}
+					}
+				}
+				if (client != null) {
+					break;
+				}
+				initCounter++;
+			}
+			initDone = true;
+		}
+
+		/**
+		 * Sends byte arrays as {@link Event} series to Apache Flume.
+		 * 
+		 * @param data
+		 *            The byte array to send to Apache FLume
+		 */
+		public void sendDataToFlume(byte[] data) {
+			Event event = EventBuilder.withBody(data);
+
+			try {
+				client.append(event);
+
+			} catch (EventDeliveryException e) {
+				// clean up and recreate the client
+				client.close();
+				client = null;
+				client = RpcClientFactory.getDefaultInstance(hostname, port);
+			}
+		}
+
+	}
+
+	@Override
+	public void close() {
+		// Guard against close() being called before open() established the connection.
+		if (client != null && client.client != null) {
+			client.client.close();
+		}
+	}
+
+	@Override
+	public void open(Configuration config) {
+		client = new FlinkRpcClientFacade();
+		client.init(host, port);
+	}
+}
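
Wiring the sink into a program requires a SerializationSchema that turns records into the raw byte[] Flume expects. A sketch, assuming serialize() is the schema's only method (as used above), that DataStream.addSink accepts this RichSinkFunction, and with an illustrative host and port:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.flume.FlumeSink;
import org.apache.flink.streaming.connectors.util.SerializationSchema;

public class FlumeSinkSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Turn each record into the raw bytes handed to Flume.
		SerializationSchema<String, byte[]> schema = new SerializationSchema<String, byte[]>() {
			@Override
			public byte[] serialize(String element) {
				return element.getBytes();
			}
		};

		env.fromElements("message 1", "message 2")
				.addSink(new FlumeSink<String>("localhost", 41414, schema));

		env.execute("Flume sink sketch");
	}
}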

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
new file mode 100644
index 0000000..4f6ec2d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.flume;
+
+import java.util.List;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.ConnectorSource;
+import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.util.Collector;
+import org.apache.flume.Context;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.source.AvroSource;
+import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.flume.source.avro.Status;
+
+public class FlumeSource<OUT> extends ConnectorSource<OUT> {
+	private static final long serialVersionUID = 1L;
+
+	String host;
+	String port;
+	volatile boolean finished = false;
+
+	FlumeSource(String host, int port, DeserializationSchema<OUT> deserializationSchema) {
+		super(deserializationSchema);
+		this.host = host;
+		this.port = Integer.toString(port);
+	}
+
+	public class MyAvroSource extends AvroSource {
+		Collector<OUT> collector;
+
+		/**
+		 * Sends the AvroFlumeEvent from its argument list to the Apache Flink
+		 * {@link DataStream}.
+		 * 
+		 * @param avroEvent
+		 *            The event that should be sent to the dataStream
+		 * @return A {@link Status}.OK message if sending the event was
+		 *         successful.
+		 */
+		@Override
+		public Status append(AvroFlumeEvent avroEvent) {
+			collect(avroEvent);
+			return Status.OK;
+		}
+
+		/**
+		 * Sends the AvroFlumeEvents from its argument list to the Apache Flink
+		 * {@link DataStream}.
+		 * 
+		 * @param events
+		 *            The events that are sent to the dataStream
+		 * @return A Status.OK message if sending the events was successful.
+		 */
+		@Override
+		public Status appendBatch(List<AvroFlumeEvent> events) {
+			for (AvroFlumeEvent avroEvent : events) {
+				collect(avroEvent);
+			}
+
+			return Status.OK;
+		}
+
+		/**
+		 * Deserializes the AvroFlumeEvent before sending it to the Apache Flink
+		 * {@link DataStream}.
+		 * 
+		 * @param avroEvent
+		 *            The event that is sent to the dataStream
+		 */
+		private void collect(AvroFlumeEvent avroEvent) {
+			byte[] b = avroEvent.getBody().array();
+			OUT out = FlumeSource.this.schema.deserialize(b);
+
+			if (schema.isEndOfStream(out)) {
+				FlumeSource.this.finished = true;
+				this.stop();
+				// notifyAll() may only be called while holding the monitor of
+				// the object that invoke() waits on
+				synchronized (FlumeSource.this) {
+					FlumeSource.this.notifyAll();
+				}
+			} else {
+				collector.collect(out);
+			}
+		}
+
+	}
+
+	MyAvroSource avroSource;
+
+	/**
+	 * Configures the AvroSource. Also sets the collector so that it can be
+	 * used outside of the invoke function.
+	 * 
+	 * @param collector
+	 *            The collector used in the invoke function
+	 */
+	public void configureAvroSource(Collector<OUT> collector) {
+
+		avroSource = new MyAvroSource();
+		avroSource.collector = collector;
+		Context context = new Context();
+		context.put("port", port);
+		context.put("bind", host);
+		avroSource.configure(context);
+		// An instance of a ChannelProcessor is required for configuring the
+		// avroSource although it will not be used in this case.
+		ChannelProcessor cp = new ChannelProcessor(null);
+		avroSource.setChannelProcessor(cp);
+	}
+
+	/**
+	 * Configures the AvroSource and blocks until the end of the stream is
+	 * reached or the source is closed.
+	 * 
+	 * @param collector
+	 *            The Collector for sending data to the datastream
+	 */
+	@Override
+	public void invoke(Collector<OUT> collector) throws Exception {
+		configureAvroSource(collector);
+		avroSource.start();
+		// wait() requires holding this object's monitor; the while loop also
+		// guards against spurious wakeups
+		synchronized (this) {
+			while (!finished) {
+				this.wait();
+			}
+		}
+	}
+
+}
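
FlumeSource turns each Flume event body into a record through its DeserializationSchema, and collect() above stops the source once isEndOfStream() fires. A minimal sketch of a matching schema, assuming the interface consists of exactly the deserialize and isEndOfStream methods used above, and using the hypothetical terminator string "q" purely for illustration:

	public class StringDeserializationSchema implements DeserializationSchema<String> {
		private static final long serialVersionUID = 1L;

		@Override
		public String deserialize(byte[] message) {
			// Flume delivers event bodies as raw bytes; read them as text
			return new String(message);
		}

		@Override
		public boolean isEndOfStream(String nextElement) {
			// Hypothetical terminator record that shuts the source down
			return "q".equals(nextElement);
		}
	}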

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
new file mode 100644
index 0000000..3cfd7d4
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.flume;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.apache.flink.streaming.connectors.util.SimpleStringSchema;
+
+public class FlumeTopology {
+
+	public static void main(String[] args) throws Exception {
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+		@SuppressWarnings("unused")
+		DataStream<String> dataStream1 = env.addSource(
+				new FlumeSource<String>("localhost", 41414, new SimpleStringSchema())).addSink(
+				new FlumeSink<String>("localhost", 42424, new StringToByteSerializer()));
+
+		env.execute();
+	}
+
+	public static class StringToByteSerializer implements SerializationSchema<String, byte[]> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public byte[] serialize(String element) {
+			return element.getBytes();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
new file mode 100644
index 0000000..0f16541
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.sling.commons.json.JSONException;
+
+/**
+ * Abstract class derived from {@link RichFlatMapFunction} to handle JSON
+ * strings.
+ * 
+ * @param <IN>
+ *            Type of the input elements.
+ * @param <OUT>
+ *            Type of the returned elements.
+ */
+public abstract class JSONParseFlatMap<IN, OUT> extends RichFlatMapFunction<IN, OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Gets the value object associated with a key from a JSON string. It can
+	 * find embedded fields, too.
+	 * 
+	 * @param jsonText
+	 *            JSON string in which the field is searched.
+	 * @param field
+	 *            The key whose value is searched for.
+	 * @return The object associated with the field.
+	 * @throws JSONException
+	 *             If the field is not found.
+	 */
+	public Object get(String jsonText, String field) throws JSONException {
+		JSONParser parser = new JSONParser(jsonText);
+
+		return parser.parse(field).get("retValue");
+	}
+
+	/**
+	 * Gets the boolean value associated with a key from a JSON string. It can
+	 * find embedded fields, too.
+	 * 
+	 * @param jsonText
+	 *            JSON string in which the field is searched.
+	 * @param field
+	 *            The key whose value is searched for.
+	 * @return The boolean value associated with the field.
+	 * @throws JSONException
+	 *             If the field is not found.
+	 */
+	public boolean getBoolean(String jsonText, String field) throws JSONException {
+		JSONParser parser = new JSONParser(jsonText);
+
+		return parser.parse(field).getBoolean("retValue");
+	}
+
+	/**
+	 * Gets the double value associated with a key from a JSON string. It can
+	 * find embedded fields, too.
+	 * 
+	 * @param jsonText
+	 *            JSON string in which the field is searched.
+	 * @param field
+	 *            The key whose value is searched for.
+	 * @return The double value associated with the field.
+	 * @throws JSONException
+	 *             If the field is not found.
+	 */
+	public double getDouble(String jsonText, String field) throws JSONException {
+		JSONParser parser = new JSONParser(jsonText);
+
+		return parser.parse(field).getDouble("retValue");
+	}
+
+	/**
+	 * Gets the int value associated with a key from a JSON string. It can
+	 * find embedded fields, too.
+	 * 
+	 * @param jsonText
+	 *            JSON string in which the field is searched.
+	 * @param field
+	 *            The key whose value is searched for.
+	 * @return The int value associated with the field.
+	 * @throws JSONException
+	 *             If the field is not found.
+	 */
+	public int getInt(String jsonText, String field) throws JSONException {
+		JSONParser parser = new JSONParser(jsonText);
+
+		return parser.parse(field).getInt("retValue");
+	}
+
+	/**
+	 * Gets the long value associated with a key from a JSON string. It can
+	 * find embedded fields, too.
+	 * 
+	 * @param jsonText
+	 *            JSON string in which the field is searched.
+	 * @param field
+	 *            The key whose value is searched for.
+	 * @return The long value associated with the field.
+	 * @throws JSONException
+	 *             If the field is not found.
+	 */
+	public long getLong(String jsonText, String field) throws JSONException {
+		JSONParser parser = new JSONParser(jsonText);
+
+		return parser.parse(field).getLong("retValue");
+	}
+	
+	/**
+	 * Gets the String value associated with a key from a JSON string. It can
+	 * find embedded fields, too.
+	 * 
+	 * @param jsonText
+	 *            JSON string in which the field is searched.
+	 * @param field
+	 *            The key whose value is searched for.
+	 * @return The String value associated with the field.
+	 * @throws JSONException
+	 *             If the field is not found.
+	 */
+	public String getString(String jsonText, String field) throws JSONException {
+		JSONParser parser = new JSONParser(jsonText);
+		
+		return parser.parse(field).getString("retValue");
+	}
+}
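
A concrete subclass of JSONParseFlatMap only has to implement flatMap and can build on the typed getters above. A minimal sketch, assuming a hypothetical input document such as {"user":{"name":"bob"}} (the field path "user.name" is an example, not part of the API; org.apache.flink.util.Collector is assumed to be imported):

	public class UserNameExtractor extends JSONParseFlatMap<String, String> {
		private static final long serialVersionUID = 1L;

		@Override
		public void flatMap(String jsonText, Collector<String> out) throws Exception {
			// getString resolves the embedded "name" field inside the "user" object
			out.collect(getString(jsonText, "user.name"));
		}
	}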

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
new file mode 100644
index 0000000..c1eabbd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.StringTokenizer;
+
+import org.apache.sling.commons.json.JSONArray;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+
+/**
+ * A JSONParser contains a JSONObject and provides access to embedded fields
+ * in JSON strings.
+ */
+public class JSONParser {
+
+	private JSONObject originalJO;
+	private String searchedfield;
+	private Object temp;
+
+	/**
+	 * Constructs a JSONParser from a string. The string has to be JSON from
+	 * which we want to get a field.
+	 * 
+	 * @param jsonText
+	 *            String representation of the JSON to parse.
+	 * @throws JSONException
+	 *             If there is a syntax error in the source string.
+	 */
+	public JSONParser(String jsonText) throws JSONException {
+		originalJO = new JSONObject(jsonText);
+	}
+
+	/**
+	 * Parses the JSON passed to the constructor to find the given key.
+	 * 
+	 * @param key
+	 *            The key whose value is searched for.
+	 * @return A JSONObject with a single field called "retValue" whose value
+	 *         is the searched value. The methods of JSONObject can be used to
+	 *         read the field in the desired format.
+	 * @throws JSONException
+	 *             If the key is not found.
+	 */
+	public JSONObject parse(String key) throws JSONException {
+		initializeParser(key);
+		parsing();
+		return putResultInJSONObj();
+	}
+
+	/**
+	 * Prepares the fields of the class for the parsing.
+	 * 
+	 * @param key
+	 *            The key whose value is searched for.
+	 * @throws JSONException
+	 *             If the key is not found.
+	 */
+	private void initializeParser(String key) throws JSONException {
+		searchedfield = key;
+		temp = new JSONObject(originalJO.toString());
+	}
+
+	/**
+	 * Goes through the given field path and calls the appropriate functions
+	 * to handle the segments between the separators.
+	 * 
+	 * @throws JSONException
+	 *             If the key is not found.
+	 */
+	private void parsing() throws JSONException {
+		StringTokenizer st = new StringTokenizer(searchedfield, ".");
+		while (st.hasMoreTokens()) {
+			find(st.nextToken());
+		}
+	}
+
+	/**
+	 * Search for the next part of the field and update the state if it was
+	 * found.
+	 * 
+	 * @param nextToken
+	 *            The current part of the searched field.
+	 * @throws JSONException
+	 *             If the key is not found.
+	 */
+	private void find(String nextToken) throws JSONException {
+		if (endsWithBracket(nextToken)) {
+			treatAllBracket(nextToken);
+		} else {
+			temp = ((JSONObject) temp).get(nextToken);
+		}
+	}
+
+	/**
+	 * Determines whether the given string ends with a closing square bracket
+	 * ']'.
+	 * 
+	 * @param nextToken
+	 *            The current part of the searched field.
+	 * @return True if the given string ends with a closing square bracket ']'
+	 *         and false otherwise.
+	 */
+	private boolean endsWithBracket(String nextToken) {
+		return nextToken.endsWith("]");
+	}
+
+	/**
+	 * Handle (multidimensional) arrays. Treat the square bracket pairs one
+	 * after the other if necessary.
+	 * 
+	 * @param nextToken
+	 *            The current part of the searched field.
+	 * @throws JSONException
+	 *             If the searched element is not found.
+	 */
+	private void treatAllBracket(String nextToken) throws JSONException {
+		List<String> list = Arrays.asList(nextToken.split("\\["));
+		ListIterator<String> iter = list.listIterator();
+
+		temp = ((JSONObject) temp).get(iter.next());
+
+		while (iter.hasNext()) {
+			int index = Integer.parseInt(cutBracket(iter.next()));
+			temp = ((JSONArray) temp).get(index);
+		}
+	}
+
+	/**
+	 * Remove the last character of the string.
+	 * 
+	 * @param string
+	 *            String to modify.
+	 * @return The given string without the last character.
+	 */
+	private String cutBracket(String string) {
+		return string.substring(0, string.length() - 1);
+	}
+
+	/**
+	 * Saves the result of the search into a JSONObject.
+	 * 
+	 * @return A special JSONObject which contains only one key. The value
+	 *         associated with this key is the result of the search.
+	 * @throws JSONException
+	 *             If there is a problem creating the JSONObject (e.g. invalid
+	 *             syntax).
+	 */
+	private JSONObject putResultInJSONObj() throws JSONException {
+		JSONObject jo = new JSONObject();
+		jo.put("retValue", temp);
+		return jo;
+	}
+
+}
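
Taken together, parse() resolves dot-separated object paths and [index] suffixes on (multidimensional) arrays, and wraps the result under the single key "retValue". A short sketch with a hypothetical input document:

	String json = "{\"users\":[{\"name\":\"alice\"},{\"name\":\"bob\"}]}";
	JSONParser parser = new JSONParser(json);
	// dots descend into objects, [i] indexes into arrays
	JSONObject result = parser.parse("users[1].name");
	String name = result.getString("retValue"); // "bob"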

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
new file mode 100644
index 0000000..9bb87a0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.Properties;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.util.SerializationSchema;
+
+public class KafkaSink<IN, OUT> extends RichSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	private Producer<Integer, OUT> producer;
+	private Properties props;
+	private String topicId;
+	private String brokerAddr;
+	private boolean initDone = false;
+	private SerializationSchema<IN, OUT> scheme;
+
+	public KafkaSink(String topicId, String brokerAddr,
+			SerializationSchema<IN, OUT> serializationSchema) {
+		this.topicId = topicId;
+		this.brokerAddr = brokerAddr;
+		this.scheme = serializationSchema;
+	}
+
+	/**
+	 * Initializes the connection to Kafka.
+	 */
+	public void initialize() {
+		props = new Properties();
+
+		props.put("metadata.broker.list", brokerAddr);
+		props.put("serializer.class", "kafka.serializer.StringEncoder");
+		props.put("request.required.acks", "1");
+
+		ProducerConfig config = new ProducerConfig(props);
+		producer = new Producer<Integer, OUT>(config);
+		initDone = true;
+	}
+
+	/**
+	 * Called when new data arrives at the sink, and forwards it to Kafka.
+	 * 
+	 * @param next
+	 *            The incoming data
+	 */
+	@Override
+	public void invoke(IN next) {
+		if (!initDone) {
+			initialize();
+		}
+
+		producer.send(new KeyedMessage<Integer, OUT>(topicId, scheme.serialize(next)));
+	}
+
+	@Override
+	public void close() {
+		producer.close();
+	}
+
+}
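
Since initialize() registers kafka.serializer.StringEncoder with the producer, the OUT type parameter effectively has to be String. A minimal sketch of a matching SerializationSchema, assuming the interface consists of just the serialize method used above:

	public class IntegerToStringSchema implements SerializationSchema<Integer, String> {
		private static final long serialVersionUID = 1L;

		@Override
		public String serialize(Integer element) {
			// Emit plain decimal strings, which the configured StringEncoder can handle
			return String.valueOf(element);
		}
	}

It would be wired up as new KafkaSink<Integer, String>("topic", "localhost:9092", new IntegerToStringSchema()).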

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
new file mode 100644
index 0000000..7328500
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.ConnectorSource;
+import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.util.Collector;
+
+public class KafkaSource<OUT> extends ConnectorSource<OUT> {
+	private static final long serialVersionUID = 1L;
+
+	private final String zkQuorum;
+	private final String groupId;
+	private final String topicId;
+	private ConsumerConnector consumer;
+
+	public KafkaSource(String zkQuorum, String groupId, String topicId,
+			DeserializationSchema<OUT> deserializationSchema) {
+		super(deserializationSchema);
+		this.zkQuorum = zkQuorum;
+		this.groupId = groupId;
+		this.topicId = topicId;
+	}
+
+	/**
+	 * Initializes the connection to Kafka.
+	 */
+	private void initializeConnection() {
+		Properties props = new Properties();
+		props.put("zookeeper.connect", zkQuorum);
+		props.put("group.id", groupId);
+		props.put("zookeeper.session.timeout.ms", "2000");
+		props.put("zookeeper.sync.time.ms", "200");
+		props.put("auto.commit.interval.ms", "1000");
+		consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
+	}
+
+	/**
+	 * Called to forward the data from the source to the {@link DataStream}.
+	 * 
+	 * @param collector
+	 *            The Collector for sending data to the dataStream
+	 */
+	@Override
+	public void invoke(Collector<OUT> collector) throws Exception {
+
+		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
+				.createMessageStreams(Collections.singletonMap(topicId, 1));
+
+		KafkaStream<byte[], byte[]> stream = consumerMap.get(topicId).get(0);
+		ConsumerIterator<byte[], byte[]> it = stream.iterator();
+
+		while (it.hasNext()) {
+			OUT out = schema.deserialize(it.next().message());
+			if (schema.isEndOfStream(out)) {
+				break;
+			}
+			collector.collect(out);
+		}
+		consumer.shutdown();
+	}
+
+	@Override
+	public void open(Configuration config) {
+		initializeConnection();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
new file mode 100644
index 0000000..7801d56
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.connectors.util.SimpleStringSchema;
+import org.apache.flink.util.Collector;
+
+public class KafkaTopology {
+
+	public static final class MySource implements SourceFunction<String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void invoke(Collector<String> collector) throws Exception {
+			for (int i = 0; i < 10; i++) {
+				collector.collect(Integer.toString(i));
+			}
+			collector.collect("q");
+		}
+	}
+
+	public static void main(String[] args) throws Exception {
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		@SuppressWarnings("unused")
+		DataStream<String> stream1 = env
+				.addSource(
+						new KafkaSource<String>("localhost:2181", "group", "test",
+								new SimpleStringSchema())).print();
+
+		@SuppressWarnings("unused")
+		DataStream<String> stream2 = env.addSource(new MySource()).addSink(
+				new KafkaSink<String, String>("test", "localhost:9092", new SimpleStringSchema()));
+
+		env.execute();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
new file mode 100644
index 0000000..38c4f5f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import java.io.IOException;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+public class RMQSink<IN> extends RichSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class);
+
+	private String queueName;
+	private String hostName;
+	private transient ConnectionFactory factory;
+	private transient Connection connection;
+	private transient Channel channel;
+	private SerializationSchema<IN, byte[]> scheme;
+
+	public RMQSink(String hostName, String queueName, SerializationSchema<IN, byte[]> schema) {
+		this.hostName = hostName;
+		this.queueName = queueName;
+		this.scheme = schema;
+	}
+
+	/**
+	 * Initializes the connection to RMQ.
+	 */
+	public void initializeConnection() {
+		factory = new ConnectionFactory();
+		factory.setHost(hostName);
+		try {
+			connection = factory.newConnection();
+			channel = connection.createChannel();
+			channel.queueDeclare(queueName, false, false, false, null);
+
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	/**
+	 * Called when new data arrives at the sink, and forwards it to RMQ.
+	 * 
+	 * @param value
+	 *            The incoming data
+	 */
+	@Override
+	public void invoke(IN value) {
+		try {
+			byte[] msg = scheme.serialize(value);
+			channel.basicPublish("", queueName, null, msg);
+		} catch (IOException e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Cannot send RMQ message {} at {}", queueName, hostName);
+			}
+		}
+	}
+
+	/**
+	 * Closes the connection.
+	 */
+	private void closeChannel() {
+		try {
+			channel.close();
+			connection.close();
+		} catch (IOException e) {
+			throw new RuntimeException("Error while closing RMQ connection with " + queueName
+					+ " at " + hostName, e);
+		}
+	}
+
+	@Override
+	public void open(Configuration config) {
+		initializeConnection();
+	}
+
+	@Override
+	public void close() {
+		closeChannel();
+	}
+
+}
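
No RabbitMQ topology example appears in this part of the commit. A minimal usage sketch in the style of FlumeTopology and KafkaTopology above, assuming a broker on localhost and the hypothetical queue name "flink-test" (imports from org.apache.flink.streaming.* and org.apache.flink.util.Collector as in the files above):

	public class RMQTopology {

		public static final class MySource implements SourceFunction<String> {
			private static final long serialVersionUID = 1L;

			@Override
			public void invoke(Collector<String> collector) throws Exception {
				for (int i = 0; i < 10; i++) {
					collector.collect(Integer.toString(i));
				}
			}
		}

		// RMQSink expects a SerializationSchema<IN, byte[]>, mirroring
		// StringToByteSerializer in FlumeTopology above
		public static class StringToByteSchema implements SerializationSchema<String, byte[]> {
			private static final long serialVersionUID = 1L;

			@Override
			public byte[] serialize(String element) {
				return element.getBytes();
			}
		}

		public static void main(String[] args) throws Exception {
			StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

			env.addSource(new MySource()).addSink(
					new RMQSink<String>("localhost", "flink-test", new StringToByteSchema()));

			env.execute();
		}
	}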

