nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbe...@apache.org
Subject [2/2] nifi git commit: NIFI-4346 Modifying HBase_1_1_2_LookupService to use HBase_1_1_2_ClientService, instead of extend it
Date Sat, 07 Oct 2017 19:37:33 GMT
NIFI-4346 Modifying HBase_1_1_2_LookupService to use HBase_1_1_2_ClientService, instead of
extend it

This closes #2125.

Signed-off-by: Bryan Bende <bbende@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5930c0c2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5930c0c2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5930c0c2

Branch: refs/heads/master
Commit: 5930c0c21246c4365698cfb1c90ebe2c5a446d07
Parents: eb97a68
Author: Bryan Bende <bbende@apache.org>
Authored: Fri Oct 6 14:09:36 2017 -0400
Committer: Bryan Bende <bbende@apache.org>
Committed: Sat Oct 7 15:36:49 2017 -0400

----------------------------------------------------------------------
 .../nifi/hbase/HBase_1_1_2_LookupService.java   | 172 ----------------
 .../hbase/HBase_1_1_2_RecordLookupService.java  | 197 +++++++++++++++++++
 ...org.apache.nifi.controller.ControllerService |   2 +-
 .../hbase/TestHBase_1_1_2_LookupService.java    | 133 -------------
 .../TestHBase_1_1_2_RecordLookupService.java    | 123 ++++++++++++
 .../apache/nifi/hbase/TestLookupProcessor.java  |  47 -----
 .../nifi/hbase/TestRecordLookupProcessor.java   | 118 +++++++++++
 7 files changed, 439 insertions(+), 353 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/5930c0c2/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_LookupService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_LookupService.java
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_LookupService.java
deleted file mode 100644
index 5f47c31..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_LookupService.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.hbase;
-
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.lookup.LookupFailureException;
-import org.apache.nifi.lookup.LookupService;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Optional;
-import java.util.Set;
-
-@Tags({"hbase", "record", "lookup", "service"})
-@CapabilityDescription(
-    "A lookup service that retrieves one or more columns from HBase based on a supplied rowKey."
-)
-public class HBase_1_1_2_LookupService extends HBase_1_1_2_ClientService implements LookupService<Record>
{
-    private static final Set<String> REQUIRED_KEYS = Collections.singleton("rowKey");
-
-    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
-            .name("hb-lu-table-name")
-            .displayName("Table Name")
-            .description("The name of the table where look ups will be run.")
-            .required(true)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .build();
-    public static final PropertyDescriptor RETURN_CFS = new PropertyDescriptor.Builder()
-            .name("hb-lu-return-cfs")
-            .displayName("Column Families")
-            .description("The column families that will be returned.")
-            .required(true)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .build();
-    public static final PropertyDescriptor RETURN_QFS = new PropertyDescriptor.Builder()
-            .name("hb-lu-return-qfs")
-            .displayName("Column Qualifiers")
-            .description("The column qualifies that will be returned.")
-            .required(false)
-            .addValidator(Validator.VALID)
-            .build();
-    protected static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
-            .name("hb-lu-charset")
-            .displayName("Character Set")
-            .description("Specifies the character set of the document data.")
-            .required(true)
-            .defaultValue("UTF-8")
-            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
-            .build();
-
-    private String tableName;
-    private List<byte[]> families;
-    private List<byte[]> qualifiers;
-    private Charset charset;
-
-    @Override
-    protected List<PropertyDescriptor> getAdditionalProperties() {
-        List<PropertyDescriptor> retVal = new ArrayList<>();
-        retVal.add(TABLE_NAME);
-        retVal.add(RETURN_CFS);
-        retVal.add(RETURN_QFS);
-        retVal.add(CHARSET);
-        return retVal;
-    }
-
-    @Override
-    public Optional<Record> lookup(Map<String, String> coordinates) throws LookupFailureException
{
-        byte[] rowKey = coordinates.get("rowKey").getBytes();
-        try {
-            Map<String, Object> values = new HashMap<>();
-            try (Table table = getConnection().getTable(TableName.valueOf(tableName))) {
-                Get get = new Get(rowKey);
-                Result result = table.get(get);
-
-                for (byte[] fam : families) {
-                    NavigableMap<byte[], byte[]>  map = result.getFamilyMap(fam);
-                    for (Map.Entry<byte[], byte[]> entry : map.entrySet()) {
-                        if (qualifiers.contains(entry.getKey()) || qualifiers.size() == 0)
{
-                            values.put(new String(entry.getKey(), charset), new String(entry.getValue(),
charset));
-                        }
-                    }
-                }
-            }
-
-            if (values.size() > 0) {
-                final List<RecordField> fields = new ArrayList<>();
-                for (String key : values.keySet()) {
-                    fields.add(new RecordField(key, RecordFieldType.STRING.getDataType()));
-                }
-                final RecordSchema schema = new SimpleRecordSchema(fields);
-                return Optional.ofNullable(new MapRecord(schema, values));
-            } else {
-                return Optional.empty();
-            }
-        } catch (IOException e) {
-            getLogger().error("Error occurred loading {}", new Object[] { coordinates.get("rowKey")
}, e);
-            throw new LookupFailureException(e);
-        }
-    }
-
-    @Override
-    public Class<?> getValueType() {
-        return Record.class;
-    }
-
-    @Override
-    public Set<String> getRequiredKeys() {
-        return REQUIRED_KEYS;
-    }
-
-    @OnEnabled
-    public void onEnabled(final ConfigurationContext context) throws InitializationException,
IOException, InterruptedException {
-        super.onEnabled(context);
-
-        this.tableName = context.getProperty(TABLE_NAME).getValue();
-        this.charset = Charset.forName(context.getProperty(CHARSET).getValue());
-
-        String families = context.getProperty(RETURN_CFS).getValue();
-        String[] familiesSplit = families.split(",");
-        this.families = new ArrayList<>();
-        for (String fs : familiesSplit) {
-            this.families.add(fs.trim().getBytes());
-        }
-        this.qualifiers = new ArrayList<>();
-        String quals = context.getProperty(RETURN_QFS).getValue();
-
-        if (quals != null && quals.length() > 0) {
-            String[] qualsSplit = quals.split(",");
-            for (String q : qualsSplit) {
-                this.qualifiers.add(q.trim().getBytes());
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/5930c0c2/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java
new file mode 100644
index 0000000..20dc0d4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_RecordLookupService.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.hbase.scan.Column;
+import org.apache.nifi.hbase.scan.ResultCell;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.LookupService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+@Tags({"hbase", "record", "lookup", "service"})
+@CapabilityDescription("A lookup service that retrieves one or more columns from HBase and
returns them as a record. The lookup coordinates " +
+        "must contain 'rowKey' which will be the HBase row id.")
+public class HBase_1_1_2_RecordLookupService extends AbstractControllerService implements
LookupService<Record> {
+
+    static final String ROW_KEY_KEY = "rowKey";
+    private static final Set<String> REQUIRED_KEYS = Collections.unmodifiableSet(new
HashSet<>(Arrays.asList(ROW_KEY_KEY)));
+
+    static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
+            .name("hbase-client-service")
+            .displayName("HBase Client Service")
+            .description("Specifies the HBase Client Controller Service to use for accessing
HBase.")
+            .required(true)
+            .identifiesControllerService(HBaseClientService.class)
+            .build();
+
+    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("hb-lu-table-name")
+            .displayName("Table Name")
+            .description("The name of the table where look ups will be run.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor RETURN_COLUMNS = new PropertyDescriptor.Builder()
+            .name("hb-lu-return-cols")
+            .displayName("Columns")
+            .description("A comma-separated list of \\\"<colFamily>:<colQualifier>\\\"
pairs to return when scanning. " +
+                    "To return all columns for a given family, leave off the qualifier such
as \\\"<colFamily1>,<colFamily2>\\\".")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("hb-lu-charset")
+            .displayName("Character Set")
+            .description("Specifies the character set used to decode bytes retrieved from
HBase.")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTIES;
+    static {
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(HBASE_CLIENT_SERVICE);
+        props.add(TABLE_NAME);
+        props.add(RETURN_COLUMNS);
+        props.add(CHARSET);
+        PROPERTIES = Collections.unmodifiableList(props);
+    }
+
+    private String tableName;
+    private List<Column> columns;
+    private Charset charset;
+    private HBaseClientService hBaseClientService;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Optional<Record> lookup(Map<String, String> coordinates) throws LookupFailureException
{
+        final String rowKey = coordinates.get(ROW_KEY_KEY);
+        if (StringUtils.isBlank(rowKey)) {
+            return Optional.empty();
+        }
+
+        final byte[] rowKeyBytes = rowKey.getBytes(StandardCharsets.UTF_8);
+        try {
+            final Map<String, Object> values = new HashMap<>();
+            hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes, columns, (byte[]
row, ResultCell[] resultCells) ->  {
+                for (final ResultCell cell : resultCells) {
+                    final byte[] qualifier = Arrays.copyOfRange(cell.getQualifierArray(),
cell.getQualifierOffset(), cell.getQualifierOffset() + cell.getQualifierLength());
+                    final byte[] value = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(),
cell.getValueOffset() + cell.getValueLength());
+                    values.put(new String(qualifier, charset), new String(value, charset));
+                }
+            });
+
+            if (values.size() > 0) {
+                final List<RecordField> fields = new ArrayList<>();
+                for (String key : values.keySet()) {
+                    fields.add(new RecordField(key, RecordFieldType.STRING.getDataType()));
+                }
+                final RecordSchema schema = new SimpleRecordSchema(fields);
+                return Optional.ofNullable(new MapRecord(schema, values));
+            } else {
+                return Optional.empty();
+            }
+        } catch (IOException e) {
+            getLogger().error("Error occurred loading {}", new Object[] { coordinates.get("rowKey")
}, e);
+            throw new LookupFailureException(e);
+        }
+    }
+
+    @Override
+    public Class<?> getValueType() {
+        return Record.class;
+    }
+
+    @Override
+    public Set<String> getRequiredKeys() {
+        return REQUIRED_KEYS;
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) throws InitializationException,
IOException, InterruptedException {
+        this.hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
+        this.tableName = context.getProperty(TABLE_NAME).getValue();
+        this.columns = getColumns(context.getProperty(RETURN_COLUMNS).getValue());
+        this.charset = Charset.forName(context.getProperty(CHARSET).getValue());
+    }
+
+    @OnDisabled
+    public void onDisabled() {
+        this.hBaseClientService = null;
+        this.tableName = null;
+        this.columns = null;
+        this.charset = null;
+    }
+
+    private List<Column> getColumns(final String columnsValue) {
+        final String[] columns = (columnsValue == null || columnsValue.isEmpty() ? new String[0]
: columnsValue.split(","));
+
+        final List<Column> columnsList = new ArrayList<>();
+
+        for (final String column : columns) {
+            if (column.contains(":"))  {
+                final String[] parts = column.trim().split(":");
+                final byte[] cf = parts[0].getBytes(StandardCharsets.UTF_8);
+                final byte[] cq = parts[1].getBytes(StandardCharsets.UTF_8);
+                columnsList.add(new Column(cf, cq));
+            } else {
+                final byte[] cf = column.trim().getBytes(StandardCharsets.UTF_8);
+                columnsList.add(new Column(cf, null));
+            }
+        }
+
+        return columnsList;
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/5930c0c2/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 1fcd3d9..5087688 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -14,4 +14,4 @@
 # limitations under the License.
 org.apache.nifi.hbase.HBase_1_1_2_ClientService
 org.apache.nifi.hbase.HBase_1_1_2_ClientMapCacheService
-org.apache.nifi.hbase.HBase_1_1_2_LookupService
+org.apache.nifi.hbase.HBase_1_1_2_RecordLookupService

http://git-wip-us.apache.org/repos/asf/nifi/blob/5930c0c2/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_LookupService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_LookupService.java
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_LookupService.java
deleted file mode 100644
index 9ca7940..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_LookupService.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.hbase;
-
-import org.apache.nifi.hbase.put.PutColumn;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
-
-@Ignore("This is an integration test. It requires a table with the name guids and a column
family named property. Example" +
-        "Docker Compose configuration:" +
-        "  hbase-docker:\n" +
-        "    container_name: hbase-docker\n" +
-        "    image: \"dajobe/hbase\"\n" +
-        "    ports:\n" +
-        "      - 16010:16010\n" +
-        "      - 2181:2181\n" +
-        "      - 60000:60000\n" +
-        "      - 60010:60010\n" +
-        "      - 60020:60020\n" +
-        "      - 60030:60030\n" +
-        "      - 9090:9090\n" +
-        "      - 9095:9095\n" +
-        "    hostname: hbase-docker")
-public class TestHBase_1_1_2_LookupService {
-
-    TestRunner runner;
-    HBase_1_1_2_LookupService service;
-
-    static final byte[] FAM = "property".getBytes();
-    static final byte[] QUAL1 = "uuid".getBytes();
-    static final byte[] QUAL2 = "uuid2".getBytes();
-
-    static final String TABLE_NAME = "guids";
-
-    @Before
-    public void before() throws Exception {
-        runner = TestRunners.newTestRunner(TestLookupProcessor.class);
-        service = new HBase_1_1_2_LookupService();
-        runner.addControllerService("lookupService", service);
-        runner.setProperty(service, HBaseClientService.ZOOKEEPER_QUORUM, "hbase-docker");
-        runner.setProperty(service, HBaseClientService.ZOOKEEPER_CLIENT_PORT, "2181");
-        runner.setProperty(service, HBaseClientService.ZOOKEEPER_ZNODE_PARENT, "/hbase");
-        runner.setProperty(service, HBaseClientService.HBASE_CLIENT_RETRIES, "3");
-        runner.setProperty(service, HBase_1_1_2_LookupService.TABLE_NAME, TABLE_NAME);
-        runner.setProperty(service, HBase_1_1_2_LookupService.RETURN_CFS, "property");
-        runner.setProperty(service, HBase_1_1_2_LookupService.CHARSET, "UTF-8");
-    }
-
-    @After
-    public void after() throws Exception {
-        service.shutdown();
-    }
-
-    @Test
-    public void testSingleLookup() throws Exception {
-        runner.enableControllerService(service);
-        runner.assertValid(service);
-
-        String uuid = UUID.randomUUID().toString();
-        String rowKey = String.format("x-y-z-%d", Calendar.getInstance().getTimeInMillis());
-
-        PutColumn column = new PutColumn(FAM, QUAL1, uuid.getBytes());
-
-        service.put(TABLE_NAME, rowKey.getBytes(), Arrays.asList(column));
-
-        Map<String, String> lookup = new HashMap<>();
-        lookup.put("rowKey", rowKey);
-        Optional result = service.lookup(lookup);
-
-        Assert.assertNotNull("Result was null", result);
-        Assert.assertNotNull("The value was null", result.get());
-        MapRecord record = (MapRecord)result.get();
-        Assert.assertEquals("The value didn't match.", uuid, record.getAsString("uuid"));
-    }
-
-
-    @Test
-    public void testMultipleLookup() throws Exception {
-        runner.enableControllerService(service);
-        runner.assertValid(service);
-
-        String uuid = UUID.randomUUID().toString();
-        String uuid2 = UUID.randomUUID().toString();
-        String rowKey = String.format("x-y-z-%d", Calendar.getInstance().getTimeInMillis());
-
-        List<PutColumn> columns = new ArrayList<>();
-        columns.add(new PutColumn(FAM, QUAL1, uuid.getBytes()));
-        columns.add(new PutColumn(FAM, QUAL2, uuid2.getBytes()));
-
-        service.put(TABLE_NAME, rowKey.getBytes(), columns);
-
-        Map<String, String> lookup = new HashMap<>();
-        lookup.put("rowKey", rowKey);
-        Optional result = service.lookup(lookup);
-
-        Assert.assertNotNull("Result was null", result);
-        Assert.assertNotNull("The value was null", result.get());
-        Assert.assertTrue("Wrong type.", result.get() instanceof MapRecord);
-        MapRecord record = (MapRecord)result.get();
-        Assert.assertEquals("Qual 1 was wrong", uuid, record.getAsString("uuid"));
-        Assert.assertEquals("Qual 2 was wrong", uuid2, record.getAsString("uuid2"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/5930c0c2/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_RecordLookupService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_RecordLookupService.java
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_RecordLookupService.java
new file mode 100644
index 0000000..ab8a37c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_RecordLookupService.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.when;
+
+public class TestHBase_1_1_2_RecordLookupService {
+
+    static final String TABLE_NAME = "guids";
+    static final String ROW = "row1";
+    static final String COLS = "cf1:cq1,cf2:cq2";
+
+    private TestRunner runner;
+    private HBase_1_1_2_RecordLookupService lookupService;
+    private MockHBaseClientService clientService;
+    private TestRecordLookupProcessor testLookupProcessor;
+
+    @Before
+    public void before() throws Exception {
+        testLookupProcessor = new TestRecordLookupProcessor();
+        runner = TestRunners.newTestRunner(testLookupProcessor);
+
+        // setup mock HBaseClientService
+        final Table table = Mockito.mock(Table.class);
+        when(table.getName()).thenReturn(TableName.valueOf(TABLE_NAME));
+
+        final KerberosProperties kerberosProperties = new KerberosProperties(new File("src/test/resources/krb5.conf"));
+        clientService = new MockHBaseClientService(table, "family", kerberosProperties);
+        runner.addControllerService("clientService", clientService);
+        runner.setProperty(clientService, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml");
+        runner.enableControllerService(clientService);
+
+        // setup HBase LookupService
+        lookupService = new HBase_1_1_2_RecordLookupService();
+        runner.addControllerService("lookupService", lookupService);
+        runner.setProperty(lookupService, HBase_1_1_2_RecordLookupService.HBASE_CLIENT_SERVICE,
"clientService");
+        runner.setProperty(lookupService, HBase_1_1_2_RecordLookupService.TABLE_NAME, TABLE_NAME);
+        runner.enableControllerService(lookupService);
+
+        // setup test processor
+        runner.setProperty(TestRecordLookupProcessor.HBASE_LOOKUP_SERVICE, "lookupService");
+        runner.setProperty(TestRecordLookupProcessor.HBASE_ROW, ROW);
+    }
+
+    @Test
+    public void testSuccessfulLookupAllColumns() {
+        // setup some staged data in the mock client service
+        final Map<String,String> cells = new HashMap<>();
+        cells.put("cq1", "v1");
+        cells.put("cq2", "v2");
+        clientService.addResult("row1", cells, System.currentTimeMillis());
+
+        // run the processor
+        runner.enqueue("trigger flow file");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TestRecordLookupProcessor.REL_SUCCESS);
+
+        final List<Record> records = testLookupProcessor.getLookedupRecords();
+        Assert.assertNotNull(records);
+        Assert.assertEquals(1, records.size());
+
+        final Record record = records.get(0);
+        Assert.assertEquals("v1", record.getAsString("cq1"));
+        Assert.assertEquals("v2", record.getAsString("cq2"));
+    }
+
+    @Test
+    public void testLookupWithNoResults() {
+        // run the processor
+        runner.enqueue("trigger flow file");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TestRecordLookupProcessor.REL_FAILURE);
+
+        final List<Record> records = testLookupProcessor.getLookedupRecords();
+        Assert.assertNotNull(records);
+        Assert.assertEquals(0, records.size());
+    }
+
+    @Test
+    public void testLookupWhenMissingRowKeyCoordinate() {
+        runner.removeProperty(TestRecordLookupProcessor.HBASE_ROW);
+
+        // run the processor
+        runner.enqueue("trigger flow file");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TestRecordLookupProcessor.REL_FAILURE);
+
+        final List<Record> records = testLookupProcessor.getLookedupRecords();
+        Assert.assertNotNull(records);
+        Assert.assertEquals(0, records.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/5930c0c2/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestLookupProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestLookupProcessor.java
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestLookupProcessor.java
deleted file mode 100644
index 729c6f9..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestLookupProcessor.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.hbase;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class TestLookupProcessor extends AbstractProcessor {
-    static final PropertyDescriptor HBASE_LOOKUP_SERVICE = new PropertyDescriptor.Builder()
-            .name("HBase Lookup Service")
-            .description("HBaseLookupService")
-            .identifiesControllerService(HBase_1_1_2_LookupService.class)
-            .required(true)
-            .build();
-
-    @Override
-    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException
{
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        List<PropertyDescriptor> propDescs = new ArrayList<>();
-        propDescs.add(HBASE_LOOKUP_SERVICE);
-        return propDescs;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/5930c0c2/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestRecordLookupProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestRecordLookupProcessor.java
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestRecordLookupProcessor.java
new file mode 100644
index 0000000..d3df016
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestRecordLookupProcessor.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.LookupService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.record.Record;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class TestRecordLookupProcessor extends AbstractProcessor {
+
+    static final PropertyDescriptor HBASE_LOOKUP_SERVICE = new PropertyDescriptor.Builder()
+            .name("HBase Lookup Service")
+            .description("HBaseLookupService")
+            .identifiesControllerService(LookupService.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HBASE_ROW = new PropertyDescriptor.Builder()
+            .name("HBase Row Id")
+            .description("The Row Id to Lookup.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All success FlowFiles are routed to this relationship")
+            .build();
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("All failed FlowFiles are routed to this relationship")
+            .build();
+
+    private List<Record> lookedupRecords = new ArrayList<>();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        List<PropertyDescriptor> propDescs = new ArrayList<>();
+        propDescs.add(HBASE_LOOKUP_SERVICE);
+        propDescs.add(HBASE_ROW);
+        return propDescs;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException
{
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final String rowKey = context.getProperty(HBASE_ROW).getValue();
+
+        final Map<String,String> coordinates = new HashMap<>();
+        coordinates.put(HBase_1_1_2_RecordLookupService.ROW_KEY_KEY, rowKey);
+
+        final LookupService<Record> lookupService = context.getProperty(HBASE_LOOKUP_SERVICE).asControllerService(LookupService.class);
+        try {
+            final Optional<Record> record = lookupService.lookup(coordinates);
+            if (record.isPresent()) {
+                lookedupRecords.add(record.get());
+                session.transfer(flowFile, REL_SUCCESS);
+            } else {
+                session.transfer(flowFile, REL_FAILURE);
+            }
+
+        } catch (LookupFailureException e) {
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+    }
+
+    public List<Record> getLookedupRecords() {
+        return new ArrayList<>(lookedupRecords);
+    }
+
+    public void clearLookedupRecords() {
+        this.lookedupRecords.clear();
+    }
+
+}


Mime
View raw message