flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [23/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.
Date Fri, 02 Dec 2016 13:35:13 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
new file mode 100644
index 0000000..dc5396a
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+/**
+ * The builder for {@link RedisCommandsContainer}.
+ */
+public class RedisCommandsContainerBuilder {
+
+	/**
+	 * Initialize the {@link RedisCommandsContainer} based on the instance type.
+	 * @param flinkJedisConfigBase configuration base
+	 * @return @throws IllegalArgumentException if jedisPoolConfig, jedisClusterConfig and jedisSentinelConfig are all null
+     */
+	public static RedisCommandsContainer build(FlinkJedisConfigBase flinkJedisConfigBase){
+		if(flinkJedisConfigBase instanceof FlinkJedisPoolConfig){
+			FlinkJedisPoolConfig flinkJedisPoolConfig = (FlinkJedisPoolConfig) flinkJedisConfigBase;
+			return RedisCommandsContainerBuilder.build(flinkJedisPoolConfig);
+		} else if (flinkJedisConfigBase instanceof FlinkJedisClusterConfig) {
+			FlinkJedisClusterConfig flinkJedisClusterConfig = (FlinkJedisClusterConfig) flinkJedisConfigBase;
+			return RedisCommandsContainerBuilder.build(flinkJedisClusterConfig);
+		} else if (flinkJedisConfigBase instanceof FlinkJedisSentinelConfig) {
+			FlinkJedisSentinelConfig flinkJedisSentinelConfig = (FlinkJedisSentinelConfig) flinkJedisConfigBase;
+			return RedisCommandsContainerBuilder.build(flinkJedisSentinelConfig);
+		} else {
+			throw new IllegalArgumentException("Jedis configuration not found");
+		}
+	}
+
+	/**
+	 * Builds container for single Redis environment.
+	 *
+	 * @param jedisPoolConfig configuration for JedisPool
+	 * @return container for single Redis environment
+	 * @throws NullPointerException if jedisPoolConfig is null
+	 */
+	public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) {
+		Preconditions.checkNotNull(jedisPoolConfig, "Redis pool config should not be Null");
+
+		GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
+		genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle());
+		genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal());
+		genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle());
+
+		JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(),
+			jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
+			jedisPoolConfig.getDatabase());
+		return new RedisContainer(jedisPool);
+	}
+
+	/**
+	 * Builds container for Redis Cluster environment.
+	 *
+	 * @param jedisClusterConfig configuration for JedisCluster
+	 * @return container for Redis Cluster environment
+	 * @throws NullPointerException if jedisClusterConfig is null
+	 */
+	public static RedisCommandsContainer build(FlinkJedisClusterConfig jedisClusterConfig) {
+		Preconditions.checkNotNull(jedisClusterConfig, "Redis cluster config should not be Null");
+
+		GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
+		genericObjectPoolConfig.setMaxIdle(jedisClusterConfig.getMaxIdle());
+		genericObjectPoolConfig.setMaxTotal(jedisClusterConfig.getMaxTotal());
+		genericObjectPoolConfig.setMinIdle(jedisClusterConfig.getMinIdle());
+
+		JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), jedisClusterConfig.getConnectionTimeout(),
+			jedisClusterConfig.getMaxRedirections(), genericObjectPoolConfig);
+		return new RedisClusterContainer(jedisCluster);
+	}
+
+	/**
+	 * Builds container for Redis Sentinel environment.
+	 *
+	 * @param jedisSentinelConfig configuration for JedisSentinel
+	 * @return container for Redis sentinel environment
+	 * @throws NullPointerException if jedisSentinelConfig is null
+	 */
+	public static RedisCommandsContainer build(FlinkJedisSentinelConfig jedisSentinelConfig) {
+		Preconditions.checkNotNull(jedisSentinelConfig, "Redis sentinel config should not be Null");
+
+		GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
+		genericObjectPoolConfig.setMaxIdle(jedisSentinelConfig.getMaxIdle());
+		genericObjectPoolConfig.setMaxTotal(jedisSentinelConfig.getMaxTotal());
+		genericObjectPoolConfig.setMinIdle(jedisSentinelConfig.getMinIdle());
+
+		JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(),
+			jedisSentinelConfig.getSentinels(), genericObjectPoolConfig,
+			jedisSentinelConfig.getConnectionTimeout(), jedisSentinelConfig.getSoTimeout(),
+			jedisSentinelConfig.getPassword(), jedisSentinelConfig.getDatabase());
+		return new RedisContainer(jedisSentinelPool);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
new file mode 100644
index 0000000..ba4bbda
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a single Redis server or to Redis sentinels
+ * If want to connect to a single Redis server, please use the first constructor {@link #RedisContainer(JedisPool)}.
+ * If want to connect to a Redis sentinels, please use the second constructor {@link #RedisContainer(JedisSentinelPool)}
+ */
+public class RedisContainer implements RedisCommandsContainer, Closeable {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class);
+
+	private transient JedisPool jedisPool;
+	private transient JedisSentinelPool jedisSentinelPool;
+
+	/**
+	 * Use this constructor if to connect with single Redis server.
+	 *
+	 * @param jedisPool JedisPool which actually manages Jedis instances
+	 */
+	public RedisContainer(JedisPool jedisPool) {
+		Preconditions.checkNotNull(jedisPool, "Jedis Pool can not be null");
+		this.jedisPool = jedisPool;
+		this.jedisSentinelPool = null;
+	}
+
+	/**
+	 * Use this constructor if Redis environment is clustered with sentinels.
+	 *
+	 * @param sentinelPool SentinelPool which actually manages Jedis instances
+	 */
+	public RedisContainer(final JedisSentinelPool sentinelPool) {
+		Preconditions.checkNotNull(sentinelPool, "Jedis Sentinel Pool can not be null");
+		this.jedisPool = null;
+		this.jedisSentinelPool = sentinelPool;
+	}
+
+	/**
+	 * Closes the Jedis instances.
+	 */
+	@Override
+	public void close() throws IOException {
+		if (this.jedisPool != null) {
+			this.jedisPool.close();
+		}
+		if (this.jedisSentinelPool != null) {
+			this.jedisSentinelPool.close();
+		}
+	}
+
+	@Override
+	public void open() throws Exception {
+
+		// echo() tries to open a connection and echos back the
+		// message passed as argument. Here we use it to monitor
+		// if we can communicate with the cluster.
+
+		getInstance().echo("Test");
+	}
+
+	@Override
+	public void hset(final String key, final String hashField, final String value) {
+		Jedis jedis = null;
+		try {
+			jedis = getInstance();
+			jedis.hset(key, hashField, value);
+		} catch (Exception e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Cannot send Redis message with command HSET to key {} and hashField {} error message {}",
+					key, hashField, e.getMessage());
+			}
+			throw e;
+		} finally {
+			releaseInstance(jedis);
+		}
+	}
+
+	@Override
+	public void rpush(final String listName, final String value) {
+		Jedis jedis = null;
+		try {
+			jedis = getInstance();
+			jedis.rpush(listName, value);
+		} catch (Exception e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Cannot send Redis message with command RPUSH to list {} error message {}",
+					listName, e.getMessage());
+			}
+			throw e;
+		} finally {
+			releaseInstance(jedis);
+		}
+	}
+
+	@Override
+	public void lpush(String listName, String value) {
+		Jedis jedis = null;
+		try {
+			jedis = getInstance();
+			jedis.lpush(listName, value);
+		} catch (Exception e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Cannot send Redis message with command LUSH to list {} error message {}",
+					listName, e.getMessage());
+			}
+			throw e;
+		} finally {
+			releaseInstance(jedis);
+		}
+	}
+
+	@Override
+	public void sadd(final String setName, final String value) {
+		Jedis jedis = null;
+		try {
+			jedis = getInstance();
+			jedis.sadd(setName, value);
+		} catch (Exception e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Cannot send Redis message with command RPUSH to set {} error message {}",
+					setName, e.getMessage());
+			}
+			throw e;
+		} finally {
+			releaseInstance(jedis);
+		}
+	}
+
+	@Override
+	public void publish(final String channelName, final String message) {
+		Jedis jedis = null;
+		try {
+			jedis = getInstance();
+			jedis.publish(channelName, message);
+		} catch (Exception e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Cannot send Redis message with command PUBLISH to channel {} error message {}",
+					channelName, e.getMessage());
+			}
+			throw e;
+		} finally {
+			releaseInstance(jedis);
+		}
+	}
+
+	@Override
+	public void set(final String key, final String value) {
+		Jedis jedis = null;
+		try {
+			jedis = getInstance();
+			jedis.set(key, value);
+		} catch (Exception e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Cannot send Redis message with command SET to key {} error message {}",
+					key, e.getMessage());
+			}
+			throw e;
+		} finally {
+			releaseInstance(jedis);
+		}
+	}
+
+	@Override
+	public void pfadd(final String key, final String element) {
+		Jedis jedis = null;
+		try {
+			jedis = getInstance();
+			jedis.pfadd(key, element);
+		} catch (Exception e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Cannot send Redis message with command PFADD to key {} error message {}",
+					key, e.getMessage());
+			}
+			throw e;
+		} finally {
+			releaseInstance(jedis);
+		}
+	}
+
+	@Override
+	public void zadd(final String key, final String score, final String element) {
+		Jedis jedis = null;
+		try {
+			jedis = getInstance();
+			jedis.zadd(key, Double.valueOf(score), element);
+		} catch (Exception e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Cannot send Redis message with command ZADD to set {} error message {}",
+					key, e.getMessage());
+			}
+			throw e;
+		} finally {
+			releaseInstance(jedis);
+		}
+	}
+
+	/**
+	 * Returns Jedis instance from the pool.
+	 *
+	 * @return the Jedis instance
+     */
+	private Jedis getInstance() {
+		if (jedisSentinelPool != null) {
+			return jedisSentinelPool.getResource();
+		} else {
+			return jedisPool.getResource();
+		}
+	}
+
+	/**
+	 * Closes the jedis instance after finishing the command.
+	 *
+	 * @param jedis The jedis instance
+     */
+	private void releaseInstance(final Jedis jedis) {
+		if (jedis == null) {
+			return;
+		}
+		try {
+			jedis.close();
+		} catch (Exception e) {
+			LOG.error("Failed to close (return) instance to pool", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
new file mode 100644
index 0000000..b0661c7
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+/**
+ * All available commands for Redis. Each command belongs to a {@link RedisDataType} group.
+ */
+public enum RedisCommand {
+
+	/**
+	 * Insert the specified value at the head of the list stored at key.
+	 * If key does not exist, it is created as empty list before performing the push operations.
+	 */
+	LPUSH(RedisDataType.LIST),
+
+	/**
+	 * Insert the specified value at the tail of the list stored at key.
+	 * If key does not exist, it is created as empty list before performing the push operation.
+	 */
+	RPUSH(RedisDataType.LIST),
+
+	/**
+	 * Add the specified member to the set stored at key.
+	 * Specified member that is already a member of this set is ignored.
+	 */
+	SADD(RedisDataType.SET),
+
+	/**
+	 * Set key to hold the string value. If key already holds a value,
+	 * it is overwritten, regardless of its type.
+	 */
+	SET(RedisDataType.STRING),
+
+	/**
+	 * Adds the element to the HyperLogLog data structure stored at the variable name specified as first argument.
+	 */
+	PFADD(RedisDataType.HYPER_LOG_LOG),
+
+	/**
+	 * Posts a message to the given channel.
+	 */
+	PUBLISH(RedisDataType.PUBSUB),
+
+	/**
+	 * Adds the specified members with the specified score to the sorted set stored at key.
+	 */
+	ZADD(RedisDataType.SORTED_SET),
+
+	/**
+	 * Sets field in the hash stored at key to value. If key does not exist,
+	 * a new key holding a hash is created. If field already exists in the hash, it is overwritten.
+	 */
+	HSET(RedisDataType.HASH);
+
+	/**
+	 * The {@link RedisDataType} this command belongs to.
+	 */
+	private RedisDataType redisDataType;
+
+	RedisCommand(RedisDataType redisDataType) {
+		this.redisDataType = redisDataType;
+	}
+
+
+	/**
+	 * The {@link RedisDataType} this command belongs to.
+	 * @return the {@link RedisDataType}
+	 */
+	public RedisDataType getRedisDataType(){
+		return redisDataType;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
new file mode 100644
index 0000000..1eea48a
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * The description of the command type. This must be passed while creating new {@link RedisMapper}.
+ * <p>When creating descriptor for the group of {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET},
+ * you need to use first constructor {@link #RedisCommandDescription(RedisCommand, String)}.
+ * If the {@code additionalKey} is {@code null} it will throw {@code IllegalArgumentException}
+ *
+ * <p>When {@link RedisCommand} is not in group of {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}
+ * you can use second constructor {@link #RedisCommandDescription(RedisCommand)}
+ */
+public class RedisCommandDescription implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private RedisCommand redisCommand;
+
+	/**
+	 * This additional key is needed for the group {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
+	 * Other {@link RedisDataType} works only with two variable i.e. name of the list and value to be added.
+	 * But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} we need three variables.
+	 * <p>For {@link RedisDataType#HASH} we need hash name, hash key and element.
+	 * {@link #getAdditionalKey()} used as hash name for {@link RedisDataType#HASH}
+	 * <p>For {@link RedisDataType#SORTED_SET} we need set name, the element and it's score.
+	 * {@link #getAdditionalKey()} used as set name for {@link RedisDataType#SORTED_SET}
+	 */
+	private String additionalKey;
+
+	/**
+	 * Use this constructor when data type is {@link RedisDataType#HASH} or {@link RedisDataType#SORTED_SET}.
+	 * If different data type is specified, {@code additionalKey} is ignored.
+	 * @param redisCommand the redis command type {@link RedisCommand}
+	 * @param additionalKey additional key for Hash and Sorted set data type
+	 */
+	public RedisCommandDescription(RedisCommand redisCommand, String additionalKey) {
+		Preconditions.checkNotNull(redisCommand, "Redis command type can not be null");
+		this.redisCommand = redisCommand;
+		this.additionalKey = additionalKey;
+
+		if (redisCommand.getRedisDataType() == RedisDataType.HASH ||
+			redisCommand.getRedisDataType() == RedisDataType.SORTED_SET) {
+			if (additionalKey == null) {
+				throw new IllegalArgumentException("Hash and Sorted Set should have additional key");
+			}
+		}
+	}
+
+	/**
+	 * Use this constructor when command type is not in group {@link RedisDataType#HASH} or {@link RedisDataType#SORTED_SET}.
+	 *
+	 * @param redisCommand the redis data type {@link RedisCommand}
+	 */
+	public RedisCommandDescription(RedisCommand redisCommand) {
+		this(redisCommand, null);
+	}
+
+	/**
+	 * Returns the {@link RedisCommand}.
+	 *
+	 * @return the command type of the mapping
+	 */
+	public RedisCommand getCommand() {
+		return redisCommand;
+	}
+
+	/**
+	 * Returns the additional key if data type is {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
+	 *
+	 * @return the additional key
+	 */
+	public String getAdditionalKey() {
+		return additionalKey;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
new file mode 100644
index 0000000..6e3997c
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+/**
+ * All available data type for Redis.
+ */
+public enum RedisDataType {
+
+	/**
+	 * Strings are the most basic kind of Redis value. Redis Strings are binary safe,
+	 * this means that a Redis string can contain any kind of data, for instance a JPEG image or a serialized Ruby object.
+	 * A String value can be at max 512 Megabytes in length.
+	 */
+	STRING,
+
+	/**
+	 * Redis Hashes are maps between string fields and string values.
+	 */
+	HASH,
+
+	/**
+	 * Redis Lists are simply lists of strings, sorted by insertion order.
+	 */
+	LIST,
+
+	/**
+	 * Redis Sets are an unordered collection of Strings.
+	 */
+	SET,
+
+	/**
+	 * Redis Sorted Sets are, similarly to Redis Sets, non repeating collections of Strings.
+	 * The difference is that every member of a Sorted Set is associated with score,
+	 * that is used in order to take the sorted set ordered, from the smallest to the greatest score.
+	 * While members are unique, scores may be repeated.
+	 */
+	SORTED_SET,
+
+	/**
+	 * HyperLogLog is a probabilistic data structure used in order to count unique things.
+	 */
+	HYPER_LOG_LOG,
+
+	/**
+	 * Redis implementation of publish and subscribe paradigm. Published messages are characterized into channels,
+	 * without knowledge of what (if any) subscribers there may be.
+	 * Subscribers express interest in one or more channels, and only receive messages
+	 * that are of interest, without knowledge of what (if any) publishers there are.
+	 */
+	PUBSUB
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
new file mode 100644
index 0000000..63fed19
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Function that creates the description how the input data should be mapped to redis type.
+ *<p>Example:
+ *<pre>{@code
+ *private static class RedisTestMapper implements RedisMapper<Tuple2<String, String>> {
+ *    public RedisDataTypeDescription getCommandDescription() {
+ *        return new RedisDataTypeDescription(RedisCommand.PUBLISH);
+ *    }
+ *    public String getKeyFromData(Tuple2<String, String> data) {
+ *        return data.f0;
+ *    }
+ *    public String getValueFromData(Tuple2<String, String> data) {
+ *        return data.f1;
+ *    }
+ *}
+ *}</pre>
+ *
+ * @param <T> The type of the element handled by this {@code RedisMapper}
+ */
+public interface RedisMapper<T> extends Function, Serializable {
+
+	/**
+	 * Returns descriptor which defines data type.
+	 *
+	 * @return data type descriptor
+	 */
+	RedisCommandDescription getCommandDescription();
+
+	/**
+	 * Extracts key from data.
+	 *
+	 * @param data source data
+	 * @return key
+	 */
+	String getKeyFromData(T data);
+
+	/**
+	 * Extracts value from data.
+	 *
+	 * @param data source data
+	 * @return value
+	 */
+	String getValueFromData(T data);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
new file mode 100644
index 0000000..7d98f2d
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import redis.embedded.RedisServer;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.NetUtils.getAvailablePort;
+
+public abstract class RedisITCaseBase extends StreamingMultipleProgramsTestBase {
+
+	public static final int REDIS_PORT = getAvailablePort();
+	public static final String REDIS_HOST = "127.0.0.1";
+
+	private static RedisServer redisServer;
+
+	@BeforeClass
+	public static void createRedisServer() throws IOException, InterruptedException {
+		redisServer = new RedisServer(REDIS_PORT);
+		redisServer.start();
+	}
+
+	@AfterClass
+	public static void stopRedisServer(){
+		redisServer.stop();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
new file mode 100644
index 0000000..dc59ba4
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisSentinelPool;
+import redis.embedded.RedisCluster;
+import redis.embedded.util.JedisUtil;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.flink.util.NetUtils.getAvailablePort;
+
+public class RedisSentinelClusterTest extends TestLogger {
+
+	private static RedisCluster cluster;
+	private static final String REDIS_MASTER = "master";
+	private static final String TEST_KEY = "testKey";
+	private static final String TEST_VALUE = "testValue";
+	private static final List<Integer> sentinels = Arrays.asList(getAvailablePort(), getAvailablePort());
+	private static final List<Integer> group1 = Arrays.asList(getAvailablePort(), getAvailablePort());
+
+	private JedisSentinelPool jedisSentinelPool;
+	private FlinkJedisSentinelConfig jedisSentinelConfig;
+
+	@BeforeClass
+	public static void setUpCluster(){
+		cluster = RedisCluster.builder().sentinelPorts(sentinels).quorumSize(1)
+			.serverPorts(group1).replicationGroup(REDIS_MASTER, 1)
+			.build();
+		cluster.start();
+	}
+
+	@Before
+	public void setUp() {
+		Set<String> hosts = JedisUtil.sentinelHosts(cluster);
+		jedisSentinelConfig = new FlinkJedisSentinelConfig.Builder().setMasterName(REDIS_MASTER)
+			.setSentinels(hosts).build();
+		jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(),
+			jedisSentinelConfig.getSentinels());
+	}
+
+	@Test
+	public void testRedisSentinelOperation() {
+		RedisCommandsContainer redisContainer = RedisCommandsContainerBuilder.build(jedisSentinelConfig);
+		Jedis jedis = null;
+		try{
+			jedis = jedisSentinelPool.getResource();
+			redisContainer.set(TEST_KEY, TEST_VALUE);
+			assertEquals(TEST_VALUE, jedis.get(TEST_KEY));
+		}finally {
+			if (jedis != null){
+				jedis.close();
+			}
+		}
+	}
+
+	@After
+	public void tearDown() throws IOException {
+		if (jedisSentinelPool != null) {
+			jedisSentinelPool.close();
+		}
+	}
+
+	@AfterClass
+	public static void tearDownCluster() throws IOException {
+		if (!cluster.isActive()) {
+			cluster.stop();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
new file mode 100644
index 0000000..21f3cca
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+
+import static org.junit.Assert.assertEquals;
+
+public class RedisSinkITCase extends RedisITCaseBase {
+
+	private FlinkJedisPoolConfig jedisPoolConfig;
+	private static final Long NUM_ELEMENTS = 20L;
+	private static final String REDIS_KEY = "TEST_KEY";
+	private static final String REDIS_ADDITIONAL_KEY = "TEST_ADDITIONAL_KEY";
+
+	StreamExecutionEnvironment env;
+
+
+	private Jedis jedis;
+
+	@Before
+	public void setUp(){
+		jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
+			.setHost(REDIS_HOST)
+			.setPort(REDIS_PORT).build();
+		jedis = new Jedis(REDIS_HOST, REDIS_PORT);
+		env = StreamExecutionEnvironment.getExecutionEnvironment();
+	}
+
+	@Test
+	public void testRedisListDataType() throws Exception {
+		DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+		RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+			new RedisCommandMapper(RedisCommand.LPUSH));
+
+		source.addSink(redisSink);
+		env.execute("Test Redis List Data Type");
+
+		assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY));
+
+		jedis.del(REDIS_KEY);
+	}
+
+	@Test
+	public void testRedisSetDataType() throws Exception {
+		DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+		RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+			new RedisCommandMapper(RedisCommand.SADD));
+
+		source.addSink(redisSink);
+		env.execute("Test Redis Set Data Type");
+
+		assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY));
+
+		jedis.del(REDIS_KEY);
+	}
+
+	@Test
+	public void testRedisHyperLogLogDataType() throws Exception {
+		DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+		RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+			new RedisCommandMapper(RedisCommand.PFADD));
+
+		source.addSink(redisSink);
+		env.execute("Test Redis Hyper Log Log Data Type");
+
+		assertEquals(NUM_ELEMENTS, Long.valueOf(jedis.pfcount(REDIS_KEY)));
+
+		jedis.del(REDIS_KEY);
+	}
+
+	@Test
+	public void testRedisSortedSetDataType() throws Exception {
+		DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionSortedSet());
+		RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+			new RedisAdditionalDataMapper(RedisCommand.ZADD));
+
+		source.addSink(redisSink);
+		env.execute("Test Redis Sorted Set Data Type");
+
+		assertEquals(NUM_ELEMENTS, jedis.zcard(REDIS_ADDITIONAL_KEY));
+
+		jedis.del(REDIS_ADDITIONAL_KEY);
+	}
+
+	@Test
+	public void testRedisHashDataType() throws Exception {
+		DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionHash());
+		RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+			new RedisAdditionalDataMapper(RedisCommand.HSET));
+
+		source.addSink(redisSink);
+		env.execute("Test Redis Hash Data Type");
+
+		assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
+
+		jedis.del(REDIS_ADDITIONAL_KEY);
+	}
+
+	@After
+	public void tearDown(){
+		if(jedis != null){
+			jedis.close();
+		}
+	}
+
+	private static class TestSourceFunction implements SourceFunction<Tuple2<String, String>> {
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean running = true;
+
+		@Override
+		public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
+			for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+				ctx.collect(new Tuple2<>(REDIS_KEY, "message #" + i));
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+
+	private static class TestSourceFunctionHash implements SourceFunction<Tuple2<String, String>> {
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean running = true;
+
+		@Override
+		public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
+			for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+				ctx.collect(new Tuple2<>("" + i, "message #" + i));
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+
+	private static class TestSourceFunctionSortedSet implements SourceFunction<Tuple2<String, String>> {
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean running = true;
+
+		@Override
+		public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
+			for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+				ctx.collect(new Tuple2<>( "message #" + i, "" + i));
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+
+	public static class RedisCommandMapper implements RedisMapper<Tuple2<String, String>>{
+
+		private RedisCommand redisCommand;
+
+		public RedisCommandMapper(RedisCommand redisCommand){
+			this.redisCommand = redisCommand;
+		}
+
+		@Override
+		public RedisCommandDescription getCommandDescription() {
+			return new RedisCommandDescription(redisCommand);
+		}
+
+		@Override
+		public String getKeyFromData(Tuple2<String, String> data) {
+			return data.f0;
+		}
+
+		@Override
+		public String getValueFromData(Tuple2<String, String> data) {
+			return data.f1;
+		}
+	}
+
+	public static class RedisAdditionalDataMapper implements RedisMapper<Tuple2<String, String>>{
+
+		private RedisCommand redisCommand;
+
+		public RedisAdditionalDataMapper(RedisCommand redisCommand){
+			this.redisCommand = redisCommand;
+		}
+
+		@Override
+		public RedisCommandDescription getCommandDescription() {
+			return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY);
+		}
+
+		@Override
+		public String getKeyFromData(Tuple2<String, String> data) {
+			return data.f0;
+		}
+
+		@Override
+		public String getValueFromData(Tuple2<String, String> data) {
+			return data.f1;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java
new file mode 100644
index 0000000..caf3945
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.JedisPool;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+import redis.clients.jedis.JedisPubSub;
+
+import static org.junit.Assert.assertEquals;
+
+public class RedisSinkPublishITCase extends RedisITCaseBase {
+
+	private static final int NUM_ELEMENTS = 20;
+	private static final String REDIS_CHANNEL = "CHANNEL";
+
+	private static final List<String> sourceList = new ArrayList<>();
+	private Thread sinkThread;
+	private PubSub pubSub;
+
+	@Before
+	public void before() throws Exception {
+		pubSub = new PubSub();
+		sinkThread = new Thread(new Subscribe(pubSub));
+	}
+
+	@Test
+	public void redisSinkTest() throws Exception {
+		sinkThread.start();
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
+			.setHost(REDIS_HOST)
+			.setPort(REDIS_PORT).build();
+		DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+
+		RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig, new RedisTestMapper());
+
+		source.addSink(redisSink);
+
+		env.execute("Redis Sink Test");
+
+		assertEquals(NUM_ELEMENTS, sourceList.size());
+	}
+
+	@After
+	public void after() throws Exception {
+		pubSub.unsubscribe();
+		sinkThread.join();
+		sourceList.clear();
+	}
+
+	private class Subscribe implements Runnable {
+		private PubSub localPubSub;
+		private Subscribe(PubSub pubSub){
+			this.localPubSub = pubSub;
+		}
+
+		@Override
+		public void run() {
+			JedisPool pool = new JedisPool(REDIS_HOST, REDIS_PORT);
+			pool.getResource().subscribe(localPubSub, REDIS_CHANNEL);
+		}
+	}
+
+	private static class TestSourceFunction implements SourceFunction<Tuple2<String, String>> {
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean running = true;
+
+		@Override
+		public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
+			for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+				ctx.collect(new Tuple2<>(REDIS_CHANNEL, "message #" + i));
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+
+	public static class PubSub extends JedisPubSub {
+
+		@Override
+		public void onMessage(String channel, String message) {
+			sourceList.add(message);
+		}
+
+	}
+
+	private static class RedisTestMapper implements RedisMapper<Tuple2<String, String>>{
+
+		@Override
+		public RedisCommandDescription getCommandDescription() {
+			return new RedisCommandDescription(RedisCommand.PUBLISH);
+		}
+
+		@Override
+		public String getKeyFromData(Tuple2<String, String> data) {
+			return data.f0;
+		}
+
+		@Override
+		public String getValueFromData(Tuple2<String, String> data) {
+			return data.f1;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
new file mode 100644
index 0000000..59f59f2
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import redis.clients.jedis.exceptions.JedisConnectionException;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.fail;
+
+public class RedisSinkTest extends TestLogger {
+
+	@Test(expected=NullPointerException.class)
+	public void shouldThrowNullPointExceptionIfDataMapperIsNull(){
+		new RedisSink<>(new FlinkJedisClusterConfig.Builder().build(), null);
+	}
+
+	@Test(expected = NullPointerException.class)
+	public void shouldThrowNullPointerExceptionIfCommandDescriptionIsNull(){
+		new RedisSink<>(new FlinkJedisClusterConfig.Builder().build(), new TestMapper(null));
+	}
+
+	@Test(expected = NullPointerException.class)
+	public void shouldThrowNullPointerExceptionIfConfigurationIsNull(){
+		new RedisSink<>(null, new TestMapper(new RedisCommandDescription(RedisCommand.LPUSH)));
+	}
+
+	@Test
+	public void testRedisDownBehavior() throws Exception {
+
+		// create a wrong configuration so that open() fails.
+
+		FlinkJedisPoolConfig wrongJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
+			.setHost("127.0.0.1")
+			.setPort(1234).build();
+
+		testDownBehavior(wrongJedisPoolConfig);
+	}
+
+	@Test
+	public void testRedisClusterDownBehavior() throws Exception {
+
+		Set<InetSocketAddress> hosts = new HashSet<>();
+		hosts.add(new InetSocketAddress("127.0.0.1", 1234));
+
+		// create a wrong configuration so that open() fails.
+
+		FlinkJedisClusterConfig wrongJedisClusterConfig = new FlinkJedisClusterConfig.Builder()
+			.setNodes(hosts)
+			.setTimeout(100)
+			.setMaxIdle(1)
+			.setMaxTotal(1)
+			.setMinIdle(1).build();
+
+		testDownBehavior(wrongJedisClusterConfig);
+	}
+
+	@Test
+	public void testRedisSentinelDownBehavior() throws Exception {
+
+		Set<String> hosts = new HashSet<>();
+		hosts.add("localhost:55095");
+
+		// create a wrong configuration so that open() fails.
+
+		FlinkJedisSentinelConfig wrongJedisSentinelConfig = new FlinkJedisSentinelConfig.Builder()
+			.setMasterName("master")
+			.setSentinels(hosts)
+			.build();
+
+		testDownBehavior(wrongJedisSentinelConfig);
+	}
+
+	private void testDownBehavior(FlinkJedisConfigBase config) throws Exception {
+		RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(config,
+			new RedisSinkITCase.RedisCommandMapper(RedisCommand.SADD));
+
+		try {
+			redisSink.open(new Configuration());
+		} catch (Exception e) {
+
+			// search for nested JedisConnectionExceptions
+			// because this is the expected behavior
+
+			Throwable t = e;
+			int depth = 0;
+			while (!(t instanceof JedisConnectionException)) {
+				t = t.getCause();
+				if (t == null || depth++ == 20) {
+					throw e;
+				}
+			}
+		}
+	}
+
+	private class TestMapper implements RedisMapper<Tuple2<String, String>>{
+		private RedisCommandDescription redisCommandDescription;
+
+		public TestMapper(RedisCommandDescription redisCommandDescription){
+			this.redisCommandDescription = redisCommandDescription;
+		}
+		@Override
+		public RedisCommandDescription getCommandDescription() {
+			return redisCommandDescription;
+		}
+
+		@Override
+		public String getKeyFromData(Tuple2<String, String> data) {
+			return data.f0;
+		}
+
+		@Override
+		public String getValueFromData(Tuple2<String, String> data) {
+			return data.f1;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
new file mode 100644
index 0000000..ed1d713
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+public class FlinkJedisConfigBaseTest extends TestLogger {
+
+	@Test(expected = IllegalArgumentException.class)
+	public void shouldThrowIllegalArgumentExceptionIfTimeOutIsNegative(){
+		new TestConfig(-1, 0, 0, 0);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void shouldThrowIllegalArgumentExceptionIfMaxTotalIsNegative(){
+		new TestConfig(1, -1, 0, 0);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void shouldThrowIllegalArgumentExceptionIfMaxIdleIsNegative(){
+		new TestConfig(0, 0, -1, 0);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void shouldThrowIllegalArgumentExceptionIfMinIdleIsNegative(){
+		new TestConfig(0, 0, 0, -1);
+	}
+
+	private class TestConfig extends FlinkJedisConfigBase{
+
+		protected TestConfig(int connectionTimeout, int maxTotal, int maxIdle, int minIdle) {
+			super(connectionTimeout, maxTotal, maxIdle, minIdle);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
new file mode 100644
index 0000000..40db578
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+public class JedisClusterConfigTest extends TestLogger {
+
+	@Test(expected = NullPointerException.class)
+	public void shouldThrowNullPointExceptionIfNodeValueIsNull(){
+		FlinkJedisClusterConfig.Builder builder = new FlinkJedisClusterConfig.Builder();
+		builder.setMinIdle(0)
+			.setMaxIdle(0)
+			.setMaxTotal(0)
+			.setTimeout(0)
+			.build();
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void shouldThrowIllegalArgumentExceptionIfNodeValuesAreEmpty(){
+		Set<InetSocketAddress> set = new HashSet<>();
+		FlinkJedisClusterConfig.Builder builder = new FlinkJedisClusterConfig.Builder();
+		builder.setMinIdle(0)
+			.setMaxIdle(0)
+			.setMaxTotal(0)
+			.setTimeout(0)
+			.setNodes(set)
+			.build();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java
new file mode 100644
index 0000000..dc16cfe
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+public class JedisPoolConfigTest extends TestLogger {
+
+	@Test(expected = NullPointerException.class)
+	public void shouldThrowNullPointExceptionIfHostValueIsNull(){
+		FlinkJedisPoolConfig.Builder builder = new FlinkJedisPoolConfig.Builder();
+		builder.build();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java
new file mode 100644
index 0000000..8445fae
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class JedisSentinelConfigTest extends TestLogger {
+
+	public static final String MASTER_NAME = "test-master";
+
+	@Test(expected = NullPointerException.class)
+	public void shouldThrowNullPointExceptionIfMasterValueIsNull(){
+		FlinkJedisSentinelConfig.Builder builder = new FlinkJedisSentinelConfig.Builder();
+		Set<String> sentinels = new HashSet<>();
+		sentinels.add("127.0.0.1");
+		builder.setSentinels(sentinels).build();
+	}
+
+	@Test(expected = NullPointerException.class)
+	public void shouldThrowNullPointExceptionIfSentinelsValueIsNull(){
+		FlinkJedisSentinelConfig.Builder builder = new FlinkJedisSentinelConfig.Builder();
+		builder.setMasterName(MASTER_NAME).build();
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void shouldThrowNullPointExceptionIfSentinelsValueIsEmpty(){
+		FlinkJedisSentinelConfig.Builder builder = new FlinkJedisSentinelConfig.Builder();
+		Set<String> sentinels = new HashSet<>();
+		builder.setMasterName(MASTER_NAME).setSentinels(sentinels).build();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
new file mode 100644
index 0000000..b0eee48
--- /dev/null
+++ b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.streaming.connectors.redis.RedisSinkITCase;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class RedisDataTypeDescriptionTest extends TestLogger {
+
+	@Test(expected=IllegalArgumentException.class)
+	public void shouldThrowExceptionIfAdditionalKeyIsNotGivenForHashDataType(){
+		RedisSinkITCase.RedisCommandMapper redisCommandMapper = new RedisSinkITCase.RedisCommandMapper(RedisCommand.HSET);
+		redisCommandMapper.getCommandDescription();
+	}
+
+	@Test
+	public void shouldReturnNullForAdditionalDataType(){
+		RedisSinkITCase.RedisCommandMapper redisCommandMapper = new RedisSinkITCase.RedisCommandMapper(RedisCommand.LPUSH);
+		RedisCommandDescription redisDataTypeDescription = redisCommandMapper.getCommandDescription();
+		assertEquals(RedisDataType.LIST, redisDataTypeDescription.getCommand().getRedisDataType());
+		assertNull(redisDataTypeDescription.getAdditionalKey());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-twitter/pom.xml b/flink-connectors/flink-connector-twitter/pom.xml
new file mode 100644
index 0000000..27a966f
--- /dev/null
+++ b/flink-connectors/flink-connector-twitter/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.2-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-twitter_2.10</artifactId>
+	<name>flink-connector-twitter</name>
+
+	<packaging>jar</packaging>
+
+	<properties>
+		<hbc-core.version>2.2.0</hbc-core.version>
+	</properties>
+
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>com.twitter</groupId>
+			<artifactId>hbc-core</artifactId>
+			<version>${hbc-core.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<!-- Override artifactSet configuration to build fat-jar with all dependencies packed. -->
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<configuration>
+							<artifactSet>
+								<includes combine.children="append">
+									<!-- We include all dependencies that transitively depend on guava -->
+									<include>com.twitter:hbc-core</include>
+									<include>com.twitter:joauth</include>
+									<include>org.apache.httpcomponents:httpclient</include>
+									<include>org.apache.httpcomponents:httpcore</include>
+								</includes>
+							</artifactSet>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
new file mode 100644
index 0000000..66fa237
--- /dev/null
+++ b/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.Properties;
+
+import com.twitter.hbc.common.DelimitedStreamReader;
+import com.twitter.hbc.core.endpoint.StreamingEndpoint;
+import com.twitter.hbc.core.processor.HosebirdMessageProcessor;
+import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.hbc.ClientBuilder;
+import com.twitter.hbc.core.Constants;
+import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
+import com.twitter.hbc.httpclient.BasicClient;
+import com.twitter.hbc.httpclient.auth.Authentication;
+import com.twitter.hbc.httpclient.auth.OAuth1;
+
+/**
+ * Implementation of {@link SourceFunction} specialized to emit tweets from
+ * Twitter. This is not a parallel source because the Twitter API only allows
+ * two concurrent connections.
+ */
+public class TwitterSource extends RichSourceFunction<String> implements StoppableFunction {
+
+	private static final Logger LOG = LoggerFactory.getLogger(TwitterSource.class);
+
+	private static final long serialVersionUID = 1L;
+
+	// ----- Required property keys
+
+	public static final String CONSUMER_KEY = "twitter-source.consumerKey";
+
+	public static final String CONSUMER_SECRET = "twitter-source.consumerSecret";
+
+	public static final String TOKEN = "twitter-source.token";
+
+	public static final String TOKEN_SECRET = "twitter-source.tokenSecret";
+
+	// ------ Optional property keys
+
+	public static final String CLIENT_NAME = "twitter-source.name";
+
+	public static final String CLIENT_HOSTS = "twitter-source.hosts";
+
+	public static final String CLIENT_BUFFER_SIZE = "twitter-source.bufferSize";
+
+	// ----- Fields set by the constructor
+
+	private final Properties properties;
+
+	private EndpointInitializer initializer = new SampleStatusesEndpoint();
+
+	// ----- Runtime fields
+	private transient BasicClient client;
+	private transient Object waitLock;
+	private transient boolean running = true;
+
+
+	/**
+	 * Create {@link TwitterSource} for streaming
+	 * 
+	 * @param properties For the source
+	 */
+	public TwitterSource(Properties properties) {
+		checkProperty(properties, CONSUMER_KEY);
+		checkProperty(properties, CONSUMER_SECRET);
+		checkProperty(properties, TOKEN);
+		checkProperty(properties, TOKEN_SECRET);
+
+		this.properties = properties;
+	}
+
+	private static void checkProperty(Properties p, String key) {
+		if(!p.containsKey(key)) {
+			throw new IllegalArgumentException("Required property '" + key + "' not set.");
+		}
+	}
+
+
+	/**
+	 * Set a custom endpoint initializer.
+	 */
+	public void setCustomEndpointInitializer(EndpointInitializer initializer) {
+		Objects.requireNonNull(initializer, "Initializer has to be set");
+		ClosureCleaner.ensureSerializable(initializer);
+		this.initializer = initializer;
+	}
+
+	// ----- Source lifecycle
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		waitLock = new Object();
+	}
+
+
+	@Override
+	public void run(final SourceContext<String> ctx) throws Exception {
+		LOG.info("Initializing Twitter Streaming API connection");
+
+		StreamingEndpoint endpoint = initializer.createEndpoint();
+
+		Authentication auth = new OAuth1(properties.getProperty(CONSUMER_KEY),
+			properties.getProperty(CONSUMER_SECRET),
+			properties.getProperty(TOKEN),
+			properties.getProperty(TOKEN_SECRET));
+
+		client = new ClientBuilder()
+			.name(properties.getProperty(CLIENT_NAME, "flink-twitter-source"))
+			.hosts(properties.getProperty(CLIENT_HOSTS, Constants.STREAM_HOST))
+			.endpoint(endpoint)
+			.authentication(auth)
+			.processor(new HosebirdMessageProcessor() {
+				public DelimitedStreamReader reader;
+
+				@Override
+				public void setup(InputStream input) {
+					reader = new DelimitedStreamReader(input, Constants.DEFAULT_CHARSET, Integer.parseInt(properties.getProperty(CLIENT_BUFFER_SIZE, "50000")));
+				}
+
+				@Override
+				public boolean process() throws IOException, InterruptedException {
+					String line = reader.readLine();
+					ctx.collect(line);
+					return true;
+				}
+			})
+			.build();
+
+		client.connect();
+		running = true;
+
+		LOG.info("Twitter Streaming API connection established successfully");
+
+		// just wait now
+		while(running) {
+			synchronized (waitLock) {
+				waitLock.wait(100L);
+			}
+		}
+	}
+
+	@Override
+	public void close() {
+		this.running = false;
+		LOG.info("Closing source");
+		if (client != null) {
+			// client seems to be thread-safe
+			client.stop();
+		}
+		// leave main method
+		synchronized (waitLock) {
+			waitLock.notify();
+		}
+	}
+
+	@Override
+	public void cancel() {
+		LOG.info("Cancelling Twitter source");
+		close();
+	}
+
+	@Override
+	public void stop() {
+		LOG.info("Stopping Twitter source");
+		close();
+	}
+
+	// ------ Custom endpoints
+
+	/**
+	 * Implementing this interface allows users of this source to set a custom endpoint.
+	 */
+	public interface EndpointInitializer {
+		StreamingEndpoint createEndpoint();
+	}
+
+	/**
+	 * Default endpoint initializer returning the {@see StatusesSampleEndpoint}.
+	 */
+	private static class SampleStatusesEndpoint implements EndpointInitializer, Serializable {
+		@Override
+		public StreamingEndpoint createEndpoint() {
+			// this default endpoint initializer returns the sample endpoint: Returning a sample from the firehose (all tweets)
+			StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint();
+			endpoint.stallWarnings(false);
+			endpoint.delimited(false);
+			return endpoint;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/pom.xml b/flink-connectors/flink-hadoop-compatibility/pom.xml
new file mode 100644
index 0000000..5938560
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/pom.xml
@@ -0,0 +1,182 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	
+	<modelVersion>4.0.0</modelVersion>
+	
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.2-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-hadoop-compatibility_2.10</artifactId>
+	<name>flink-hadoop-compatibility</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<!-- core dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-scala_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-hadoop2</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+		
+	</dependencies>
+
+
+	<build>
+		<plugins>
+			<!-- activate API compatibility checks -->
+			<plugin>
+				<groupId>com.github.siom79.japicmp</groupId>
+				<artifactId>japicmp-maven-plugin</artifactId>
+			</plugin>
+			<!-- Scala Compiler -->
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>3.1.4</version>
+				<executions>
+					<!-- Run scala compiler in the process-resources phase, so that dependencies on
+						scala classes can be resolved later in the (Java) compile phase -->
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>process-resources</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<jvmArgs>
+						<jvmArg>-Xms128m</jvmArg>
+						<jvmArg>-Xmx512m</jvmArg>
+					</jvmArgs>
+				</configuration>
+			</plugin>
+
+			<!-- Eclipse Integration -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-eclipse-plugin</artifactId>
+				<version>2.8</version>
+				<configuration>
+					<downloadSources>true</downloadSources>
+					<projectnatures>
+						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+					</projectnatures>
+					<buildcommands>
+						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+					</buildcommands>
+					<classpathContainers>
+						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+					</classpathContainers>
+					<excludes>
+						<exclude>org.scala-lang:scala-library</exclude>
+						<exclude>org.scala-lang:scala-compiler</exclude>
+					</excludes>
+					<sourceIncludes>
+						<sourceInclude>**/*.scala</sourceInclude>
+						<sourceInclude>**/*.java</sourceInclude>
+					</sourceIncludes>
+				</configuration>
+			</plugin>
+
+			<!-- Adding scala source directories to build path -->
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<!-- Add src/main/scala to eclipse build path -->
+					<execution>
+						<id>add-source</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/main/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- Scala Code Style, most of the configuration done via plugin management -->
+			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<configuration>
+					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
new file mode 100644
index 0000000..7bcb4bf
--- /dev/null
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.WritableComparator;
+import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
+import org.apache.hadoop.io.Writable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Type information for data types that extend Hadoop's {@link Writable} interface. The Writable
+ * interface defines the serialization and deserialization routines for the data type.
+ *
+ * @param <T> The type of the class represented by this type information.
+ */
+@Public
+public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> {
+	
+	private static final long serialVersionUID = 1L;
+	
+	private final Class<T> typeClass;
+
+	@PublicEvolving
+	public WritableTypeInfo(Class<T> typeClass) {
+		this.typeClass = checkNotNull(typeClass);
+
+		checkArgument(
+			Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class),
+			"WritableTypeInfo can only be used for subclasses of %s", Writable.class.getName());
+	}
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Override
+	@PublicEvolving
+	public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
+		if(Comparable.class.isAssignableFrom(typeClass)) {
+			return new WritableComparator(sortOrderAscending, typeClass);
+		}
+		else {
+			throw new UnsupportedOperationException("Cannot create Comparator for "+typeClass.getCanonicalName()+". " +
+													"Class does not implement Comparable interface.");
+		}
+	}
+
+	@Override
+	@PublicEvolving
+	public boolean isBasicType() {
+		return false;
+	}
+
+	@Override
+	@PublicEvolving
+	public boolean isTupleType() {
+		return false;
+	}
+
+	@Override
+	@PublicEvolving
+	public int getArity() {
+		return 1;
+	}
+	
+	@Override
+	@PublicEvolving
+	public int getTotalFields() {
+		return 1;
+	}
+
+	@Override
+	@PublicEvolving
+	public Class<T> getTypeClass() {
+		return this.typeClass;
+	}
+
+	@Override
+	@PublicEvolving
+	public boolean isKeyType() {
+		return Comparable.class.isAssignableFrom(typeClass);
+	}
+
+	@Override
+	@PublicEvolving
+	public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
+		return new WritableSerializer<T>(typeClass);
+	}
+	
+	@Override
+	public String toString() {
+		return "WritableType<" + typeClass.getName() + ">";
+	}	
+	
+	@Override
+	public int hashCode() {
+		return typeClass.hashCode();
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof WritableTypeInfo) {
+			@SuppressWarnings("unchecked")
+			WritableTypeInfo<T> writableTypeInfo = (WritableTypeInfo<T>) obj;
+
+			return writableTypeInfo.canEqual(this) &&
+				typeClass == writableTypeInfo.typeClass;
+
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof WritableTypeInfo;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+
+	@PublicEvolving
+	static <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass) {
+		if (Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class)) {
+			return new WritableTypeInfo<T>(typeClass);
+		}
+		else {
+			throw new InvalidTypesException("The given class is no subclass of " + Writable.class.getName());
+		}
+	}
+	
+}


Mime
View raw message