flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [1/2] flink git commit: [FLINK-4429] Remove redis connector (now in Apache Bahir)
Date Wed, 14 Dec 2016 15:03:41 GMT
Repository: flink
Updated Branches:
  refs/heads/master 79d7e3017 -> 8038ae4c8


http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
deleted file mode 100644
index dc59ba4..0000000
--- a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis;
-
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
-import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
-import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
-import org.apache.flink.util.TestLogger;
-import org.junit.Assert;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisSentinelPool;
-import redis.embedded.RedisCluster;
-import redis.embedded.util.JedisUtil;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.apache.flink.util.NetUtils.getAvailablePort;
-
-public class RedisSentinelClusterTest extends TestLogger {
-
-	private static RedisCluster cluster;
-	private static final String REDIS_MASTER = "master";
-	private static final String TEST_KEY = "testKey";
-	private static final String TEST_VALUE = "testValue";
-	private static final List<Integer> sentinels = Arrays.asList(getAvailablePort(), getAvailablePort());
-	private static final List<Integer> group1 = Arrays.asList(getAvailablePort(), getAvailablePort());
-
-	private JedisSentinelPool jedisSentinelPool;
-	private FlinkJedisSentinelConfig jedisSentinelConfig;
-
-	@BeforeClass
-	public static void setUpCluster(){
-		cluster = RedisCluster.builder().sentinelPorts(sentinels).quorumSize(1)
-			.serverPorts(group1).replicationGroup(REDIS_MASTER, 1)
-			.build();
-		cluster.start();
-	}
-
-	@Before
-	public void setUp() {
-		Set<String> hosts = JedisUtil.sentinelHosts(cluster);
-		jedisSentinelConfig = new FlinkJedisSentinelConfig.Builder().setMasterName(REDIS_MASTER)
-			.setSentinels(hosts).build();
-		jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(),
-			jedisSentinelConfig.getSentinels());
-	}
-
-	@Test
-	public void testRedisSentinelOperation() {
-		RedisCommandsContainer redisContainer = RedisCommandsContainerBuilder.build(jedisSentinelConfig);
-		Jedis jedis = null;
-		try{
-			jedis = jedisSentinelPool.getResource();
-			redisContainer.set(TEST_KEY, TEST_VALUE);
-			assertEquals(TEST_VALUE, jedis.get(TEST_KEY));
-		}finally {
-			if (jedis != null){
-				jedis.close();
-			}
-		}
-	}
-
-	@After
-	public void tearDown() throws IOException {
-		if (jedisSentinelPool != null) {
-			jedisSentinelPool.close();
-		}
-	}
-
-	@AfterClass
-	public static void tearDownCluster() throws IOException {
-		if (!cluster.isActive()) {
-			cluster.stop();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
deleted file mode 100644
index 21f3cca..0000000
--- a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import redis.clients.jedis.Jedis;
-
-import static org.junit.Assert.assertEquals;
-
-public class RedisSinkITCase extends RedisITCaseBase {
-
-	private FlinkJedisPoolConfig jedisPoolConfig;
-	private static final Long NUM_ELEMENTS = 20L;
-	private static final String REDIS_KEY = "TEST_KEY";
-	private static final String REDIS_ADDITIONAL_KEY = "TEST_ADDITIONAL_KEY";
-
-	StreamExecutionEnvironment env;
-
-
-	private Jedis jedis;
-
-	@Before
-	public void setUp(){
-		jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
-			.setHost(REDIS_HOST)
-			.setPort(REDIS_PORT).build();
-		jedis = new Jedis(REDIS_HOST, REDIS_PORT);
-		env = StreamExecutionEnvironment.getExecutionEnvironment();
-	}
-
-	@Test
-	public void testRedisListDataType() throws Exception {
-		DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
-		RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
-			new RedisCommandMapper(RedisCommand.LPUSH));
-
-		source.addSink(redisSink);
-		env.execute("Test Redis List Data Type");
-
-		assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY));
-
-		jedis.del(REDIS_KEY);
-	}
-
-	@Test
-	public void testRedisSetDataType() throws Exception {
-		DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
-		RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
-			new RedisCommandMapper(RedisCommand.SADD));
-
-		source.addSink(redisSink);
-		env.execute("Test Redis Set Data Type");
-
-		assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY));
-
-		jedis.del(REDIS_KEY);
-	}
-
-	@Test
-	public void testRedisHyperLogLogDataType() throws Exception {
-		DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
-		RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
-			new RedisCommandMapper(RedisCommand.PFADD));
-
-		source.addSink(redisSink);
-		env.execute("Test Redis Hyper Log Log Data Type");
-
-		assertEquals(NUM_ELEMENTS, Long.valueOf(jedis.pfcount(REDIS_KEY)));
-
-		jedis.del(REDIS_KEY);
-	}
-
-	@Test
-	public void testRedisSortedSetDataType() throws Exception {
-		DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionSortedSet());
-		RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
-			new RedisAdditionalDataMapper(RedisCommand.ZADD));
-
-		source.addSink(redisSink);
-		env.execute("Test Redis Sorted Set Data Type");
-
-		assertEquals(NUM_ELEMENTS, jedis.zcard(REDIS_ADDITIONAL_KEY));
-
-		jedis.del(REDIS_ADDITIONAL_KEY);
-	}
-
-	@Test
-	public void testRedisHashDataType() throws Exception {
-		DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionHash());
-		RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
-			new RedisAdditionalDataMapper(RedisCommand.HSET));
-
-		source.addSink(redisSink);
-		env.execute("Test Redis Hash Data Type");
-
-		assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
-
-		jedis.del(REDIS_ADDITIONAL_KEY);
-	}
-
-	@After
-	public void tearDown(){
-		if(jedis != null){
-			jedis.close();
-		}
-	}
-
-	private static class TestSourceFunction implements SourceFunction<Tuple2<String, String>> {
-		private static final long serialVersionUID = 1L;
-
-		private volatile boolean running = true;
-
-		@Override
-		public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
-			for (int i = 0; i < NUM_ELEMENTS && running; i++) {
-				ctx.collect(new Tuple2<>(REDIS_KEY, "message #" + i));
-			}
-		}
-
-		@Override
-		public void cancel() {
-			running = false;
-		}
-	}
-
-	private static class TestSourceFunctionHash implements SourceFunction<Tuple2<String, String>> {
-		private static final long serialVersionUID = 1L;
-
-		private volatile boolean running = true;
-
-		@Override
-		public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
-			for (int i = 0; i < NUM_ELEMENTS && running; i++) {
-				ctx.collect(new Tuple2<>("" + i, "message #" + i));
-			}
-		}
-
-		@Override
-		public void cancel() {
-			running = false;
-		}
-	}
-
-	private static class TestSourceFunctionSortedSet implements SourceFunction<Tuple2<String, String>> {
-		private static final long serialVersionUID = 1L;
-
-		private volatile boolean running = true;
-
-		@Override
-		public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
-			for (int i = 0; i < NUM_ELEMENTS && running; i++) {
-				ctx.collect(new Tuple2<>( "message #" + i, "" + i));
-			}
-		}
-
-		@Override
-		public void cancel() {
-			running = false;
-		}
-	}
-
-	public static class RedisCommandMapper implements RedisMapper<Tuple2<String, String>>{
-
-		private RedisCommand redisCommand;
-
-		public RedisCommandMapper(RedisCommand redisCommand){
-			this.redisCommand = redisCommand;
-		}
-
-		@Override
-		public RedisCommandDescription getCommandDescription() {
-			return new RedisCommandDescription(redisCommand);
-		}
-
-		@Override
-		public String getKeyFromData(Tuple2<String, String> data) {
-			return data.f0;
-		}
-
-		@Override
-		public String getValueFromData(Tuple2<String, String> data) {
-			return data.f1;
-		}
-	}
-
-	public static class RedisAdditionalDataMapper implements RedisMapper<Tuple2<String, String>>{
-
-		private RedisCommand redisCommand;
-
-		public RedisAdditionalDataMapper(RedisCommand redisCommand){
-			this.redisCommand = redisCommand;
-		}
-
-		@Override
-		public RedisCommandDescription getCommandDescription() {
-			return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY);
-		}
-
-		@Override
-		public String getKeyFromData(Tuple2<String, String> data) {
-			return data.f0;
-		}
-
-		@Override
-		public String getValueFromData(Tuple2<String, String> data) {
-			return data.f1;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java
deleted file mode 100644
index caf3945..0000000
--- a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
-
-import redis.clients.jedis.JedisPool;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.Before;
-import org.junit.After;
-import org.junit.Test;
-import redis.clients.jedis.JedisPubSub;
-
-import static org.junit.Assert.assertEquals;
-
-public class RedisSinkPublishITCase extends RedisITCaseBase {
-
-	private static final int NUM_ELEMENTS = 20;
-	private static final String REDIS_CHANNEL = "CHANNEL";
-
-	private static final List<String> sourceList = new ArrayList<>();
-	private Thread sinkThread;
-	private PubSub pubSub;
-
-	@Before
-	public void before() throws Exception {
-		pubSub = new PubSub();
-		sinkThread = new Thread(new Subscribe(pubSub));
-	}
-
-	@Test
-	public void redisSinkTest() throws Exception {
-		sinkThread.start();
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
-			.setHost(REDIS_HOST)
-			.setPort(REDIS_PORT).build();
-		DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
-
-		RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig, new RedisTestMapper());
-
-		source.addSink(redisSink);
-
-		env.execute("Redis Sink Test");
-
-		assertEquals(NUM_ELEMENTS, sourceList.size());
-	}
-
-	@After
-	public void after() throws Exception {
-		pubSub.unsubscribe();
-		sinkThread.join();
-		sourceList.clear();
-	}
-
-	private class Subscribe implements Runnable {
-		private PubSub localPubSub;
-		private Subscribe(PubSub pubSub){
-			this.localPubSub = pubSub;
-		}
-
-		@Override
-		public void run() {
-			JedisPool pool = new JedisPool(REDIS_HOST, REDIS_PORT);
-			pool.getResource().subscribe(localPubSub, REDIS_CHANNEL);
-		}
-	}
-
-	private static class TestSourceFunction implements SourceFunction<Tuple2<String, String>> {
-		private static final long serialVersionUID = 1L;
-
-		private volatile boolean running = true;
-
-		@Override
-		public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
-			for (int i = 0; i < NUM_ELEMENTS && running; i++) {
-				ctx.collect(new Tuple2<>(REDIS_CHANNEL, "message #" + i));
-			}
-		}
-
-		@Override
-		public void cancel() {
-			running = false;
-		}
-	}
-
-	public static class PubSub extends JedisPubSub {
-
-		@Override
-		public void onMessage(String channel, String message) {
-			sourceList.add(message);
-		}
-
-	}
-
-	private static class RedisTestMapper implements RedisMapper<Tuple2<String, String>>{
-
-		@Override
-		public RedisCommandDescription getCommandDescription() {
-			return new RedisCommandDescription(RedisCommand.PUBLISH);
-		}
-
-		@Override
-		public String getKeyFromData(Tuple2<String, String> data) {
-			return data.f0;
-		}
-
-		@Override
-		public String getValueFromData(Tuple2<String, String> data) {
-			return data.f1;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
deleted file mode 100644
index 59f59f2..0000000
--- a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-import redis.clients.jedis.exceptions.JedisConnectionException;
-
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.junit.Assert.fail;
-
-public class RedisSinkTest extends TestLogger {
-
-	@Test(expected=NullPointerException.class)
-	public void shouldThrowNullPointExceptionIfDataMapperIsNull(){
-		new RedisSink<>(new FlinkJedisClusterConfig.Builder().build(), null);
-	}
-
-	@Test(expected = NullPointerException.class)
-	public void shouldThrowNullPointerExceptionIfCommandDescriptionIsNull(){
-		new RedisSink<>(new FlinkJedisClusterConfig.Builder().build(), new TestMapper(null));
-	}
-
-	@Test(expected = NullPointerException.class)
-	public void shouldThrowNullPointerExceptionIfConfigurationIsNull(){
-		new RedisSink<>(null, new TestMapper(new RedisCommandDescription(RedisCommand.LPUSH)));
-	}
-
-	@Test
-	public void testRedisDownBehavior() throws Exception {
-
-		// create a wrong configuration so that open() fails.
-
-		FlinkJedisPoolConfig wrongJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
-			.setHost("127.0.0.1")
-			.setPort(1234).build();
-
-		testDownBehavior(wrongJedisPoolConfig);
-	}
-
-	@Test
-	public void testRedisClusterDownBehavior() throws Exception {
-
-		Set<InetSocketAddress> hosts = new HashSet<>();
-		hosts.add(new InetSocketAddress("127.0.0.1", 1234));
-
-		// create a wrong configuration so that open() fails.
-
-		FlinkJedisClusterConfig wrongJedisClusterConfig = new FlinkJedisClusterConfig.Builder()
-			.setNodes(hosts)
-			.setTimeout(100)
-			.setMaxIdle(1)
-			.setMaxTotal(1)
-			.setMinIdle(1).build();
-
-		testDownBehavior(wrongJedisClusterConfig);
-	}
-
-	@Test
-	public void testRedisSentinelDownBehavior() throws Exception {
-
-		Set<String> hosts = new HashSet<>();
-		hosts.add("localhost:55095");
-
-		// create a wrong configuration so that open() fails.
-
-		FlinkJedisSentinelConfig wrongJedisSentinelConfig = new FlinkJedisSentinelConfig.Builder()
-			.setMasterName("master")
-			.setSentinels(hosts)
-			.build();
-
-		testDownBehavior(wrongJedisSentinelConfig);
-	}
-
-	private void testDownBehavior(FlinkJedisConfigBase config) throws Exception {
-		RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(config,
-			new RedisSinkITCase.RedisCommandMapper(RedisCommand.SADD));
-
-		try {
-			redisSink.open(new Configuration());
-		} catch (Exception e) {
-
-			// search for nested JedisConnectionExceptions
-			// because this is the expected behavior
-
-			Throwable t = e;
-			int depth = 0;
-			while (!(t instanceof JedisConnectionException)) {
-				t = t.getCause();
-				if (t == null || depth++ == 20) {
-					throw e;
-				}
-			}
-		}
-	}
-
-	private class TestMapper implements RedisMapper<Tuple2<String, String>>{
-		private RedisCommandDescription redisCommandDescription;
-
-		public TestMapper(RedisCommandDescription redisCommandDescription){
-			this.redisCommandDescription = redisCommandDescription;
-		}
-		@Override
-		public RedisCommandDescription getCommandDescription() {
-			return redisCommandDescription;
-		}
-
-		@Override
-		public String getKeyFromData(Tuple2<String, String> data) {
-			return data.f0;
-		}
-
-		@Override
-		public String getValueFromData(Tuple2<String, String> data) {
-			return data.f1;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
deleted file mode 100644
index ed1d713..0000000
--- a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.config;
-
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-public class FlinkJedisConfigBaseTest extends TestLogger {
-
-	@Test(expected = IllegalArgumentException.class)
-	public void shouldThrowIllegalArgumentExceptionIfTimeOutIsNegative(){
-		new TestConfig(-1, 0, 0, 0);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void shouldThrowIllegalArgumentExceptionIfMaxTotalIsNegative(){
-		new TestConfig(1, -1, 0, 0);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void shouldThrowIllegalArgumentExceptionIfMaxIdleIsNegative(){
-		new TestConfig(0, 0, -1, 0);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void shouldThrowIllegalArgumentExceptionIfMinIdleIsNegative(){
-		new TestConfig(0, 0, 0, -1);
-	}
-
-	private class TestConfig extends FlinkJedisConfigBase{
-
-		protected TestConfig(int connectionTimeout, int maxTotal, int maxIdle, int minIdle) {
-			super(connectionTimeout, maxTotal, maxIdle, minIdle);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
deleted file mode 100644
index 40db578..0000000
--- a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.config;
-
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.Set;
-
-public class JedisClusterConfigTest extends TestLogger {
-
-	@Test(expected = NullPointerException.class)
-	public void shouldThrowNullPointExceptionIfNodeValueIsNull(){
-		FlinkJedisClusterConfig.Builder builder = new FlinkJedisClusterConfig.Builder();
-		builder.setMinIdle(0)
-			.setMaxIdle(0)
-			.setMaxTotal(0)
-			.setTimeout(0)
-			.build();
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void shouldThrowIllegalArgumentExceptionIfNodeValuesAreEmpty(){
-		Set<InetSocketAddress> set = new HashSet<>();
-		FlinkJedisClusterConfig.Builder builder = new FlinkJedisClusterConfig.Builder();
-		builder.setMinIdle(0)
-			.setMaxIdle(0)
-			.setMaxTotal(0)
-			.setTimeout(0)
-			.setNodes(set)
-			.build();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java
deleted file mode 100644
index dc16cfe..0000000
--- a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.config;
-
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-public class JedisPoolConfigTest extends TestLogger {
-
-	@Test(expected = NullPointerException.class)
-	public void shouldThrowNullPointExceptionIfHostValueIsNull(){
-		FlinkJedisPoolConfig.Builder builder = new FlinkJedisPoolConfig.Builder();
-		builder.build();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java
deleted file mode 100644
index 8445fae..0000000
--- a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.config;
-
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-import java.util.HashSet;
-import java.util.Set;
-
-public class JedisSentinelConfigTest extends TestLogger {
-
-	public static final String MASTER_NAME = "test-master";
-
-	@Test(expected = NullPointerException.class)
-	public void shouldThrowNullPointExceptionIfMasterValueIsNull(){
-		FlinkJedisSentinelConfig.Builder builder = new FlinkJedisSentinelConfig.Builder();
-		Set<String> sentinels = new HashSet<>();
-		sentinels.add("127.0.0.1");
-		builder.setSentinels(sentinels).build();
-	}
-
-	@Test(expected = NullPointerException.class)
-	public void shouldThrowNullPointExceptionIfSentinelsValueIsNull(){
-		FlinkJedisSentinelConfig.Builder builder = new FlinkJedisSentinelConfig.Builder();
-		builder.setMasterName(MASTER_NAME).build();
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void shouldThrowNullPointExceptionIfSentinelsValueIsEmpty(){
-		FlinkJedisSentinelConfig.Builder builder = new FlinkJedisSentinelConfig.Builder();
-		Set<String> sentinels = new HashSet<>();
-		builder.setMasterName(MASTER_NAME).setSentinels(sentinels).build();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
deleted file mode 100644
index b0eee48..0000000
--- a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.mapper;
-
-import org.apache.flink.streaming.connectors.redis.RedisSinkITCase;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-public class RedisDataTypeDescriptionTest extends TestLogger {
-
-	@Test(expected=IllegalArgumentException.class)
-	public void shouldThrowExceptionIfAdditionalKeyIsNotGivenForHashDataType(){
-		RedisSinkITCase.RedisCommandMapper redisCommandMapper = new RedisSinkITCase.RedisCommandMapper(RedisCommand.HSET);
-		redisCommandMapper.getCommandDescription();
-	}
-
-	@Test
-	public void shouldReturnNullForAdditionalDataType(){
-		RedisSinkITCase.RedisCommandMapper redisCommandMapper = new RedisSinkITCase.RedisCommandMapper(RedisCommand.LPUSH);
-		RedisCommandDescription redisDataTypeDescription = redisCommandMapper.getCommandDescription();
-		assertEquals(RedisDataType.LIST, redisDataTypeDescription.getCommand().getRedisDataType());
-		assertNull(redisDataTypeDescription.getAdditionalKey());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index dcb33eb..695c34b 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -52,7 +52,6 @@ under the License.
 		<module>flink-connector-twitter</module>
 		<module>flink-connector-nifi</module>
 		<module>flink-connector-cassandra</module>
-		<module>flink-connector-redis</module>
 		<module>flink-connector-filesystem</module>
 	</modules>
 


Mime
View raw message