nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joew...@apache.org
Subject [3/6] nifi git commit: NIFI-3863: Initial implementation of Lookup Services. Implemented LookupRecord processors. This required some refactoring of RecordSetWriter interface, so refactored that interface and all implementations and references of it
Date Fri, 19 May 2017 06:10:05 GMT
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
new file mode 100644
index 0000000..26e78b4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.lookup.LookupService;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.Tuple;
+
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"),
+    @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile")
+})
+@Tags({"lookup", "enrich", "route", "record", "csv", "json", "avro", "logs", "convert", "filter"})
+@CapabilityDescription("Extracts a field from a Record and looks up its value in a LookupService. If a result is returned by the LookupService, "
+    + "that result is optionally added to the Record. In this case, the processor functions as an Enrichment processor. Regardless, the Record is then "
+    + "routed to either the 'matched' relationship or 'unmatched' relationship, indicating whether or not a result was returned by the LookupService, "
+    + "allowing the processor to also function as a Routing processor. If any record in the incoming FlowFile has multiple fields match the configured "
+    + "Lookup RecordPath or if no fields match, then that record will be routed to failure. If one or more fields match the Result RecordPath, all fields "
+    + "that match will be updated.")
+@SeeAlso({ConvertRecord.class, SplitRecord.class})
+public class LookupRecord extends AbstractRouteRecord<Tuple<RecordPath, RecordPath>> {
+
+    private volatile RecordPathCache recordPathCache = new RecordPathCache(25);
+    private volatile LookupService<?> lookupService;
+
+    static final PropertyDescriptor LOOKUP_SERVICE = new PropertyDescriptor.Builder()
+        .name("lookup-service")
+        .displayName("Lookup Service")
+        .description("The Lookup Service to use in order to lookup a value in each Record")
+        .identifiesControllerService(LookupService.class)
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor LOOKUP_RECORD_PATH = new PropertyDescriptor.Builder()
+        .name("lookup-record-path")
+        .displayName("Lookup RecordPath")
+        .description("A RecordPath that points to the field whose value will be looked up in the configured Lookup Service")
+        .addValidator(new RecordPathValidator())
+        .expressionLanguageSupported(true)
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor RESULT_RECORD_PATH = new PropertyDescriptor.Builder()
+        .name("result-record-path")
+        .displayName("Result RecordPath")
+        .description("A RecordPath that points to the field whose value should be updated with whatever value is returned from the Lookup Service. "
+            + "If not specified, the value that is returned from the Lookup Service will be ignored, except for determining whether the FlowFile should "
+            + "be routed to the 'matched' or 'unmatched' Relationship.")
+        .addValidator(new RecordPathValidator())
+        .expressionLanguageSupported(true)
+        .required(false)
+        .build();
+
+    static final Relationship REL_MATCHED = new Relationship.Builder()
+        .name("matched")
+        .description("All records for which the lookup returns a value will be routed to this relationship")
+        .build();
+    static final Relationship REL_UNMATCHED = new Relationship.Builder()
+        .name("unmatched")
+        .description("All records for which the lookup does not have a matching value will be routed to this relationship")
+        .build();
+
+    private static final Set<Relationship> MATCHED_COLLECTION = Collections.singleton(REL_MATCHED);
+    private static final Set<Relationship> UNMATCHED_COLLECTION = Collections.singleton(REL_UNMATCHED);
+    private static final Set<Relationship> FAILURE_COLLECTION = Collections.singleton(REL_FAILURE);
+
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        this.lookupService = context.getProperty(LOOKUP_SERVICE).asControllerService(LookupService.class);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_MATCHED);
+        relationships.add(REL_UNMATCHED);
+        relationships.add(REL_FAILURE);
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.addAll(super.getSupportedPropertyDescriptors());
+        properties.add(LOOKUP_SERVICE);
+        properties.add(LOOKUP_RECORD_PATH);
+        properties.add(RESULT_RECORD_PATH);
+        return properties;
+    }
+
+    @Override
+    protected Set<Relationship> route(final Record record, final RecordSchema writeSchema, final FlowFile flowFile, final ProcessContext context,
+        final Tuple<RecordPath, RecordPath> flowFileContext) {
+
+        final RecordPathResult lookupPathResult = flowFileContext.getKey().evaluate(record);
+        final List<FieldValue> lookupFieldValues = lookupPathResult.getSelectedFields()
+            .filter(fieldVal -> fieldVal.getValue() != null)
+            .collect(Collectors.toList());
+        if (lookupFieldValues.isEmpty()) {
+            getLogger().error("Lookup RecordPath did not match any fields in a record for {}; routing record to failure", new Object[] {flowFile});
+            return FAILURE_COLLECTION;
+        }
+
+        if (lookupFieldValues.size() > 1) {
+            getLogger().error("Lookup RecordPath matched {} fields in a record for {}; routing record to failure", new Object[] {lookupFieldValues.size(), flowFile});
+            return FAILURE_COLLECTION;
+        }
+
+        final FieldValue fieldValue = lookupFieldValues.get(0);
+        final String lookupKey = DataTypeUtils.toString(fieldValue.getValue(), (String) null);
+
+        final Optional<?> lookupValue;
+        try {
+            lookupValue = lookupService.lookup(lookupKey);
+        } catch (final Exception e) {
+            getLogger().error("Failed to lookup value '{}' in Lookup Service for a record in {}; routing record to failure", new Object[] {lookupKey, flowFile, e});
+            return Collections.singleton(REL_FAILURE);
+        }
+
+        if (!lookupValue.isPresent()) {
+            return UNMATCHED_COLLECTION;
+        }
+
+        // Ensure that the Record has the appropriate schema to account for the newly added values
+        final RecordPath resultPath = flowFileContext.getValue();
+        if (resultPath != null) {
+            record.incorporateSchema(writeSchema);
+
+            final Object replacementValue = lookupValue.get();
+            final RecordPathResult resultPathResult = flowFileContext.getValue().evaluate(record);
+            resultPathResult.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(replacementValue));
+        }
+
+        return MATCHED_COLLECTION;
+    }
+
+    @Override
+    protected boolean isRouteOriginal() {
+        return false;
+    }
+
+    @Override
+    protected Tuple<RecordPath, RecordPath> getFlowFileContext(final FlowFile flowFile, final ProcessContext context) {
+        final String lookupPathText = context.getProperty(LOOKUP_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
+        final RecordPath lookupRecordPath = recordPathCache.getCompiled(lookupPathText);
+
+        final RecordPath resultRecordPath;
+        if (context.getProperty(RESULT_RECORD_PATH).isSet()) {
+            final String resultPathText = context.getProperty(RESULT_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
+            resultRecordPath = recordPathCache.getCompiled(resultPathText);
+        } else {
+            resultRecordPath = null;
+        }
+
+        return new Tuple<>(lookupRecordPath, resultRecordPath);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
index 3cb5cce..5551ed1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
@@ -76,6 +76,7 @@ import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.ResultSetRecordSet;
 import org.apache.nifi.util.StopWatch;
 
@@ -248,15 +249,20 @@ public class QueryRecord extends AbstractProcessor {
         final Map<FlowFile, Relationship> transformedFlowFiles = new HashMap<>();
         final Set<FlowFile> createdFlowFiles = new HashSet<>();
 
+        final RecordSchema recordSchema;
+
+        try (final InputStream rawIn = session.read(original);
+            final InputStream in = new BufferedInputStream(rawIn)) {
+            recordSchema = resultSetWriterFactory.getSchema(original, in);
+        } catch (final Exception e) {
+            getLogger().error("Failed to determine Record Schema from {}; routing to failure", new Object[] {original, e});
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
         int recordsRead = 0;
 
         try {
-            final RecordSetWriter resultSetWriter;
-            try (final InputStream rawIn = session.read(original);
-                final InputStream in = new BufferedInputStream(rawIn)) {
-                resultSetWriter = resultSetWriterFactory.createWriter(getLogger(), resultSetWriterFactory.getSchema(original, in));
-            }
-
             for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
                 if (!descriptor.isDynamic()) {
                     continue;
@@ -280,14 +286,17 @@ public class QueryRecord extends AbstractProcessor {
                         queryResult = query(session, original, sql, context, recordParserFactory);
                     }
 
+                    final AtomicReference<String> mimeTypeRef = new AtomicReference<>();
+                    final FlowFile outFlowFile = transformed;
                     try {
                         final ResultSet rs = queryResult.getResultSet();
                         transformed = session.write(transformed, new OutputStreamCallback() {
                             @Override
                             public void process(final OutputStream out) throws IOException {
-                                try {
-                                    final ResultSetRecordSet recordSet = new ResultSetRecordSet(rs);
-                                    writeResultRef.set(resultSetWriter.write(recordSet, out));
+                                try (final RecordSetWriter resultSetWriter = resultSetWriterFactory.createWriter(getLogger(), recordSchema, outFlowFile, out)) {
+                                    final ResultSetRecordSet resultSet = new ResultSetRecordSet(rs);
+                                    writeResultRef.set(resultSetWriter.write(resultSet));
+                                    mimeTypeRef.set(resultSetWriter.getMimeType());
                                 } catch (final Exception e) {
                                     throw new IOException(e);
                                 }
@@ -310,7 +319,7 @@ public class QueryRecord extends AbstractProcessor {
                             attributesToAdd.putAll(result.getAttributes());
                         }
 
-                        attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), resultSetWriter.getMimeType());
+                        attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
                         attributesToAdd.put("record.count", String.valueOf(result.getRecordCount()));
                         transformed = session.putAllAttributes(transformed, attributesToAdd);
                         transformedFlowFiles.put(transformed, relationship);

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java
index 62ca521..6463374 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java
@@ -58,6 +58,7 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.PushBackRecordSet;
 import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
 
 @EventDriven
@@ -133,10 +134,10 @@ public class SplitRecord extends AbstractProcessor {
 
         final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-        final RecordSetWriter writer;
+        final RecordSchema schema;
         try (final InputStream rawIn = session.read(original);
             final InputStream in = new BufferedInputStream(rawIn)) {
-            writer = writerFactory.createWriter(getLogger(), writerFactory.getSchema(original, in));
+            schema = writerFactory.getSchema(original, in);
         } catch (final Exception e) {
             getLogger().error("Failed to create Record Writer for {}; routing to failure", new Object[] {original, e});
             session.transfer(original, REL_FAILURE);
@@ -159,28 +160,26 @@ public class SplitRecord extends AbstractProcessor {
                             FlowFile split = session.create(original);
 
                             try {
-                                final AtomicReference<WriteResult> writeResultRef = new AtomicReference<>();
-                                split = session.write(split, new OutputStreamCallback() {
-                                    @Override
-                                    public void process(final OutputStream out) throws IOException {
+                                final Map<String, String> attributes = new HashMap<>();
+                                final WriteResult writeResult;
+
+                                try (final OutputStream out = session.write(split);
+                                    final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, split, out)) {
                                         if (maxRecords == 1) {
                                             final Record record = pushbackSet.next();
-                                            writeResultRef.set(writer.write(record, out));
+                                            writeResult = writer.write(record);
                                         } else {
                                             final RecordSet limitedSet = pushbackSet.limit(maxRecords);
-                                            writeResultRef.set(writer.write(limitedSet, out));
+                                            writeResult = writer.write(limitedSet);
                                         }
-                                    }
-                                });
 
-                                final WriteResult writeResult = writeResultRef.get();
+                                        attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                                        attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                                        attributes.putAll(writeResult.getAttributes());
 
-                                final Map<String, String> attributes = new HashMap<>();
-                                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
-                                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
-                                attributes.putAll(writeResult.getAttributes());
+                                        session.adjustCounter("Records Split", writeResult.getRecordCount(), false);
+                                }
 
-                                session.adjustCounter("Records Split", writeResult.getRecordCount(), false);
                                 split = session.putAllAttributes(split, attributes);
                             } finally {
                                 splits.add(split);

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index f345524..7ed3736 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -59,6 +59,7 @@ org.apache.nifi.processors.standard.ListenUDP
 org.apache.nifi.processors.standard.ListSFTP
 org.apache.nifi.processors.standard.LogAttribute
 org.apache.nifi.processors.standard.LogMessage
+org.apache.nifi.processors.standard.LookupRecord
 org.apache.nifi.processors.standard.MergeContent
 org.apache.nifi.processors.standard.ModifyBytes
 org.apache.nifi.processors.standard.MonitorActivity

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
new file mode 100644
index 0000000..5f2cd6c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.lookup.StringLookupService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestLookupRecord {
+
+    private TestRunner runner;
+    private MapLookup lookupService;
+    private MockRecordParser recordReader;
+    private MockRecordWriter recordWriter;
+
+    @Before
+    public void setup() throws InitializationException {
+        recordReader = new MockRecordParser();
+        recordWriter = new MockRecordWriter(null, false);
+        lookupService = new MapLookup();
+
+        runner = TestRunners.newTestRunner(LookupRecord.class);
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        runner.addControllerService("writer", recordWriter);
+        runner.enableControllerService(recordWriter);
+        runner.addControllerService("lookup", lookupService);
+        runner.enableControllerService(lookupService);
+
+        runner.setProperty(LookupRecord.RECORD_READER, "reader");
+        runner.setProperty(LookupRecord.RECORD_WRITER, "writer");
+        runner.setProperty(LookupRecord.LOOKUP_SERVICE, "lookup");
+        runner.setProperty(LookupRecord.LOOKUP_RECORD_PATH, "/name");
+        runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/sport");
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("sport", RecordFieldType.STRING);
+
+        recordReader.addRecord("John Doe", 48, null);
+        recordReader.addRecord("Jane Doe", 47, null);
+        recordReader.addRecord("Jimmy Doe", 14, null);
+    }
+
+    @Test
+    public void testAllMatch() throws InitializationException {
+        lookupService.addValue("John Doe", "Soccer");
+        lookupService.addValue("Jane Doe", "Basketball");
+        lookupService.addValue("Jimmy Doe", "Football");
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(LookupRecord.REL_MATCHED, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);
+
+        out.assertAttributeEquals("record.count", "3");
+        out.assertAttributeEquals("mime.type", "text/plain");
+        out.assertContentEquals("John Doe,48,Soccer\nJane Doe,47,Basketball\nJimmy Doe,14,Football\n");
+    }
+
+    @Test
+    public void testAllUnmatched() throws InitializationException {
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(LookupRecord.REL_UNMATCHED, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_UNMATCHED).get(0);
+
+        out.assertAttributeEquals("record.count", "3");
+        out.assertAttributeEquals("mime.type", "text/plain");
+        out.assertContentEquals("John Doe,48,\nJane Doe,47,\nJimmy Doe,14,\n");
+    }
+
+    @Test
+    public void testMixtureOfMatch() throws InitializationException {
+        lookupService.addValue("John Doe", "Soccer");
+        lookupService.addValue("Jimmy Doe", "Football");
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertTransferCount(LookupRecord.REL_FAILURE, 0);
+        runner.assertTransferCount(LookupRecord.REL_MATCHED, 1);
+        runner.assertTransferCount(LookupRecord.REL_UNMATCHED, 1);
+
+        final MockFlowFile matched = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);
+        matched.assertAttributeEquals("record.count", "2");
+        matched.assertAttributeEquals("mime.type", "text/plain");
+        matched.assertContentEquals("John Doe,48,Soccer\nJimmy Doe,14,Football\n");
+
+        final MockFlowFile unmatched = runner.getFlowFilesForRelationship(LookupRecord.REL_UNMATCHED).get(0);
+        unmatched.assertAttributeEquals("record.count", "1");
+        unmatched.assertAttributeEquals("mime.type", "text/plain");
+        unmatched.assertContentEquals("Jane Doe,47,\n");
+    }
+
+
+    @Test
+    public void testResultPathNotFound() throws InitializationException {
+        runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/other");
+
+        lookupService.addValue("John Doe", "Soccer");
+        lookupService.addValue("Jane Doe", "Basketball");
+        lookupService.addValue("Jimmy Doe", "Football");
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(LookupRecord.REL_MATCHED, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);
+
+        out.assertAttributeEquals("record.count", "3");
+        out.assertAttributeEquals("mime.type", "text/plain");
+        out.assertContentEquals("John Doe,48,\nJane Doe,47,\nJimmy Doe,14,\n");
+    }
+
+    @Test
+    public void testLookupPathNotFound() throws InitializationException {
+        runner.setProperty(LookupRecord.LOOKUP_RECORD_PATH, "/other");
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(LookupRecord.REL_FAILURE, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_FAILURE).get(0);
+
+        out.assertAttributeEquals("record.count", "3");
+        out.assertAttributeEquals("mime.type", "text/plain");
+        out.assertContentEquals("John Doe,48,\nJane Doe,47,\nJimmy Doe,14,\n");
+    }
+
+    @Test
+    public void testUnparseableData() throws InitializationException {
+        recordReader.failAfter(1);
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(LookupRecord.REL_FAILURE, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_FAILURE).get(0);
+        out.assertAttributeNotExists("record.count");
+        out.assertContentEquals("");
+    }
+
+    @Test
+    public void testNoResultPath() throws InitializationException {
+        lookupService.addValue("John Doe", "Soccer");
+        lookupService.addValue("Jane Doe", "Basketball");
+        lookupService.addValue("Jimmy Doe", "Football");
+
+        runner.removeProperty(LookupRecord.RESULT_RECORD_PATH);
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(LookupRecord.REL_MATCHED, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);
+
+        out.assertAttributeEquals("record.count", "3");
+        out.assertAttributeEquals("mime.type", "text/plain");
+        out.assertContentEquals("John Doe,48,\nJane Doe,47,\nJimmy Doe,14,\n");
+    }
+
+
+    @Test
+    public void testMultipleLookupPaths() throws InitializationException {
+        lookupService.addValue("John Doe", "Soccer");
+        lookupService.addValue("Jane Doe", "Basketball");
+        lookupService.addValue("Jimmy Doe", "Football");
+
+        runner.setProperty(LookupRecord.LOOKUP_RECORD_PATH, "/*");
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(LookupRecord.REL_FAILURE, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_FAILURE).get(0);
+
+        out.assertAttributeEquals("record.count", "3");
+        out.assertAttributeEquals("mime.type", "text/plain");
+        out.assertContentEquals("John Doe,48,\nJane Doe,47,\nJimmy Doe,14,\n");
+    }
+
+
+
+    private static class MapLookup extends AbstractControllerService implements StringLookupService {
+        private final Map<String, String> values = new HashMap<>();
+
+        public void addValue(final String key, final String value) {
+            values.put(key, value);
+        }
+
+        @Override
+        public Class<?> getValueType() {
+            return String.class;
+        }
+
+        @Override
+        public Optional<String> lookup(final String key) {
+            return Optional.ofNullable(values.get(key));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
index 023fba7..c00eb4b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -269,10 +269,10 @@ public class TestQueryRecord {
         }
 
         @Override
-        public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema) {
+        public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) {
             return new RecordSetWriter() {
                 @Override
-                public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
+                public WriteResult write(final RecordSet rs) throws IOException {
                     final int colCount = rs.getSchema().getFieldCount();
                     Assert.assertEquals(columnNames.size(), colCount);
 
@@ -299,9 +299,23 @@ public class TestQueryRecord {
                 }
 
                 @Override
-                public WriteResult write(Record record, OutputStream out) throws IOException {
+                public WriteResult write(Record record) throws IOException {
                     return null;
                 }
+
+                @Override
+                public void close() throws IOException {
+                    out.close();
+                }
+
+                @Override
+                public void beginRecordSet() throws IOException {
+                }
+
+                @Override
+                public WriteResult finishRecordSet() throws IOException {
+                    return WriteResult.EMPTY;
+                }
             };
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/pom.xml
new file mode 100644
index 0000000..127895a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/pom.xml
@@ -0,0 +1,36 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-standard-services</artifactId>
+        <version>1.3.0-SNAPSHOT</version>
+    </parent>
+    
+    <artifactId>nifi-lookup-service-api</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupFailureException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupFailureException.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupFailureException.java
new file mode 100644
index 0000000..acd1407
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupFailureException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.lookup;
+
+public class LookupFailureException extends Exception {
+
+    public LookupFailureException() {
+        super();
+    }
+
+    public LookupFailureException(final String message) {
+        super(message);
+    }
+
+    public LookupFailureException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+    public LookupFailureException(final Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java
new file mode 100644
index 0000000..2796ff5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.lookup;
+
+import java.util.Optional;
+
+import org.apache.nifi.controller.ControllerService;
+
+public interface LookupService<T> extends ControllerService {
+
+    /**
+     * Looks up a value that corresponds to the given key
+     *
+     * @param key the key to lookup
+     * @return a value that corresponds to the given key
+     *
+     * @throws if unable to lookup a value for the given key
+     */
+    Optional<T> lookup(String key) throws LookupFailureException;
+
+    /**
+     * @return the Class that represents the type of value that will be returned by {@link #lookup(String)}
+     */
+    Class<?> getValueType();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RecordLookupService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RecordLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RecordLookupService.java
new file mode 100644
index 0000000..57bc8e0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RecordLookupService.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.lookup;
+
+import java.util.Optional;
+
+import org.apache.nifi.serialization.record.Record;
+
+public interface RecordLookupService extends LookupService<Record> {
+
+    /**
+     * Returns an Optional Record that corresponds to the given key
+     *
+     * @param key the key to lookup
+     * @return an Optional Record that corresponds to the given key
+     *
+     * @throws if unable to lookup a value for the given key
+     */
+    @Override
+    Optional<Record> lookup(String key) throws LookupFailureException;
+
+    @Override
+    default Class<?> getValueType() {
+        return Record.class;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java
new file mode 100644
index 0000000..8f5199e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.lookup;
+
+import java.util.Optional;
+
+public interface StringLookupService extends LookupService<String> {
+
+    /**
+     * Returns an Optional value that corresponds to the given key
+     *
+     * @param key the key to lookup
+     * @return an Optional String that represents the value for the given key
+     *
+     * @throws if unable to lookup a value for the given key
+     */
+    @Override
+    Optional<String> lookup(String key);
+
+    @Override
+    default Class<?> getValueType() {
+        return String.class;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services-nar/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services-nar/pom.xml
new file mode 100644
index 0000000..c73f246
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services-nar/pom.xml
@@ -0,0 +1,35 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-lookup-services-bundle</artifactId>
+        <version>1.3.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-lookup-services-nar</artifactId>
+    <packaging>nar</packaging>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-lookup-services</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..0fe0466
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,73 @@
+nifi-lookup-services-nar
+Copyright 2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+******************
+Apache Software License v2
+******************
+
+The following binary components are provided under the Apache Software License v2
+
+  (ASLv2) Apache Commons Lang
+    The following NOTICE information applies:
+      Apache Commons Lang
+      Copyright 2001-2015 The Apache Software Foundation
+
+      This product includes software from the Spring Framework,
+      under the Apache License 2.0 (see: StringUtils.containsWhitespace())
+
+  (ASLv2) Apache HttpComponents
+    The following NOTICE information applies:
+      Apache HttpClient
+      Copyright 1999-2014 The Apache Software Foundation
+      
+      Apache HttpCore
+      Copyright 2005-2014 The Apache Software Foundation
+
+      This project contains annotations derived from JCIP-ANNOTATIONS
+      Copyright (c) 2005 Brian Goetz and Tim Peierls. See http://www.jcip.net
+
+  (ASLv2) Apache Commons Codec
+    The following NOTICE information applies:
+      Apache Commons Codec
+      Copyright 2002-2014 The Apache Software Foundation
+
+      src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
+      contains test data from http://aspell.net/test/orig/batch0.tab.
+      Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
+
+      ===============================================================================
+
+      The content of package org.apache.commons.codec.language.bm has been translated
+      from the original php source code available at http://stevemorse.org/phoneticinfo.htm
+      with permission from the original authors.
+      Original source copyright:
+      Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
+
+  (ASLv2) Apache Commons Logging
+    The following NOTICE information applies:
+      Apache Commons Logging
+      Copyright 2003-2013 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Net
+      The following NOTICE information applies:
+        Apache Commons Net
+        Copyright 2001-2016 The Apache Software Foundation
+
+  (ASLv2) GeoIP2 Java API
+    The following NOTICE information applies:
+      GeoIP2 Java API
+      This software is Copyright (c) 2013 by MaxMind, Inc.
+      
+************************
+Creative Commons Attribution-ShareAlike 3.0
+************************
+
+The following binary components are provided under the Creative Commons Attribution-ShareAlike 3.0.  See project link for details.
+
+	(CCAS 3.0) MaxMind DB (https://github.com/maxmind/MaxMind-DB)
+
+
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml
new file mode 100644
index 0000000..bd711a1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml
@@ -0,0 +1,51 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!-- Licensed to the Apache Software Foundation (ASF) under one or more 
+        contributor license agreements. See the NOTICE file distributed with this 
+        work for additional information regarding copyright ownership. The ASF licenses 
+        this file to You under the Apache License, Version 2.0 (the "License"); you 
+        may not use this file except in compliance with the License. You may obtain 
+        a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
+        required by applicable law or agreed to in writing, software distributed 
+        under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+        OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+        the specific language governing permissions and limitations under the License. -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-lookup-services-bundle</artifactId>
+        <version>1.3.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-lookup-services</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-lookup-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.maxmind.geoip2</groupId>
+            <artifactId>geoip2</artifactId>
+            <version>2.1.0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.code.findbugs</groupId>
+                    <artifactId>jsr305</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleKeyValueLookupService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleKeyValueLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleKeyValueLookupService.java
new file mode 100644
index 0000000..59f7ca6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleKeyValueLookupService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.lookup;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+
+@Tags({"lookup", "enrich", "key", "value"})
+@CapabilityDescription("Allows users to add key/value pairs as User-defined Properties. Each property that is added can be looked up by Property Name.")
+public class SimpleKeyValueLookupService extends AbstractControllerService implements StringLookupService {
+    private volatile Map<String, String> lookupValues = new HashMap<>();
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+            .name(propertyDescriptorName)
+            .required(false)
+            .dynamic(true)
+            .addValidator(Validator.VALID)
+            .build();
+    }
+
+    @OnEnabled
+    public void cacheConfiguredValues(final ConfigurationContext context) {
+        lookupValues = context.getProperties().entrySet().stream()
+            .collect(Collectors.toMap(entry -> entry.getKey().getName(), entry -> context.getProperty(entry.getKey()).getValue()));
+    }
+
+    @Override
+    public Optional<String> lookup(final String key) {
+        return Optional.ofNullable(lookupValues.get(key));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/AnonymousIpSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/AnonymousIpSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/AnonymousIpSchema.java
new file mode 100644
index 0000000..2e164ac
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/AnonymousIpSchema.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.lookup.maxmind;
+
+import java.util.Arrays;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public class AnonymousIpSchema {
+    static final RecordField ANONYMOUS = new RecordField("anonymous", RecordFieldType.BOOLEAN.getDataType());
+    static final RecordField ANONYMOUS_VPN = new RecordField("anonymousVpn", RecordFieldType.BOOLEAN.getDataType());
+    static final RecordField HOSTING_PROVIDER = new RecordField("hostingProvider", RecordFieldType.BOOLEAN.getDataType());
+    static final RecordField PUBLIC_PROXY = new RecordField("publicProxy", RecordFieldType.BOOLEAN.getDataType());
+    static final RecordField TOR_EXIT_NODE = new RecordField("torExitNode", RecordFieldType.BOOLEAN.getDataType());
+
+    static final RecordSchema ANONYMOUS_IP_SCHEMA = new SimpleRecordSchema(Arrays.asList(ANONYMOUS, ANONYMOUS_VPN, HOSTING_PROVIDER, PUBLIC_PROXY, TOR_EXIT_NODE));
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/CitySchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/CitySchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/CitySchema.java
new file mode 100644
index 0000000..9bb6dbf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/CitySchema.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.lookup.maxmind;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public class CitySchema {
+    static final RecordField SUBDIVISION_NAME = new RecordField("name", RecordFieldType.STRING.getDataType());
+    static final RecordField SUBDIVISION_ISO = new RecordField("isoCode", RecordFieldType.STRING.getDataType());
+
+    private static final List<RecordField> SUBDIVISION_FIELDS = Arrays.asList(SUBDIVISION_NAME, SUBDIVISION_ISO);
+    static final RecordSchema SUBDIVISION_SCHEMA = new SimpleRecordSchema(SUBDIVISION_FIELDS);
+    static final DataType SUBDIVISION_DATA_TYPE = RecordFieldType.RECORD.getRecordDataType(SUBDIVISION_SCHEMA);
+
+    static final RecordField COUNTRY_NAME = new RecordField("name", RecordFieldType.STRING.getDataType());
+    static final RecordField COUNTRY_ISO = new RecordField("isoCode", RecordFieldType.STRING.getDataType());
+    private static final List<RecordField> COUNTRY_FIELDS = Arrays.asList(COUNTRY_NAME, COUNTRY_ISO);
+    static final RecordSchema COUNTRY_SCHEMA = new SimpleRecordSchema(COUNTRY_FIELDS);
+
+    static final RecordField CITY = new RecordField("city", RecordFieldType.STRING.getDataType());
+    static final RecordField ACCURACY = new RecordField("accuracy", RecordFieldType.INT.getDataType());
+    static final RecordField METRO_CODE = new RecordField("metroCode", RecordFieldType.INT.getDataType());
+    static final RecordField TIMEZONE = new RecordField("timeZone", RecordFieldType.STRING.getDataType());
+    static final RecordField LATITUDE = new RecordField("latitude", RecordFieldType.DOUBLE.getDataType());
+    static final RecordField LONGITUDE = new RecordField("longitude", RecordFieldType.DOUBLE.getDataType());
+    static final RecordField SUBDIVISIONS = new RecordField("subdivisions", RecordFieldType.ARRAY.getArrayDataType(SUBDIVISION_DATA_TYPE));
+    static final RecordField COUNTRY = new RecordField("country", RecordFieldType.RECORD.getRecordDataType(COUNTRY_SCHEMA));
+    static final RecordField CONTINENT = new RecordField("continent", RecordFieldType.STRING.getDataType());
+    static final RecordField POSTALCODE = new RecordField("postalCode", RecordFieldType.STRING.getDataType());
+
+    private static final List<RecordField> GEO_FIELDS = Arrays.asList(CITY, ACCURACY, LATITUDE, LONGITUDE, SUBDIVISIONS, COUNTRY, POSTALCODE);
+    static final RecordSchema GEO_SCHEMA = new SimpleRecordSchema(GEO_FIELDS);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/ContainerSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/ContainerSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/ContainerSchema.java
new file mode 100644
index 0000000..2be40df
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/ContainerSchema.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.lookup.maxmind;
+
+import java.util.Arrays;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public class ContainerSchema {
+
+    static final RecordField GEO = new RecordField("geo", RecordFieldType.RECORD.getRecordDataType(CitySchema.GEO_SCHEMA));
+    static final RecordField ISP = new RecordField("isp", RecordFieldType.RECORD.getRecordDataType(IspSchema.ISP_SCHEMA));
+    static final RecordField DOMAIN_NAME = new RecordField("domainName", RecordFieldType.STRING.getDataType());
+    static final RecordField CONNECTION_TYPE = new RecordField("connectionType", RecordFieldType.STRING.getDataType());
+    static final RecordField ANONYMOUS_IP = new RecordField("anonymousIp", RecordFieldType.RECORD.getRecordDataType(AnonymousIpSchema.ANONYMOUS_IP_SCHEMA));
+
+    static final RecordSchema CONTAINER_SCHEMA = new SimpleRecordSchema(Arrays.asList(GEO, ISP, DOMAIN_NAME, CONNECTION_TYPE, ANONYMOUS_IP));
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/DatabaseReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/DatabaseReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/DatabaseReader.java
new file mode 100644
index 0000000..1849f4f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/DatabaseReader.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup.maxmind;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.List;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.maxmind.db.Metadata;
+import com.maxmind.db.Reader;
+import com.maxmind.db.Reader.FileMode;
+import com.maxmind.geoip2.GeoIp2Provider;
+import com.maxmind.geoip2.exception.AddressNotFoundException;
+import com.maxmind.geoip2.exception.GeoIp2Exception;
+import com.maxmind.geoip2.model.AnonymousIpResponse;
+import com.maxmind.geoip2.model.CityResponse;
+import com.maxmind.geoip2.model.ConnectionTypeResponse;
+import com.maxmind.geoip2.model.CountryResponse;
+import com.maxmind.geoip2.model.DomainResponse;
+import com.maxmind.geoip2.model.IspResponse;
+
+/**
+ * <p>
+ * This class was copied from https://raw.githubusercontent.com/maxmind/GeoIP2-java/master/src/main/java/com/maxmind/geoip2/DatabaseReader.java It is written by Maxmind and it is available under
+ * Apache Software License V2
+ *
+ * The modification we're making to the code below is to stop using exceptions for mainline flow control. Specifically we don't want to throw an exception simply because an address was not found.
+ * </p>
+ *
+ * Instances of this class provide a reader for the GeoIP2 database format. IP addresses can be looked up using the <code>get</code> method.
+ */
+public class DatabaseReader implements GeoIp2Provider, Closeable {
+
+    private final Reader reader;
+    private final ObjectMapper om;
+
+    private DatabaseReader(Builder builder) throws IOException {
+        if (builder.stream != null) {
+            this.reader = new Reader(builder.stream);
+        } else if (builder.database != null) {
+            this.reader = new Reader(builder.database, builder.mode);
+        } else {
+            // This should never happen. If it does, review the Builder class
+            // constructors for errors.
+            throw new IllegalArgumentException("Unsupported Builder configuration: expected either File or URL");
+        }
+
+        this.om = new ObjectMapper();
+        this.om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        this.om.configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true);
+        InjectableValues inject = new InjectableValues.Std().addValue("locales", builder.locales);
+        this.om.setInjectableValues(inject);
+    }
+
+    /**
+     * <p>
+     * Constructs a Builder for the DatabaseReader. The file passed to it must be a valid GeoIP2 database file.
+     * </p>
+     * <p>
+     * <code>Builder</code> creates instances of <code>DatabaseReader</code> from values set by the methods.
+     * </p>
+     * <p>
+     * Only the values set in the <code>Builder</code> constructor are required.
+     * </p>
+     */
+    public final static class Builder {
+
+        final File database;
+        final InputStream stream;
+
+        List<String> locales = Arrays.asList("en");
+        FileMode mode = FileMode.MEMORY_MAPPED;
+
+        /**
+         * @param stream the stream containing the GeoIP2 database to use.
+         */
+        public Builder(InputStream stream) {
+            this.stream = stream;
+            this.database = null;
+        }
+
+        /**
+         * @param database the GeoIP2 database file to use.
+         */
+        public Builder(File database) {
+            this.database = database;
+            this.stream = null;
+        }
+
+        /**
+         * @param val List of locale codes to use in name property from most preferred to least preferred.
+         * @return Builder object
+         */
+        public Builder locales(List<String> val) {
+            this.locales = val;
+            return this;
+        }
+
+        /**
+         * @param val The file mode used to open the GeoIP2 database
+         * @return Builder object
+         * @throws java.lang.IllegalArgumentException if you initialized the Builder with a URL, which uses {@link FileMode#MEMORY}, but you provided a different FileMode to this method.
+         */
+        public Builder fileMode(FileMode val) {
+            if (this.stream != null && !FileMode.MEMORY.equals(val)) {
+                throw new IllegalArgumentException("Only FileMode.MEMORY is supported when using an InputStream.");
+            }
+
+            this.mode = val;
+            return this;
+        }
+
+        /**
+         * @return an instance of <code>DatabaseReader</code> created from the fields set on this builder.
+         * @throws IOException if there is an error reading the database
+         */
+        public DatabaseReader build() throws IOException {
+            return new DatabaseReader(this);
+        }
+    }
+
+    /**
+     * @param ipAddress IPv4 or IPv6 address to lookup.
+     * @return An object of type T with the data for the IP address or null if no information could be found for the given IP address
+     * @throws IOException if there is an error opening or reading from the file.
+     */
+    private <T> T get(InetAddress ipAddress, Class<T> cls, boolean hasTraits, String type) throws IOException, AddressNotFoundException {
+        ObjectNode node = (ObjectNode) this.reader.get(ipAddress);
+        if (node == null) {
+            return null;
+        }
+
+        ObjectNode ipNode;
+        if (hasTraits) {
+            if (!node.has("traits")) {
+                node.set("traits", this.om.createObjectNode());
+            }
+
+            ipNode = (ObjectNode) node.get("traits");
+        } else {
+            ipNode = node;
+        }
+
+        ipNode.put("ip_address", ipAddress.getHostAddress());
+        return this.om.treeToValue(node, cls);
+    }
+
+    /**
+     * <p>
+     * Closes the database.
+     * </p>
+     * <p>
+     * If you are using <code>FileMode.MEMORY_MAPPED</code>, this will
+     * <em>not</em> unmap the underlying file due to a limitation in Java's <code>MappedByteBuffer</code>. It will however set the reference to the buffer to <code>null</code>, allowing the garbage
+     * collector to collect it.
+     * </p>
+     *
+     * @throws IOException if an I/O error occurs.
+     */
+    @Override
+    public void close() throws IOException {
+        this.reader.close();
+    }
+
+    @Override
+    public CountryResponse country(InetAddress ipAddress) throws IOException, GeoIp2Exception {
+        return this.get(ipAddress, CountryResponse.class, true, "Country");
+    }
+
+    @Override
+    public CityResponse city(InetAddress ipAddress) throws IOException, GeoIp2Exception {
+        return this.get(ipAddress, CityResponse.class, true, "City");
+    }
+
+    /**
+     * Look up an IP address in a GeoIP2 Anonymous IP.
+     *
+     * @param ipAddress IPv4 or IPv6 address to lookup.
+     * @return a AnonymousIpResponse for the requested IP address.
+     * @throws GeoIp2Exception if there is an error looking up the IP
+     * @throws IOException if there is an IO error
+     */
+    public AnonymousIpResponse anonymousIp(InetAddress ipAddress) throws IOException, GeoIp2Exception {
+        return this.get(ipAddress, AnonymousIpResponse.class, false, "GeoIP2-Anonymous-IP");
+    }
+
+    /**
+     * Look up an IP address in a GeoIP2 Connection Type database.
+     *
+     * @param ipAddress IPv4 or IPv6 address to lookup.
+     * @return a ConnectTypeResponse for the requested IP address.
+     * @throws GeoIp2Exception if there is an error looking up the IP
+     * @throws IOException if there is an IO error
+     */
+    public ConnectionTypeResponse connectionType(InetAddress ipAddress) throws IOException, GeoIp2Exception {
+        return this.get(ipAddress, ConnectionTypeResponse.class, false,
+            "GeoIP2-Connection-Type");
+    }
+
+    /**
+     * Look up an IP address in a GeoIP2 Domain database.
+     *
+     * @param ipAddress IPv4 or IPv6 address to lookup.
+     * @return a DomainResponse for the requested IP address.
+     * @throws GeoIp2Exception if there is an error looking up the IP
+     * @throws IOException if there is an IO error
+     */
+    public DomainResponse domain(InetAddress ipAddress) throws IOException, GeoIp2Exception {
+        return this.get(ipAddress, DomainResponse.class, false, "GeoIP2-Domain");
+    }
+
+    /**
+     * Look up an IP address in a GeoIP2 ISP database.
+     *
+     * @param ipAddress IPv4 or IPv6 address to lookup.
+     * @return an IspResponse for the requested IP address.
+     * @throws GeoIp2Exception if there is an error looking up the IP
+     * @throws IOException if there is an IO error
+     */
+    public IspResponse isp(InetAddress ipAddress) throws IOException, GeoIp2Exception {
+        return this.get(ipAddress, IspResponse.class, false, "GeoIP2-ISP");
+    }
+
+    /**
+     * @return the metadata for the open MaxMind DB file.
+     */
+    public Metadata getMetadata() {
+        return this.reader.getMetadata();
+    }
+}


Mime
View raw message