nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pvill...@apache.org
Subject nifi git commit: NIFI-3867: Add Expression Language support to HiveConnectionPool properties
Date Mon, 15 May 2017 11:46:46 GMT
Repository: nifi
Updated Branches:
  refs/heads/master bc68eb754 -> 3353865ce


NIFI-3867: Add Expression Language support to HiveConnectionPool properties

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #1783.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3353865c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3353865c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3353865c

Branch: refs/heads/master
Commit: 3353865ce916b8b332ba62782e1d808ffff68d83
Parents: bc68eb7
Author: Matt Burgess <mattyb149@apache.org>
Authored: Thu May 11 12:56:06 2017 -0400
Committer: Pierre Villard <pierre.villard.fr@gmail.com>
Committed: Mon May 15 13:46:29 2017 +0200

----------------------------------------------------------------------
 .../nifi/dbcp/hive/HiveConnectionPool.java      | 28 ++++++-----
 .../org/apache/nifi/util/hive/HiveUtils.java    |  3 ++
 .../nifi/dbcp/hive/HiveConnectionPoolTest.java  | 49 +++++++++++++++++++-
 3 files changed, 68 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/3353865c/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
index 64f3027..c3724c3 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
@@ -75,6 +75,7 @@ public class HiveConnectionPool extends AbstractControllerService implements
Hiv
             .defaultValue(null)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .required(true)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
@@ -83,7 +84,10 @@ public class HiveConnectionPool extends AbstractControllerService implements
Hiv
             .description("A file or comma separated list of files which contains the Hive
configuration (hive-site.xml, e.g.). Without this, Hadoop "
                     + "will search the classpath for a 'hive-site.xml' file or will revert
to a default configuration. Note that to enable authentication "
                     + "with Kerberos e.g., the appropriate properties must be set in the
configuration files. Please see the Hive documentation for more details.")
-            .required(false).addValidator(HiveUtils.createMultipleFilesExistValidator()).build();
+            .required(false)
+            .addValidator(HiveUtils.createMultipleFilesExistValidator())
+            .expressionLanguageSupported(true)
+            .build();
 
     public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder()
             .name("hive-db-user")
@@ -91,6 +95,7 @@ public class HiveConnectionPool extends AbstractControllerService implements
Hiv
             .description("Database user name")
             .defaultValue(null)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder()
@@ -101,6 +106,7 @@ public class HiveConnectionPool extends AbstractControllerService implements
Hiv
             .required(false)
             .sensitive(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder()
@@ -111,7 +117,7 @@ public class HiveConnectionPool extends AbstractControllerService implements
Hiv
             .defaultValue("500 millis")
             .required(true)
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .sensitive(false)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new PropertyDescriptor.Builder()
@@ -122,7 +128,7 @@ public class HiveConnectionPool extends AbstractControllerService implements
Hiv
             .defaultValue("8")
             .required(true)
             .addValidator(StandardValidators.INTEGER_VALIDATOR)
-            .sensitive(false)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor VALIDATION_QUERY = new PropertyDescriptor.Builder()
@@ -183,7 +189,7 @@ public class HiveConnectionPool extends AbstractControllerService implements
Hiv
         final List<ValidationResult> problems = new ArrayList<>();
 
         if (confFileProvided) {
-            final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
+            final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
             final String principal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
             final String keyTab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
             problems.addAll(hiveConfigurator.validate(configFiles, principal, keyTab, validationResourceHolder,
getLogger()));
@@ -211,7 +217,7 @@ public class HiveConnectionPool extends AbstractControllerService implements
Hiv
 
         ComponentLog log = getLogger();
 
-        final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
+        final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
         final Configuration hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
         final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();
 
@@ -219,7 +225,7 @@ public class HiveConnectionPool extends AbstractControllerService implements
Hiv
         for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet())
{
             final PropertyDescriptor descriptor = entry.getKey();
             if (descriptor.isDynamic()) {
-                hiveConfig.set(descriptor.getName(), entry.getValue());
+                hiveConfig.set(descriptor.getName(), context.getProperty(descriptor).evaluateAttributeExpressions().getValue());
             }
         }
 
@@ -237,15 +243,15 @@ public class HiveConnectionPool extends AbstractControllerService implements
Hiv
             getLogger().info("Successfully logged in as principal {} with keytab {}", new
Object[]{principal, keyTab});
 
         }
-        final String user = context.getProperty(DB_USER).getValue();
-        final String passw = context.getProperty(DB_PASSWORD).getValue();
-        final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
-        final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).asInteger();
+        final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
+        final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
+        final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+        final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
 
         dataSource = new BasicDataSource();
         dataSource.setDriverClassName(drv);
 
-        final String dburl = context.getProperty(DATABASE_URL).getValue();
+        final String dburl = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
 
         dataSource.setMaxWait(maxWaitMillis);
         dataSource.setMaxActive(maxTotal);

http://git-wip-us.apache.org/repos/asf/nifi/blob/3353865c/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveUtils.java
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveUtils.java
index 3e375f9..2dc67f7 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveUtils.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveUtils.java
@@ -57,6 +57,9 @@ public class HiveUtils {
      */
     public static Validator createMultipleFilesExistValidator() {
         return (subject, input, context) -> {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input))
{
+                return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression
Language Present").valid(true).build();
+            }
             final String[] files = input.split("\\s*,\\s*");
             for (String filename : files) {
                 try {

http://git-wip-us.apache.org/repos/asf/nifi/blob/3353865c/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/dbcp/hive/HiveConnectionPoolTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/dbcp/hive/HiveConnectionPoolTest.java
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/dbcp/hive/HiveConnectionPoolTest.java
index 0b5cd8f..79bcb7a 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/dbcp/hive/HiveConnectionPoolTest.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/dbcp/hive/HiveConnectionPoolTest.java
@@ -19,9 +19,13 @@ package org.apache.nifi.dbcp.hive;
 
 import org.apache.commons.dbcp.BasicDataSource;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.registry.VariableDescriptor;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockVariableRegistry;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -30,6 +34,8 @@ import java.lang.reflect.Field;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedExceptionAction;
 import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.isA;
@@ -51,7 +57,7 @@ public class HiveConnectionPoolTest {
         when(userGroupInformation.doAs(isA(PrivilegedExceptionAction.class))).thenAnswer(invocation
-> {
             try {
                 return ((PrivilegedExceptionAction) invocation.getArguments()[0]).run();
-            } catch (IOException |Error|RuntimeException|InterruptedException e) {
+            } catch (IOException | Error | RuntimeException | InterruptedException e) {
                 throw e;
             } catch (Throwable e) {
                 throw new UndeclaredThrowableException(e);
@@ -87,4 +93,45 @@ public class HiveConnectionPoolTest {
             throw e;
         }
     }
+
+    @Test
+    public void testExpressionLanguageSupport() throws Exception {
+        final String URL = "jdbc:hive2://localhost:10000/default";
+        final String USER = "user";
+        final String PASS = "pass";
+        final int MAX_CONN = 7;
+        final String MAX_WAIT = "10 sec"; // 10000 milliseconds
+        final String CONF = "/path/to/hive-site.xml";
+        hiveConnectionPool = new HiveConnectionPool();
+
+        Map<PropertyDescriptor, String> props = new HashMap<PropertyDescriptor,
String>() {{
+            put(HiveConnectionPool.DATABASE_URL, "${url}");
+            put(HiveConnectionPool.DB_USER, "${username}");
+            put(HiveConnectionPool.DB_PASSWORD, "${password}");
+            put(HiveConnectionPool.MAX_TOTAL_CONNECTIONS, "${maxconn}");
+            put(HiveConnectionPool.MAX_WAIT_TIME, "${maxwait}");
+            put(HiveConnectionPool.HIVE_CONFIGURATION_RESOURCES, "${hiveconf}");
+        }};
+
+        MockVariableRegistry registry = new MockVariableRegistry();
+        registry.setVariable(new VariableDescriptor("url"), URL);
+        registry.setVariable(new VariableDescriptor("username"), USER);
+        registry.setVariable(new VariableDescriptor("password"), PASS);
+        registry.setVariable(new VariableDescriptor("maxconn"), Integer.toString(MAX_CONN));
+        registry.setVariable(new VariableDescriptor("maxwait"), MAX_WAIT);
+        registry.setVariable(new VariableDescriptor("hiveconf"), CONF);
+
+
+        MockConfigurationContext context = new MockConfigurationContext(props, null, registry);
+        hiveConnectionPool.onConfigured(context);
+
+        Field dataSourceField = HiveConnectionPool.class.getDeclaredField("dataSource");
+        dataSourceField.setAccessible(true);
+        basicDataSource = (BasicDataSource) dataSourceField.get(hiveConnectionPool);
+        assertEquals(URL, basicDataSource.getUrl());
+        assertEquals(USER, basicDataSource.getUsername());
+        assertEquals(PASS, basicDataSource.getPassword());
+        assertEquals(MAX_CONN, basicDataSource.getMaxActive());
+        assertEquals(10000L, basicDataSource.getMaxWait());
+    }
 }


Mime
View raw message