nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pvill...@apache.org
Subject nifi git commit: NIFI-5830 - RedisConnectionPoolService does not work with Standalone Redis using non-localhost deployment
Date Thu, 06 Dec 2018 22:31:37 GMT
Repository: nifi
Updated Branches:
  refs/heads/master f1e03b5ed -> 84c32f913


NIFI-5830 - RedisConnectionPoolService does not work with Standalone Redis using non-localhost deployment

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #3176.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/84c32f91
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/84c32f91
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/84c32f91

Branch: refs/heads/master
Commit: 84c32f913780d585ba16960c3f23e83313e1bcab
Parents: f1e03b5
Author: Alexander Bukarev <bukarev@yandex.ru>
Authored: Tue Dec 4 22:15:37 2018 +0300
Committer: Pierre Villard <pierre.villard.fr@gmail.com>
Committed: Thu Dec 6 23:31:25 2018 +0100

----------------------------------------------------------------------
 .../org/apache/nifi/redis/util/RedisUtils.java  | 60 ++++++++++----------
 1 file changed, 30 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/84c32f91/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java
index aed823b..6489fcc 100644
--- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java
@@ -27,11 +27,15 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.redis.RedisType;
 import org.apache.nifi.util.StringUtils;
 import org.springframework.data.redis.connection.RedisClusterConfiguration;
+import org.springframework.data.redis.connection.RedisConfiguration;
+import org.springframework.data.redis.connection.RedisPassword;
 import org.springframework.data.redis.connection.RedisSentinelConfiguration;
+import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
+import org.springframework.data.redis.connection.jedis.JedisClientConfiguration;
 import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
 import redis.clients.jedis.JedisPoolConfig;
-import redis.clients.jedis.JedisShardInfo;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -264,19 +268,29 @@ public class RedisUtils {
         final Integer timeout = context.getProperty(RedisUtils.COMMUNICATION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
         final JedisPoolConfig poolConfig = createJedisPoolConfig(context);
 
+        final JedisClientConfiguration jedisClientConfiguration = JedisClientConfiguration.builder()
+                .connectTimeout(Duration.ofMillis(timeout))
+                .readTimeout(Duration.ofMillis(timeout))
+                .usePooling()
+                .poolConfig(poolConfig)
+                .build();
         JedisConnectionFactory connectionFactory;
 
         if (RedisUtils.REDIS_MODE_STANDALONE.getValue().equals(redisMode)) {
-            final JedisShardInfo jedisShardInfo = createJedisShardInfo(connectionString, timeout, password);
-
             logger.info("Connecting to Redis in standalone mode at " + connectionString);
-            connectionFactory = new JedisConnectionFactory(jedisShardInfo);
+            final String[] hostAndPortSplit = connectionString.split("[:]");
+            final String host = hostAndPortSplit[0].trim();
+            final Integer port = Integer.parseInt(hostAndPortSplit[1].trim());
+            final RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(host, port);
+            enrichRedisConfiguration(redisStandaloneConfiguration, dbIndex, password);
+
+            connectionFactory = new JedisConnectionFactory(redisStandaloneConfiguration, jedisClientConfiguration);
 
         } else if (RedisUtils.REDIS_MODE_SENTINEL.getValue().equals(redisMode)) {
             final String[] sentinels = connectionString.split("[,]");
             final String sentinelMaster = context.getProperty(RedisUtils.SENTINEL_MASTER).evaluateAttributeExpressions().getValue();
             final RedisSentinelConfiguration sentinelConfiguration = new RedisSentinelConfiguration(sentinelMaster, new HashSet<>(getTrimmedValues(sentinels)));
-            final JedisShardInfo jedisShardInfo = createJedisShardInfo(sentinels[0], timeout, password);
+            enrichRedisConfiguration(sentinelConfiguration, dbIndex, password);
 
             logger.info("Connecting to Redis in sentinel mode...");
             logger.info("Redis master = " + sentinelMaster);
@@ -285,14 +299,14 @@ public class RedisUtils {
                 logger.info("Redis sentinel at " + sentinel);
             }
 
-            connectionFactory = new JedisConnectionFactory(sentinelConfiguration, poolConfig);
-            connectionFactory.setShardInfo(jedisShardInfo);
+            connectionFactory = new JedisConnectionFactory(sentinelConfiguration, jedisClientConfiguration);
 
         } else {
             final String[] clusterNodes = connectionString.split("[,]");
             final Integer maxRedirects = context.getProperty(RedisUtils.CLUSTER_MAX_REDIRECTS).asInteger();
 
             final RedisClusterConfiguration clusterConfiguration = new RedisClusterConfiguration(getTrimmedValues(clusterNodes));
+            enrichRedisConfiguration(clusterConfiguration, dbIndex, password);
             clusterConfiguration.setMaxRedirects(maxRedirects);
 
             logger.info("Connecting to Redis in clustered mode...");
@@ -300,16 +314,7 @@ public class RedisUtils {
                 logger.info("Redis cluster node at " + clusterNode);
             }
 
-            connectionFactory = new JedisConnectionFactory(clusterConfiguration, poolConfig);
-        }
-
-        connectionFactory.setUsePool(true);
-        connectionFactory.setPoolConfig(poolConfig);
-        connectionFactory.setDatabase(dbIndex);
-        connectionFactory.setTimeout(timeout);
-
-        if (!StringUtils.isBlank(password)) {
-            connectionFactory.setPassword(password);
+            connectionFactory = new JedisConnectionFactory(clusterConfiguration, jedisClientConfiguration);
         }
 
         // need to call this to initialize the pool/connections
@@ -325,20 +330,15 @@ public class RedisUtils {
         return trimmedValues;
     }
 
-    private static JedisShardInfo createJedisShardInfo(final String hostAndPort, final Integer timeout, final String password) {
-        final String[] hostAndPortSplit = hostAndPort.split("[:]");
-        final String host = hostAndPortSplit[0].trim();
-        final Integer port = Integer.parseInt(hostAndPortSplit[1].trim());
-
-        final JedisShardInfo jedisShardInfo = new JedisShardInfo(host, port);
-        jedisShardInfo.setConnectionTimeout(timeout);
-        jedisShardInfo.setSoTimeout(timeout);
-
-        if (!StringUtils.isEmpty(password)) {
-            jedisShardInfo.setPassword(password);
+    private static void enrichRedisConfiguration(final RedisConfiguration redisConfiguration,
+                                                 final Integer dbIndex,
+                                                 final String password) {
+        if (redisConfiguration instanceof RedisConfiguration.WithDatabaseIndex) {
+            ((RedisConfiguration.WithDatabaseIndex) redisConfiguration).setDatabase(dbIndex);
+        }
+        if (redisConfiguration instanceof RedisConfiguration.WithPassword) {
+            ((RedisConfiguration.WithPassword) redisConfiguration).setPassword(RedisPassword.of(password));
         }
-
-        return jedisShardInfo;
     }
 
     private static JedisPoolConfig createJedisPoolConfig(final PropertyContext context) {


Mime
View raw message