flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [1/2] flink git commit: [FLINK-3034] Redis Sink Connector
Date Thu, 07 Jul 2016 08:02:50 GMT
Repository: flink
Updated Branches:
  refs/heads/master cb78245ea -> 3ab9e36c7


http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
new file mode 100644
index 0000000..dc59ba4
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisSentinelPool;
+import redis.embedded.RedisCluster;
+import redis.embedded.util.JedisUtil;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.flink.util.NetUtils.getAvailablePort;
+
+/**
+ * Checks that a {@link RedisCommandsContainer} built from a {@link FlinkJedisSentinelConfig}
+ * writes to the Redis master that is resolved through Sentinel.
+ *
+ * <p>An embedded {@link RedisCluster} is started once for the whole class on freshly picked
+ * free ports and is shut down again after the last test.
+ */
+public class RedisSentinelClusterTest extends TestLogger {
+
+	private static final String REDIS_MASTER = "master";
+	private static final String TEST_KEY = "testKey";
+	private static final String TEST_VALUE = "testValue";
+	private static final List<Integer> sentinels = Arrays.asList(getAvailablePort(), getAvailablePort());
+	private static final List<Integer> group1 = Arrays.asList(getAvailablePort(), getAvailablePort());
+
+	/** Embedded cluster shared by all tests; {@code null} until {@link #setUpCluster()} has built it. */
+	private static RedisCluster cluster;
+
+	private JedisSentinelPool jedisSentinelPool;
+	private FlinkJedisSentinelConfig jedisSentinelConfig;
+
+	/** Builds the embedded cluster from the sentinel and server port lists and starts it. */
+	@BeforeClass
+	public static void setUpCluster() {
+		cluster = RedisCluster.builder()
+			.sentinelPorts(sentinels).quorumSize(1)
+			.serverPorts(group1).replicationGroup(REDIS_MASTER, 1)
+			.build();
+		cluster.start();
+	}
+
+	/** Creates the connector config and an independent sentinel pool used for verification. */
+	@Before
+	public void setUp() {
+		Set<String> hosts = JedisUtil.sentinelHosts(cluster);
+		jedisSentinelConfig = new FlinkJedisSentinelConfig.Builder()
+			.setMasterName(REDIS_MASTER)
+			.setSentinels(hosts)
+			.build();
+		jedisSentinelPool = new JedisSentinelPool(
+			jedisSentinelConfig.getMasterName(), jedisSentinelConfig.getSentinels());
+	}
+
+	/**
+	 * Writes a value through the connector's command container and reads it back through a
+	 * plain Jedis client borrowed from the sentinel pool.
+	 */
+	@Test
+	public void testRedisSentinelOperation() {
+		RedisCommandsContainer redisContainer = RedisCommandsContainerBuilder.build(jedisSentinelConfig);
+		// try-with-resources hands the connection back even if the assertion fails
+		try (Jedis jedis = jedisSentinelPool.getResource()) {
+			redisContainer.set(TEST_KEY, TEST_VALUE);
+			assertEquals(TEST_VALUE, jedis.get(TEST_KEY));
+		}
+	}
+
+	/** Closes the verification pool created in {@link #setUp()}. */
+	@After
+	public void tearDown() throws IOException {
+		if (jedisSentinelPool != null) {
+			jedisSentinelPool.close();
+		}
+	}
+
+	/**
+	 * Stops the embedded cluster.
+	 *
+	 * <p>The cluster is stopped whenever it was built. Guarding the call with
+	 * {@code !cluster.isActive()} would skip the shutdown for a healthy, running cluster
+	 * and leave the embedded Redis processes behind.
+	 */
+	@AfterClass
+	public static void tearDownCluster() throws IOException {
+		if (cluster != null) {
+			cluster.stop();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
new file mode 100644
index 0000000..237d9e5
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class RedisSinkITCase extends RedisITCaseBase {
+
+	private FlinkJedisPoolConfig jedisPoolConfig;
+	private static final Long NUM_ELEMENTS = 20L;
+	private static final String REDIS_KEY = "TEST_KEY";
+	private static final String REDIS_ADDITIONAL_KEY = "TEST_ADDITIONAL_KEY";
+
+	StreamExecutionEnvironment env;
+
+
+	private Jedis jedis;
+
+	@Before
+	public void setUp(){
+		jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
+			.setHost(REDIS_HOST)
+			.setPort(REDIS_PORT).build();
+		jedis = new Jedis(REDIS_HOST, REDIS_PORT);
+		env = StreamExecutionEnvironment.getExecutionEnvironment();
+	}
+
+	@Test
+	public void testRedisListDataType() throws Exception {
+		DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+		RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+			new RedisCommandMapper(RedisCommand.LPUSH));
+
+		source.addSink(redisSink);
+		env.execute("Test Redis List Data Type");
+
+		assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY));
+
+		jedis.del(REDIS_KEY);
+	}
+
+	@Test
+	public void testRedisSetDataType() throws Exception {
+		DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+		RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+			new RedisCommandMapper(RedisCommand.SADD));
+
+		source.addSink(redisSink);
+		env.execute("Test Redis Set Data Type");
+
+		assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY));
+
+		jedis.del(REDIS_KEY);
+	}
+
+	@Test
+	public void testRedisHyperLogLogDataType() throws Exception {
+		DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+		RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+			new RedisCommandMapper(RedisCommand.PFADD));
+
+		source.addSink(redisSink);
+		env.execute("Test Redis Hyper Log Log Data Type");
+
+		assertEquals(NUM_ELEMENTS, Long.valueOf(jedis.pfcount(REDIS_KEY)));
+
+		jedis.del(REDIS_KEY);
+	}
+
+	@Test
+	public void testRedisSortedSetDataType() throws Exception {
+		DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionSortedSet());
+		RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+			new RedisAdditionalDataMapper(RedisCommand.ZADD));
+
+		source.addSink(redisSink);
+		env.execute("Test Redis Sorted Set Data Type");
+
+		assertEquals(NUM_ELEMENTS, jedis.zcard(REDIS_ADDITIONAL_KEY));
+
+		jedis.del(REDIS_ADDITIONAL_KEY);
+	}
+
+	@Test
+	public void testRedisHashDataType() throws Exception {
+		DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionHash());
+		RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+			new RedisAdditionalDataMapper(RedisCommand.HSET));
+
+		source.addSink(redisSink);
+		env.execute("Test Redis Hash Data Type");
+
+		assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
+
+		jedis.del(REDIS_ADDITIONAL_KEY);
+	}
+
+	@After
+	public void tearDown(){
+		if(jedis != null){
+			jedis.close();
+		}
+	}
+
+	private static class TestSourceFunction implements SourceFunction<Tuple2<String, String>>
{
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean running = true;
+
+		@Override
+		public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception
{
+			for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+				ctx.collect(new Tuple2<>(REDIS_KEY, "message #" + i));
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+
+	private static class TestSourceFunctionHash implements SourceFunction<Tuple2<String,
String>> {
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean running = true;
+
+		@Override
+		public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception
{
+			for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+				ctx.collect(new Tuple2<>("" + i, "message #" + i));
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+
+	private static class TestSourceFunctionSortedSet implements SourceFunction<Tuple2<String,
String>> {
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean running = true;
+
+		@Override
+		public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception
{
+			for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+				ctx.collect(new Tuple2<>( "message #" + i, "" + i));
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+
+	public static class RedisCommandMapper implements RedisMapper<Tuple2<String, String>>{
+
+		private RedisCommand redisCommand;
+
+		public RedisCommandMapper(RedisCommand redisCommand){
+			this.redisCommand = redisCommand;
+		}
+
+		@Override
+		public RedisCommandDescription getCommandDescription() {
+			return new RedisCommandDescription(redisCommand);
+		}
+
+		@Override
+		public String getKeyFromData(Tuple2<String, String> data) {
+			return data.f0;
+		}
+
+		@Override
+		public String getValueFromData(Tuple2<String, String> data) {
+			return data.f1;
+		}
+	}
+
+	public static class RedisAdditionalDataMapper implements RedisMapper<Tuple2<String,
String>>{
+
+		private RedisCommand redisCommand;
+
+		public RedisAdditionalDataMapper(RedisCommand redisCommand){
+			this.redisCommand = redisCommand;
+		}
+
+		@Override
+		public RedisCommandDescription getCommandDescription() {
+			return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY);
+		}
+
+		@Override
+		public String getKeyFromData(Tuple2<String, String> data) {
+			return data.f0;
+		}
+
+		@Override
+		public String getValueFromData(Tuple2<String, String> data) {
+			return data.f1;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java
b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java
new file mode 100644
index 0000000..caf3945
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.JedisPool;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+import redis.clients.jedis.JedisPubSub;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration test for the PUBLISH command: a Flink job publishes {@code NUM_ELEMENTS}
+ * messages to {@code REDIS_CHANNEL} through {@link RedisSink} while a background thread
+ * subscribed to that channel collects them.
+ */
+public class RedisSinkPublishITCase extends RedisITCaseBase {
+
+	private static final int NUM_ELEMENTS = 20;
+	private static final String REDIS_CHANNEL = "CHANNEL";
+
+	/** Upper bound for every wait on the subscriber thread, so a broken setup cannot hang the build. */
+	private static final long WAIT_TIMEOUT_MILLIS = 30000L;
+
+	// Written by the subscriber thread and read by the test thread, hence synchronized.
+	private static final List<String> sourceList =
+		java.util.Collections.synchronizedList(new ArrayList<String>());
+	private Thread sinkThread;
+	private PubSub pubSub;
+
+	/** Prepares (but does not start) the subscriber thread. */
+	@Before
+	public void before() throws Exception {
+		pubSub = new PubSub();
+		sinkThread = new Thread(new Subscribe(pubSub));
+	}
+
+	/** Publishes all elements through the sink and expects the subscriber to receive each of them. */
+	@Test
+	public void redisSinkTest() throws Exception {
+		sinkThread.start();
+		// Redis pub/sub only delivers to clients that are already subscribed, so the job
+		// must not start publishing before the subscription has been confirmed.
+		if (!pubSub.awaitSubscribed(WAIT_TIMEOUT_MILLIS)) {
+			throw new AssertionError("Subscriber did not subscribe to " + REDIS_CHANNEL + " in time");
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
+			.setHost(REDIS_HOST)
+			.setPort(REDIS_PORT).build();
+		DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+
+		RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig, new RedisTestMapper());
+
+		source.addSink(redisSink);
+
+		env.execute("Redis Sink Test");
+
+		// The job finishing only means everything was published; give the subscriber thread
+		// a bounded amount of time to drain the messages before counting them.
+		pubSub.awaitMessages(WAIT_TIMEOUT_MILLIS);
+
+		assertEquals(NUM_ELEMENTS, sourceList.size());
+	}
+
+	/** Ends the subscription, waits for the subscriber thread and resets the collected messages. */
+	@After
+	public void after() throws Exception {
+		// A zero timeout turns the await into a non-blocking "has subscribed" check;
+		// unsubscribing is only attempted on a subscription that was actually established.
+		if (pubSub.awaitSubscribed(0L)) {
+			pubSub.unsubscribe();
+		}
+		sinkThread.join(WAIT_TIMEOUT_MILLIS);
+		sourceList.clear();
+	}
+
+	/** Blocks on a channel subscription until the given {@link PubSub} is unsubscribed. */
+	private class Subscribe implements Runnable {
+		private PubSub localPubSub;
+		private Subscribe(PubSub pubSub){
+			this.localPubSub = pubSub;
+		}
+
+		@Override
+		public void run() {
+			JedisPool pool = new JedisPool(REDIS_HOST, REDIS_PORT);
+			try {
+				redis.clients.jedis.Jedis connection = pool.getResource();
+				try {
+					connection.subscribe(localPubSub, REDIS_CHANNEL);
+				} finally {
+					// release the connection once the subscription has ended
+					connection.close();
+				}
+			} finally {
+				pool.close();
+			}
+		}
+	}
+
+	/** Emits {@code (REDIS_CHANNEL, "message #i")} for every {@code i} below {@code NUM_ELEMENTS}. */
+	private static class TestSourceFunction implements SourceFunction<Tuple2<String, String>> {
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean running = true;
+
+		@Override
+		public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
+			for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+				ctx.collect(new Tuple2<>(REDIS_CHANNEL, "message #" + i));
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+
+	/**
+	 * Subscriber that appends every received message to {@code sourceList} and lets the test
+	 * thread wait for the subscription confirmation and for the expected number of messages.
+	 */
+	public static class PubSub extends JedisPubSub {
+
+		private final java.util.concurrent.CountDownLatch subscribed =
+			new java.util.concurrent.CountDownLatch(1);
+		private final java.util.concurrent.CountDownLatch received =
+			new java.util.concurrent.CountDownLatch(NUM_ELEMENTS);
+
+		@Override
+		public void onSubscribe(String channel, int subscribedChannels) {
+			subscribed.countDown();
+		}
+
+		@Override
+		public void onMessage(String channel, String message) {
+			sourceList.add(message);
+			received.countDown();
+		}
+
+		/**
+		 * Waits until the subscription has been confirmed.
+		 *
+		 * @param timeoutMillis maximum time to wait; {@code 0} checks without blocking
+		 * @return {@code true} if the subscription was confirmed within the timeout
+		 */
+		public boolean awaitSubscribed(long timeoutMillis) throws InterruptedException {
+			return subscribed.await(timeoutMillis, java.util.concurrent.TimeUnit.MILLISECONDS);
+		}
+
+		/**
+		 * Waits until {@code NUM_ELEMENTS} messages have been received.
+		 *
+		 * @param timeoutMillis maximum time to wait
+		 * @return {@code true} if all messages arrived within the timeout
+		 */
+		public boolean awaitMessages(long timeoutMillis) throws InterruptedException {
+			return received.await(timeoutMillis, java.util.concurrent.TimeUnit.MILLISECONDS);
+		}
+
+	}
+
+	/** Maps {@code f0} to the channel and {@code f1} to the message of a PUBLISH command. */
+	private static class RedisTestMapper implements RedisMapper<Tuple2<String, String>> {
+
+		@Override
+		public RedisCommandDescription getCommandDescription() {
+			return new RedisCommandDescription(RedisCommand.PUBLISH);
+		}
+
+		@Override
+		public String getKeyFromData(Tuple2<String, String> data) {
+			return data.f0;
+		}
+
+		@Override
+		public String getValueFromData(Tuple2<String, String> data) {
+			return data.f1;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
new file mode 100644
index 0000000..848af57
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+/**
+ * Checks the null-argument validation performed when a {@link RedisSink} is constructed.
+ *
+ * <p>Each test passes exactly one {@code null} (config, mapper, or command description) and a
+ * valid value for everything else, so the expected {@link NullPointerException} can only come
+ * from the sink itself.
+ */
+public class RedisSinkTest extends TestLogger {
+
+	@Test(expected = NullPointerException.class)
+	public void shouldThrowNullPointExceptionIfDataMapperIsNull() {
+		new RedisSink<>(validConfig(), null);
+	}
+
+	@Test(expected = NullPointerException.class)
+	public void shouldThrowNullPointerExceptionIfCommandDescriptionIsNull() {
+		new RedisSink<>(validConfig(), new TestMapper(null));
+	}
+
+	@Test(expected = NullPointerException.class)
+	public void shouldThrowNullPointerExceptionIfConfigurationIsNull() {
+		new RedisSink<>(null, new TestMapper(new RedisCommandDescription(RedisCommand.LPUSH)));
+	}
+
+	/**
+	 * Builds a cluster config that has a node set.
+	 *
+	 * <p>A builder without nodes already fails in {@code build()} with a
+	 * {@link NullPointerException}; using it here would make the tests pass before the
+	 * {@link RedisSink} constructor is ever reached.
+	 */
+	private static FlinkJedisClusterConfig validConfig() {
+		// unresolved address: nothing connects in these tests, so no DNS lookup is needed
+		java.util.Set<java.net.InetSocketAddress> nodes = java.util.Collections.singleton(
+			java.net.InetSocketAddress.createUnresolved("localhost", 6379));
+		return new FlinkJedisClusterConfig.Builder().setNodes(nodes).build();
+	}
+
+	/** Mapper returning a caller-supplied (possibly {@code null}) command description. */
+	private static class TestMapper implements RedisMapper<Tuple2<String, String>> {
+		private final RedisCommandDescription redisCommandDescription;
+
+		public TestMapper(RedisCommandDescription redisCommandDescription) {
+			this.redisCommandDescription = redisCommandDescription;
+		}
+
+		@Override
+		public RedisCommandDescription getCommandDescription() {
+			return redisCommandDescription;
+		}
+
+		@Override
+		public String getKeyFromData(Tuple2<String, String> data) {
+			return data.f0;
+		}
+
+		@Override
+		public String getValueFromData(Tuple2<String, String> data) {
+			return data.f1;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
new file mode 100644
index 0000000..ed1d713
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+/**
+ * Exercises the argument validation of the {@link FlinkJedisConfigBase} constructor: every
+ * test passes exactly one negative value and expects an {@link IllegalArgumentException}.
+ */
+public class FlinkJedisConfigBaseTest extends TestLogger {
+
+	/** Negative connection timeout. */
+	@Test(expected = IllegalArgumentException.class)
+	public void shouldThrowIllegalArgumentExceptionIfTimeOutIsNegative() {
+		final int connectionTimeout = -1;
+		new TestConfig(connectionTimeout, 0, 0, 0);
+	}
+
+	/** Negative maximum number of pooled connections. */
+	@Test(expected = IllegalArgumentException.class)
+	public void shouldThrowIllegalArgumentExceptionIfMaxTotalIsNegative() {
+		final int maxTotal = -1;
+		new TestConfig(1, maxTotal, 0, 0);
+	}
+
+	/** Negative maximum number of idle connections. */
+	@Test(expected = IllegalArgumentException.class)
+	public void shouldThrowIllegalArgumentExceptionIfMaxIdleIsNegative() {
+		final int maxIdle = -1;
+		new TestConfig(0, 0, maxIdle, 0);
+	}
+
+	/** Negative minimum number of idle connections. */
+	@Test(expected = IllegalArgumentException.class)
+	public void shouldThrowIllegalArgumentExceptionIfMinIdleIsNegative() {
+		final int minIdle = -1;
+		new TestConfig(0, 0, 0, minIdle);
+	}
+
+	/** Minimal subclass that only forwards its arguments to the base-class constructor. */
+	private class TestConfig extends FlinkJedisConfigBase {
+
+		protected TestConfig(int connectionTimeout, int maxTotal, int maxIdle, int minIdle) {
+			super(connectionTimeout, maxTotal, maxIdle, minIdle);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
new file mode 100644
index 0000000..40db578
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Checks how {@link FlinkJedisClusterConfig.Builder#build()} reacts to a missing or empty
+ * node set.
+ */
+public class JedisClusterConfigTest extends TestLogger {
+
+	/** Building without ever calling {@code setNodes} is expected to fail with an NPE. */
+	@Test(expected = NullPointerException.class)
+	public void shouldThrowNullPointExceptionIfNodeValueIsNull() {
+		new FlinkJedisClusterConfig.Builder()
+			.setMinIdle(0)
+			.setMaxIdle(0)
+			.setMaxTotal(0)
+			.setTimeout(0)
+			.build();
+	}
+
+	/** Building with an empty node set is expected to fail with an IllegalArgumentException. */
+	@Test(expected = IllegalArgumentException.class)
+	public void shouldThrowIllegalArgumentExceptionIfNodeValuesAreEmpty() {
+		final Set<InetSocketAddress> noNodes = new HashSet<>();
+		new FlinkJedisClusterConfig.Builder()
+			.setMinIdle(0)
+			.setMaxIdle(0)
+			.setMaxTotal(0)
+			.setTimeout(0)
+			.setNodes(noNodes)
+			.build();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java
b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java
new file mode 100644
index 0000000..dc16cfe
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+/** Checks that a {@link FlinkJedisPoolConfig} cannot be built without a host. */
+public class JedisPoolConfigTest extends TestLogger {
+
+	/** Building straight from a fresh builder (no host set) is expected to fail with an NPE. */
+	@Test(expected = NullPointerException.class)
+	public void shouldThrowNullPointExceptionIfHostValueIsNull() {
+		new FlinkJedisPoolConfig.Builder().build();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java
b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java
new file mode 100644
index 0000000..8445fae
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Checks how {@link FlinkJedisSentinelConfig.Builder#build()} reacts to a missing master
+ * name and to a missing or empty sentinel set.
+ */
+public class JedisSentinelConfigTest extends TestLogger {
+
+	public static final String MASTER_NAME = "test-master";
+
+	/** Sentinels are set but the master name is not: an NPE is expected. */
+	@Test(expected = NullPointerException.class)
+	public void shouldThrowNullPointExceptionIfMasterValueIsNull() {
+		final Set<String> sentinelHosts = new HashSet<>();
+		sentinelHosts.add("127.0.0.1");
+		new FlinkJedisSentinelConfig.Builder()
+			.setSentinels(sentinelHosts)
+			.build();
+	}
+
+	/** The master name is set but the sentinels are not: an NPE is expected. */
+	@Test(expected = NullPointerException.class)
+	public void shouldThrowNullPointExceptionIfSentinelsValueIsNull() {
+		new FlinkJedisSentinelConfig.Builder()
+			.setMasterName(MASTER_NAME)
+			.build();
+	}
+
+	/**
+	 * The sentinel set is present but empty.
+	 *
+	 * <p>NOTE: despite the method name, the expected exception here is
+	 * {@link IllegalArgumentException}, not {@link NullPointerException}.
+	 */
+	@Test(expected = IllegalArgumentException.class)
+	public void shouldThrowNullPointExceptionIfSentinelsValueIsEmpty() {
+		final Set<String> noSentinels = new HashSet<>();
+		new FlinkJedisSentinelConfig.Builder()
+			.setMasterName(MASTER_NAME)
+			.setSentinels(noSentinels)
+			.build();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
new file mode 100644
index 0000000..b0eee48
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.streaming.connectors.redis.RedisSinkITCase;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Checks the additional-key handling of {@link RedisCommandDescription}, using the mapper
+ * defined in {@link RedisSinkITCase} to create the descriptions.
+ */
+public class RedisDataTypeDescriptionTest extends TestLogger {
+
+	/** HSET described without an additional key is expected to be rejected. */
+	@Test(expected = IllegalArgumentException.class)
+	public void shouldThrowExceptionIfAdditionalKeyIsNotGivenForHashDataType() {
+		final RedisSinkITCase.RedisCommandMapper hashMapper =
+			new RedisSinkITCase.RedisCommandMapper(RedisCommand.HSET);
+		hashMapper.getCommandDescription();
+	}
+
+	/** LPUSH maps to the LIST data type and carries no additional key. */
+	@Test
+	public void shouldReturnNullForAdditionalDataType() {
+		final RedisSinkITCase.RedisCommandMapper listMapper =
+			new RedisSinkITCase.RedisCommandMapper(RedisCommand.LPUSH);
+		final RedisCommandDescription description = listMapper.getCommandDescription();
+
+		assertEquals(RedisDataType.LIST, description.getCommand().getRedisDataType());
+		assertNull(description.getAdditionalKey());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/pom.xml b/flink-streaming-connectors/pom.xml
index 91691cf..91213f2 100644
--- a/flink-streaming-connectors/pom.xml
+++ b/flink-streaming-connectors/pom.xml
@@ -46,6 +46,7 @@ under the License.
 		<module>flink-connector-twitter</module>
 		<module>flink-connector-nifi</module>
 		<module>flink-connector-cassandra</module>
+		<module>flink-connector-redis</module>
 	</modules>
 
 	<!-- See main pom.xml for explanation of profiles -->


Mime
View raw message