metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject [1/3] incubator-metron git commit: METRON-283 Migrate Geo Enrichment outside of MySQL (justinleet) closes apache/incubator-metron#421
Date Thu, 26 Jan 2017 16:21:54 GMT
Repository: incubator-metron
Updated Branches:
  refs/heads/master c74c7de27 -> fa6f3df51


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/fa6f3df5/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/GeoEnrichmentFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/GeoEnrichmentFunctions.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/GeoEnrichmentFunctions.java
new file mode 100644
index 0000000..11e024e
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/GeoEnrichmentFunctions.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.stellar;
+
+import org.apache.log4j.Logger;
+import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.ParseException;
+import org.apache.metron.common.dsl.Stellar;
+import org.apache.metron.common.dsl.StellarFunction;
+import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class GeoEnrichmentFunctions {
+  private static final Logger LOG = Logger.getLogger(GeoEnrichmentFunctions.class);
+
+  @Stellar(name="GET"
+          ,namespace="GEO"
+          ,description="Looks up an IPV4 address and returns geographic information about it"
+          ,params = {
+                      "ip - The IPV4 address to lookup",
+                      "fields - Optional list of GeoIP fields to grab. Options are locID, country, city, postalCode, dmaCode, latitude, longitude, location_point"
+                    }
+          ,returns = "If a single field is requested, the string value of that field; if multiple fields are requested or the fields argument is omitted, a map of field names to string values; null otherwise"
+  )
+  public static class GeoGet implements StellarFunction {
+    boolean initialized = false;
+
+    @Override
+    public Object apply(List<Object> args, Context context) throws ParseException {
+      if(!initialized) {
+        return null;
+      }
+      if(args.size() > 2) {
+        throw new IllegalArgumentException("GEO_GET received more arguments than expected: " + args.size());
+      }
+
+      if(args.size() == 1 && args.get(0) instanceof String) {
+        // If no fields are provided, return everything
+        String ip = (String) args.get(0);
+        if(ip == null || ip.trim().isEmpty()) {
+          return null;
+        }
+
+        Optional<HashMap<String, String>> result = GeoLiteDatabase.INSTANCE.get(ip);
+        if(result.isPresent()) {
+          return result.get();
+        }
+      } else if (args.size() == 2 && args.get(1) instanceof List) {
+        // If fields are provided, return just those fields.
+        String ip = (String) args.get(0);
+        @SuppressWarnings("unchecked")
+        List<String> fields = (List) args.get(1);
+        Optional<HashMap<String, String>> result = GeoLiteDatabase.INSTANCE.get(ip);
+
+        // If only one field is requested, just return it directly
+        if(fields.size() == 1 && result.isPresent()) {
+          return result.get().get(fields.get(0));
+        } else if (result.isPresent()) {
+          // If multiple fields are requested, return all of them
+          HashMap<String, String> filteredInfo = new HashMap<>();
+          for(String field : fields) {
+            HashMap<String, String> geoInfo = result.get();
+            filteredInfo.put(field, geoInfo.get(field));
+          }
+          return filteredInfo;
+        }
+      }
+
+      return null;
+    }
+
+    @Override
+    public void initialize(Context context) {
+        LOG.info("Initializing GeoEnrichmentFunctions");
+        Map<String, Object> config = getConfig(context);
+        String hdfsDir = (String) config.get(GeoLiteDatabase.GEO_HDFS_FILE);
+        GeoLiteDatabase.INSTANCE.update(hdfsDir);
+        initialized = true;
+    }
+
+    private static Map<String, Object> getConfig(Context context) {
+      return (Map<String, Object>) context.getCapability(Context.Capabilities.GLOBAL_CONFIG, false).orElse(new HashMap<>());
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return initialized;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/fa6f3df5/metron-platform/metron-enrichment/src/main/resources/ddl/geoip_ddl.sql
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/resources/ddl/geoip_ddl.sql b/metron-platform/metron-enrichment/src/main/resources/ddl/geoip_ddl.sql
deleted file mode 100644
index 02616c6..0000000
--- a/metron-platform/metron-enrichment/src/main/resources/ddl/geoip_ddl.sql
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
-CREATE DATABASE IF NOT EXISTS GEO;
-
-USE GEO;
-
-DROP TABLE IF EXISTS `blocks`;
-CREATE TABLE  `blocks` ( `startIPNum` int(10) unsigned NOT NULL,`endIPNum` int(10) unsigned NOT NULL,`locID`
-int(10) unsigned NOT NULL, PRIMARY KEY  (`startIPNum`,`endIPNum`) )
-ENGINE=MyISAM DEFAULT CHARSET=latin1 PACK_KEYS=1 DELAY_KEY_WRITE=1;
-
-DROP TABLE IF EXISTS `location`;
-CREATE TABLE  `location` (`locID` int(10) unsigned NOT NULL,`country` char(2) default NULL,`region` char(2)
- default NULL,`city` varchar(45) default NULL,`postalCode` char(7) default NULL,`latitude` double default
-NULL,`longitude` double default NULL,`dmaCode` char(3) default NULL,`areaCode` char(3) default NULL,PRIMARY KEY
-  (`locID`),KEY `Index_Country` (`country`) ) ENGINE=MyISAM DEFAULT CHARSET=latin1 ROW_FORMAT=FIXED;
-
-load data infile '/var/lib/mysql-files/GeoLiteCity-Blocks.csv'  into table `blocks`  fields terminated by ',' optionally enclosed by '"'  lines terminated by '\n' ignore 2 lines;
-load data infile '/var/lib/mysql-files/GeoLiteCity-Location.csv'  into table `location`  fields terminated by ',' optionally enclosed by '"'  lines terminated by '\n' ignore 2 lines;
-
-
-DELIMITER $$
-DROP FUNCTION IF EXISTS `IPTOLOCID` $$
-CREATE FUNCTION `IPTOLOCID`( ip VARCHAR(15)) RETURNS int(10) unsigned
-  BEGIN
-    DECLARE ipn INTEGER UNSIGNED;
-    DECLARE locID_var INTEGER;
-    IF ip LIKE '192.168.%' OR ip LIKE '10.%' THEN RETURN 0;
-    END IF;
-    SET ipn = INET_ATON(ip);
-    SELECT locID INTO locID_var FROM `blocks` INNER JOIN (SELECT MAX(startIPNum) AS start FROM `blocks` WHERE startIPNum <= ipn) AS s ON (startIPNum = s.start) WHERE endIPNum >= ipn;
-    RETURN locID_var;
-  END
-$$
-DELIMITER ;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/fa6f3df5/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoAdapterTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoAdapterTest.java
index f899cf9..f91939d 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoAdapterTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoAdapterTest.java
@@ -17,100 +17,66 @@
  */
 package org.apache.metron.enrichment.adapters.geo;
 
+import com.google.common.collect.ImmutableMap;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.enrichment.bolt.CacheKey;
+import org.apache.metron.test.utils.UnitTestHelper;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
 import org.junit.Assert;
-import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
 
-import java.sql.ResultSet;
-import java.sql.Statement;
-
-import static org.mockito.Mockito.when;
+import java.io.File;
+import java.util.Map;
 
 public class GeoAdapterTest {
-
-
-  private String ip = "72.163.4.161";
-
+  private static final String IP = "216.160.83.56";
 
   /**
    * {
-   * "locID":"1",
-   * "country":"test country",
-   * "city":"test city",
-   * "postalCode":"test zip",
-   * "latitude":"test latitude",
-   * "longitude":"test longitude",
-   * "dmaCode":"test dma",
-   * "location_point":"test latitude,test longitude"
+   * "locID":"6252001",
+   * "country":"US",
+   * "city":"Milton",
+   * "postalCode":"98354",
+   * "latitude":"47.2513",
+   * "longitude":"-122.3149",
+   * "dmaCode":"819",
+   * "location_point":"47.2513,-122.3149"
    * }
    */
   @Multiline
-  private String expectedMessageString;
+  private static String expectedMessageString;
 
-  private JSONObject expectedMessage;
+  private static JSONObject expectedMessage;
 
-  @Mock
-  Statement statetment;
-  @Mock
-  ResultSet resultSet, resultSet1;
+  private static GeoAdapter geo;
+  private static File geoHdfsFile;
 
-
-  @Before
-  public void setup() throws Exception {
+  @BeforeClass
+  public static void setupOnce() throws ParseException {
     JSONParser jsonParser = new JSONParser();
     expectedMessage = (JSONObject) jsonParser.parse(expectedMessageString);
-    MockitoAnnotations.initMocks(this);
-    when(statetment.executeQuery("select IPTOLOCID(\"" + ip + "\") as ANS")).thenReturn(resultSet);
-    when(statetment.executeQuery("select * from location where locID = 1")).thenReturn(resultSet1);
-    when(resultSet.next()).thenReturn(Boolean.TRUE, Boolean.FALSE);
-    when(resultSet.getString("ANS")).thenReturn("1");
-    when(resultSet1.next()).thenReturn(Boolean.TRUE, Boolean.FALSE);
-    when(resultSet1.getString("locID")).thenReturn("1");
-    when(resultSet1.getString("country")).thenReturn("test country");
-    when(resultSet1.getString("city")).thenReturn("test city");
-    when(resultSet1.getString("postalCode")).thenReturn("test zip");
-    when(resultSet1.getString("latitude")).thenReturn("test latitude");
-    when(resultSet1.getString("longitude")).thenReturn("test longitude");
-    when(resultSet1.getString("dmaCode")).thenReturn("test dma");
-  }
 
+    String baseDir = UnitTestHelper.findDir("GeoLite");
+    geoHdfsFile = new File(new File(baseDir), "GeoIP2-City-Test.mmdb.gz");
+
+    geo = new GeoAdapter();
+    geo.initializeAdapter(ImmutableMap.of(GeoLiteDatabase.GEO_HDFS_FILE, geoHdfsFile.getAbsolutePath()));
+  }
 
   @Test
   public void testEnrich() throws Exception {
-    GeoAdapter geo = new GeoAdapter() {
-      @Override
-      public boolean initializeAdapter() {
-        return true;
-      }
-    };
-    geo.setStatement(statetment);
-    JSONObject actualMessage = geo.enrich(new CacheKey("dummy", ip, null));
+    JSONObject actualMessage = geo.enrich(new CacheKey("dummy", IP, null));
+
     Assert.assertNotNull(actualMessage.get("locID"));
     Assert.assertEquals(expectedMessage, actualMessage);
   }
 
   @Test
   public void testEnrichNonString() throws Exception {
-    GeoAdapter geo = new GeoAdapter() {
-      @Override
-      public boolean initializeAdapter() {
-        return true;
-      }
-    };
-    geo.setStatement(statetment);
-    JSONObject actualMessage = geo.enrich(new CacheKey("dummy", ip, null));
-    Assert.assertNotNull(actualMessage.get("locID"));
-    Assert.assertEquals(expectedMessage, actualMessage);
-
-    actualMessage = geo.enrich(new CacheKey("dummy", 10L, null));
-    Assert.assertEquals(actualMessage,new JSONObject());
+    JSONObject actualMessage = geo.enrich(new CacheKey("dummy", 10L, null));
+    Assert.assertEquals(new JSONObject(), actualMessage);
   }
-
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/fa6f3df5/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoLiteDatabaseTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoLiteDatabaseTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoLiteDatabaseTest.java
new file mode 100644
index 0000000..cc891c6
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/geo/GeoLiteDatabaseTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.adapters.geo;
+
+import com.google.common.collect.ImmutableMap;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.metron.common.dsl.Context;
+import org.apache.metron.test.utils.UnitTestHelper;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.junit.*;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Optional;
+
+public class GeoLiteDatabaseTest {
+  private static Context context;
+  private static File geoHdfsFile;
+  private static File geoHdfsFile_update;
+  private static final String IP_WITH_DMA = "81.2.69.192";
+  private static final String IP_NO_DMA = "216.160.83.56";
+
+  /**
+   * {
+   * "locID":"6252001",
+   * "country":"US",
+   * "city":"Milton",
+   * "postalCode":"98354",
+   * "latitude":"47.2513",
+   * "longitude":"-122.3149",
+   * "dmaCode":"819",
+   * "location_point":"47.2513,-122.3149"
+   * }
+   */
+  @Multiline
+  private static String expectedNoDmaMessageString;
+
+  private static JSONObject expectedNoDmaMessage;
+
+  /**
+   * {
+   * "locID":"2635167",
+   * "country":"GB",
+   * "city":"London",
+   * "postalCode":"",
+   * "latitude":"51.5142",
+   * "longitude":"-0.0931",
+   * "dmaCode":"",
+   * "location_point":"51.5142,-0.0931"
+   * }
+   */
+  @Multiline
+  private static String expectedDmaMessageString;
+
+  private static JSONObject expectedDmaMessage;
+
+  private static FileSystem fs;
+
+  @Rule
+  public TemporaryFolder testFolder = new TemporaryFolder();
+
+  @BeforeClass
+  public static void setupOnce() throws ParseException, IOException {
+    JSONParser jsonParser = new JSONParser();
+    expectedNoDmaMessage = (JSONObject) jsonParser.parse(expectedNoDmaMessageString);
+    expectedDmaMessage = (JSONObject) jsonParser.parse(expectedDmaMessageString);
+
+    String baseDir = UnitTestHelper.findDir("GeoLite");
+    geoHdfsFile = new File(new File(baseDir), "GeoIP2-City-Test.mmdb.gz");
+    geoHdfsFile_update = new File(new File(baseDir), "GeoIP2-City-Test-2.mmdb.gz");
+
+    Configuration config = new Configuration();
+    fs = FileSystem.get(config);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    testFolder.create();
+    context = new Context.Builder().with(Context.Capabilities.GLOBAL_CONFIG
+            , () -> ImmutableMap.of(GeoLiteDatabase.GEO_HDFS_FILE, geoHdfsFile.getAbsolutePath())
+    )
+            .build();
+  }
+
+  @Test
+  public void testGetLocal() throws Exception {
+    GeoLiteDatabase.INSTANCE.update(geoHdfsFile.getAbsolutePath());
+
+    Optional<HashMap<String, String>> result = GeoLiteDatabase.INSTANCE.get("192.168.0.1");
+    Assert.assertEquals("Local IP should return empty map", new HashMap<String, String>(), result.get());
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testGetRemoteWithDma() throws Exception {
+    GeoLiteDatabase.INSTANCE.update(geoHdfsFile.getAbsolutePath());
+
+    Optional<HashMap<String, String>> result = GeoLiteDatabase.INSTANCE.get(IP_WITH_DMA);
+    Assert.assertEquals("Remote IP should return result based on DB", expectedDmaMessage, result.get());
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testGetRemoteNoDma() throws Exception {
+    GeoLiteDatabase.INSTANCE.update(geoHdfsFile.getAbsolutePath());
+
+    Optional<HashMap<String, String>> result = GeoLiteDatabase.INSTANCE.get(IP_NO_DMA);
+    Assert.assertEquals("Remote IP should return result based on DB", expectedNoDmaMessage, result.get());
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testMultipleUpdates() throws Exception {
+    GeoLiteDatabase.INSTANCE.update(geoHdfsFile.getAbsolutePath());
+    GeoLiteDatabase.INSTANCE.update(geoHdfsFile.getAbsolutePath());
+
+    Optional<HashMap<String, String>> result = GeoLiteDatabase.INSTANCE.get(IP_NO_DMA);
+    Assert.assertEquals("Remote IP should return result based on DB", expectedNoDmaMessage, result.get());
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testUpdateIfNecessary() throws Exception {
+    HashMap<String, Object> globalConfig = new HashMap<>();
+    globalConfig.put(GeoLiteDatabase.GEO_HDFS_FILE, geoHdfsFile.getAbsolutePath());
+    GeoLiteDatabase.INSTANCE.updateIfNecessary(globalConfig);
+
+    Optional<HashMap<String, String>> result = GeoLiteDatabase.INSTANCE.get(IP_NO_DMA);
+    Assert.assertEquals("Remote IP should return result based on DB", expectedNoDmaMessage, result.get());
+  }
+
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testMultipleUpdateIfNecessary() throws Exception {
+    HashMap<String, Object> globalConfig = new HashMap<>();
+    globalConfig.put(GeoLiteDatabase.GEO_HDFS_FILE, geoHdfsFile.getAbsolutePath());
+    GeoLiteDatabase.INSTANCE.updateIfNecessary(globalConfig);
+    GeoLiteDatabase.INSTANCE.updateIfNecessary(globalConfig);
+
+    Optional<HashMap<String, String>> result = GeoLiteDatabase.INSTANCE.get(IP_NO_DMA);
+    Assert.assertEquals("Remote IP should return result based on DB", expectedNoDmaMessage, result.get());
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testDifferingUpdateIfNecessary() throws Exception {
+    HashMap<String, Object> globalConfig = new HashMap<>();
+    globalConfig.put(GeoLiteDatabase.GEO_HDFS_FILE, geoHdfsFile.getAbsolutePath());
+    GeoLiteDatabase.INSTANCE.updateIfNecessary(globalConfig);
+    Optional<HashMap<String, String>> result = GeoLiteDatabase.INSTANCE.get(IP_NO_DMA);
+    Assert.assertEquals("Remote IP should return result based on DB", expectedNoDmaMessage, result.get());
+
+    globalConfig.put(GeoLiteDatabase.GEO_HDFS_FILE, geoHdfsFile_update.getAbsolutePath());
+    GeoLiteDatabase.INSTANCE.updateIfNecessary(globalConfig);
+    result = GeoLiteDatabase.INSTANCE.get(IP_NO_DMA);
+
+    Assert.assertEquals("Remote IP should return result based on DB", expectedNoDmaMessage, result.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/fa6f3df5/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapterTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapterTest.java
index cd2ba17..767b17d 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapterTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/host/HostFromJSONListAdapterTest.java
@@ -85,7 +85,7 @@ public class HostFromJSONListAdapterTest {
   @Test
   public void testInitializeAdapter() throws Exception {
     HostFromJSONListAdapter hja = new HostFromJSONListAdapter(expectedKnownHostsString);
-    Assert.assertTrue(hja.initializeAdapter());
+    Assert.assertTrue(hja.initializeAdapter(null));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/fa6f3df5/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapterTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapterTest.java
index ea5cabe..8ce93c1 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapterTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/host/HostFromPropertiesFileAdapterTest.java
@@ -90,7 +90,7 @@ public class HostFromPropertiesFileAdapterTest {
     public void testInitializeAdapter() throws Exception {
         Map<String, JSONObject> mapKnownHosts = new HashMap<>();
         HostFromPropertiesFileAdapter hfa = new HostFromPropertiesFileAdapter(mapKnownHosts);
-        Assert.assertFalse(hfa.initializeAdapter());
+        Assert.assertFalse(hfa.initializeAdapter(null));
         JSONArray jsonArray = (JSONArray) JSONValue.parse(expectedKnownHostsString);
         Iterator jsonArrayIterator = jsonArray.iterator();
         while(jsonArrayIterator.hasNext()) {
@@ -99,7 +99,7 @@ public class HostFromPropertiesFileAdapterTest {
             mapKnownHosts.put(host, jsonObject);
         }
         hfa = new HostFromPropertiesFileAdapter(mapKnownHosts);
-        Assert.assertTrue(hfa.initializeAdapter());
+        Assert.assertTrue(hfa.initializeAdapter(null));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/fa6f3df5/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java
index 7999a4c..35a90b7 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java
@@ -173,7 +173,7 @@ public class SimpleHBaseAdapterTest {
   public void testInitializeAdapter() {
     SimpleHBaseConfig config = new SimpleHBaseConfig();
     SimpleHBaseAdapter sha = new SimpleHBaseAdapter(config);
-    sha.initializeAdapter();
+    sha.initializeAdapter(null);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/fa6f3df5/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java
index 966538f..e96c7a7 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java
@@ -157,7 +157,7 @@ public class ThreatIntelAdapterTest {
     config.withProviderImpl(ExceptionProvider.class.getName());
     ThreatIntelAdapter tia = new ThreatIntelAdapter(config);
     UnitTestHelper.setLog4jLevel(ThreatIntelAdapter.class, Level.FATAL);
-    tia.initializeAdapter();
+    tia.initializeAdapter(null);
     UnitTestHelper.setLog4jLevel(ThreatIntelAdapter.class, Level.ERROR);
     Assert.assertFalse(tia.isInitialized());
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/fa6f3df5/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
index 27acda3..b0076a4 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
@@ -18,6 +18,7 @@
 package org.apache.metron.enrichment.bolt;
 
 import org.apache.log4j.Level;
+import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
 import org.apache.metron.test.utils.UnitTestHelper;
 import org.apache.storm.tuple.Values;
 import com.google.common.collect.ImmutableMap;
@@ -38,6 +39,7 @@ import org.mockito.ArgumentMatcher;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.HashMap;
@@ -137,6 +139,8 @@ public class GenericEnrichmentBoltTest extends BaseEnrichmentBoltTest {
 
   @Test
   public void test() throws IOException {
+    when(tuple.getSourceComponent()).thenReturn("unit test component");
+    when(tuple.getSourceStreamId()).thenReturn("unit test stream");
     String key = "someKey";
     String enrichmentType = "enrichmentType";
     Enrichment<EnrichmentAdapter<CacheKey>> testEnrichment = new Enrichment<>();
@@ -151,6 +155,13 @@ public class GenericEnrichmentBoltTest extends BaseEnrichmentBoltTest {
     genericEnrichmentBolt.setCuratorFramework(client);
     genericEnrichmentBolt.setTreeCache(cache);
     genericEnrichmentBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, new FileInputStream(sampleSensorEnrichmentConfigPath));
+
+    HashMap<String, Object> globalConfig = new HashMap<>();
+    String baseDir = UnitTestHelper.findDir("GeoLite");
+    File geoHdfsFile = new File(new File(baseDir), "GeoIP2-City-Test.mmdb.gz");
+    globalConfig.put(GeoLiteDatabase.GEO_HDFS_FILE, geoHdfsFile.getAbsolutePath());
+    genericEnrichmentBolt.getConfigurations().updateGlobalConfig(globalConfig);
+
     try {
       genericEnrichmentBolt.prepare(new HashMap(), topologyContext, outputCollector);
       fail("Should fail if a maxCacheSize property is not set");
@@ -166,10 +177,10 @@ public class GenericEnrichmentBoltTest extends BaseEnrichmentBoltTest {
       fail("Should fail if an adapter is not set");
     } catch(IllegalStateException e) {}
     genericEnrichmentBolt.withEnrichment(testEnrichment);
-    when(enrichmentAdapter.initializeAdapter()).thenReturn(true);
+    when(enrichmentAdapter.initializeAdapter(globalConfig)).thenReturn(true);
     genericEnrichmentBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    verify(enrichmentAdapter, times(1)).initializeAdapter();
-    when(enrichmentAdapter.initializeAdapter()).thenReturn(false);
+    verify(enrichmentAdapter, times(1)).initializeAdapter(globalConfig);
+    when(enrichmentAdapter.initializeAdapter(globalConfig)).thenReturn(false);
     UnitTestHelper.setLog4jLevel(GenericEnrichmentBolt.class, Level.FATAL);
     try {
       genericEnrichmentBolt.prepare(new HashMap(), topologyContext, outputCollector);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/fa6f3df5/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
index bb539f3..d942d9b 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
@@ -21,16 +21,20 @@ import com.fasterxml.jackson.databind.JsonMappingException;
 import junit.framework.Assert;
 import junit.framework.TestCase;
 import org.adrianwalker.multilinestring.Multiline;
+import org.apache.hadoop.fs.Path;
 import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
 import org.apache.metron.common.configuration.enrichment.threatintel.ThreatTriageConfig;
 import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
 import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
+import org.apache.metron.test.utils.UnitTestHelper;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.json.simple.parser.ParseException;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.HashMap;
@@ -158,6 +162,11 @@ public class ThreatIntelJoinBoltTest extends BaseEnrichmentBoltTest {
       }
     }
     threatIntelJoinBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, enrichmentConfig);
+    HashMap<String, Object> globalConfig = new HashMap<>();
+    String baseDir = UnitTestHelper.findDir("GeoLite");
+    File geoHdfsFile = new File(new File(baseDir), "GeoIP2-City-Test.mmdb.gz");
+    globalConfig.put(GeoLiteDatabase.GEO_HDFS_FILE, geoHdfsFile.getAbsolutePath());
+    threatIntelJoinBolt.getConfigurations().updateGlobalConfig(globalConfig);
     threatIntelJoinBolt.withMaxCacheSize(100);
     threatIntelJoinBolt.withMaxTimeRetain(10000);
     threatIntelJoinBolt.prepare(new HashMap<>(), topologyContext, outputCollector);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/fa6f3df5/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
index f399c81..27c1d11 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
@@ -25,19 +25,16 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.metron.TestConstants;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
 import org.apache.metron.enrichment.bolt.ErrorEnrichmentBolt;
 import org.apache.metron.enrichment.converter.EnrichmentHelper;
 import org.apache.metron.enrichment.converter.EnrichmentKey;
 import org.apache.metron.enrichment.converter.EnrichmentValue;
 import org.apache.metron.enrichment.integration.components.ConfigUploadComponent;
-import org.apache.metron.enrichment.integration.mock.MockGeoAdapter;
 import org.apache.metron.enrichment.lookup.LookupKV;
 import org.apache.metron.enrichment.lookup.accesstracker.PersistentBloomTrackerCreator;
 import org.apache.metron.enrichment.stellar.SimpleHBaseEnrichmentFunctions;
 import org.apache.metron.hbase.TableProvider;
-import org.apache.metron.enrichment.converter.EnrichmentKey;
-import org.apache.metron.enrichment.converter.EnrichmentValue;
-import org.apache.metron.enrichment.converter.EnrichmentHelper;
 import org.apache.metron.integration.*;
 import org.apache.metron.integration.components.FluxTopologyComponent;
 import org.apache.metron.integration.components.KafkaComponent;
@@ -46,20 +43,17 @@ import org.apache.metron.integration.components.ZKServerComponent;
 import org.apache.metron.integration.processors.KafkaProcessor;
 import org.apache.metron.integration.utils.TestUtils;
 import org.apache.metron.test.mock.MockHTable;
+import org.apache.metron.test.utils.UnitTestHelper;
+import org.json.simple.parser.ParseException;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
+import java.util.*;
 
 public class EnrichmentIntegrationTest extends BaseIntegrationTest {
   private static final String SRC_IP = "ip_src_addr";
@@ -69,10 +63,21 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
   private static final Map<String, Object> PLAYFUL_ENRICHMENT = new HashMap<String, Object>() {{
     put("orientation", "north");
   }};
+
+  public static final String DEFAULT_COUNTRY = "test country";
+  public static final String DEFAULT_CITY = "test city";
+  public static final String DEFAULT_POSTAL_CODE = "test postalCode";
+  public static final String DEFAULT_LATITUDE = "test latitude";
+  public static final String DEFAULT_LONGITUDE = "test longitude";
+  public static final String DEFAULT_DMACODE= "test dmaCode";
+  public static final String DEFAULT_LOCATION_POINT= Joiner.on(',').join(DEFAULT_LATITUDE,DEFAULT_LONGITUDE);
+
   protected String fluxPath = "../metron-enrichment/src/main/flux/enrichment/test.yaml";
   protected String sampleParsedPath = TestConstants.SAMPLE_DATA_PARSED_PATH + "TestExampleParsed";
-  private String sampleIndexedPath = TestConstants.SAMPLE_DATA_INDEXED_PATH + "TestIndexed";
   private final List<byte[]> inputMessages = getInputMessages(sampleParsedPath);
+
+  private static File geoHdfsFile;
+
   public static class Provider implements TableProvider, Serializable {
     MockHTable.Provider  provider = new MockHTable.Provider();
     @Override
@@ -88,6 +93,13 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
       return null;
     }
   }
+
+  @BeforeClass
+  public static void setupOnce() throws ParseException {
+    String baseDir = UnitTestHelper.findDir("GeoLite");
+    geoHdfsFile = new File(new File(baseDir), "GeoIP2-City-Test.mmdb.gz");
+  }
+
   @Test
   public void test() throws Exception {
     final String cf = "cf";
@@ -124,6 +136,7 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
       config.put(SimpleHBaseEnrichmentFunctions.ACCESS_TRACKER_TYPE_CONF, "PERSISTENT_BLOOM");
       config.put(PersistentBloomTrackerCreator.Config.PERSISTENT_BLOOM_TABLE, trackerHBaseTableName);
       config.put(PersistentBloomTrackerCreator.Config.PERSISTENT_BLOOM_CF, cf);
+      config.put(GeoLiteDatabase.GEO_HDFS_FILE, geoHdfsFile.getAbsolutePath());
       globalConfigStr = JSONUtils.INSTANCE.toJSON(config, true);
     }
     ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
@@ -366,21 +379,25 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
   }
 
   private static void geoEnrichmentValidation(Map<String, Object> indexedDoc) {
-    //should have geo enrichment on every message due to mock geo adapter
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".location_point"), MockGeoAdapter.DEFAULT_LOCATION_POINT);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP +".location_point"), MockGeoAdapter.DEFAULT_LOCATION_POINT);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".longitude"), MockGeoAdapter.DEFAULT_LONGITUDE);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".longitude"), MockGeoAdapter.DEFAULT_LONGITUDE);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".city"), MockGeoAdapter.DEFAULT_CITY);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".city"), MockGeoAdapter.DEFAULT_CITY);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".latitude"), MockGeoAdapter.DEFAULT_LATITUDE);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".latitude"), MockGeoAdapter.DEFAULT_LATITUDE);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".country"), MockGeoAdapter.DEFAULT_COUNTRY);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".country"), MockGeoAdapter.DEFAULT_COUNTRY);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".dmaCode"), MockGeoAdapter.DEFAULT_DMACODE);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".dmaCode"), MockGeoAdapter.DEFAULT_DMACODE);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".postalCode"), MockGeoAdapter.DEFAULT_POSTAL_CODE);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".postalCode"), MockGeoAdapter.DEFAULT_POSTAL_CODE);
+    // Need to check both separately. Local IPs will have no Geo entries
+    if(indexedDoc.containsKey("enrichments.geo." + DST_IP + ".location_point")) {
+      Assert.assertEquals(DEFAULT_LOCATION_POINT, indexedDoc.get("enrichments.geo." + DST_IP + ".location_point"));
+      Assert.assertEquals(DEFAULT_LONGITUDE, indexedDoc.get("enrichments.geo." + DST_IP + ".longitude"));
+      Assert.assertEquals(DEFAULT_CITY, indexedDoc.get("enrichments.geo." + DST_IP + ".city"));
+      Assert.assertEquals(DEFAULT_LATITUDE, indexedDoc.get("enrichments.geo." + DST_IP + ".latitude"));
+      Assert.assertEquals(DEFAULT_COUNTRY, indexedDoc.get("enrichments.geo." + DST_IP + ".country"));
+      Assert.assertEquals(DEFAULT_DMACODE, indexedDoc.get("enrichments.geo." + DST_IP + ".dmaCode"));
+      Assert.assertEquals(DEFAULT_POSTAL_CODE, indexedDoc.get("enrichments.geo." + DST_IP + ".postalCode"));
+    }
+    if(indexedDoc.containsKey("enrichments.geo." + SRC_IP + ".location_point")) {
+      Assert.assertEquals(DEFAULT_LOCATION_POINT, indexedDoc.get("enrichments.geo." + SRC_IP + ".location_point"));
+      Assert.assertEquals(DEFAULT_LONGITUDE, indexedDoc.get("enrichments.geo." + SRC_IP + ".longitude"));
+      Assert.assertEquals(DEFAULT_CITY, indexedDoc.get("enrichments.geo." + SRC_IP + ".city"));
+      Assert.assertEquals(DEFAULT_LATITUDE, indexedDoc.get("enrichments.geo." + SRC_IP + ".latitude"));
+      Assert.assertEquals(DEFAULT_COUNTRY, indexedDoc.get("enrichments.geo." + SRC_IP + ".country"));
+      Assert.assertEquals(DEFAULT_DMACODE, indexedDoc.get("enrichments.geo." + SRC_IP + ".dmaCode"));
+      Assert.assertEquals(DEFAULT_POSTAL_CODE, indexedDoc.get("enrichments.geo." + SRC_IP + ".postalCode"));
+    }
   }
 
   private static void hostEnrichmentValidation(Map<String, Object> indexedDoc) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/fa6f3df5/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/mock/MockGeoAdapter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/mock/MockGeoAdapter.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/mock/MockGeoAdapter.java
deleted file mode 100644
index b0491b8..0000000
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/mock/MockGeoAdapter.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.integration.mock;
-
-import com.google.common.base.Joiner;
-import org.apache.metron.enrichment.bolt.CacheKey;
-import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-import org.json.simple.JSONObject;
-
-import java.io.Serializable;
-
-public class MockGeoAdapter implements EnrichmentAdapter<CacheKey>,
-        Serializable {
-
-  public static final String DEFAULT_LOC_ID = "1";
-  public static final String DEFAULT_COUNTRY = "test country";
-  public static final String DEFAULT_CITY = "test city";
-  public static final String DEFAULT_POSTAL_CODE = "test postalCode";
-  public static final String DEFAULT_LATITUDE = "test latitude";
-  public static final String DEFAULT_LONGITUDE = "test longitude";
-  public static final String DEFAULT_DMACODE= "test dmaCode";
-  public static final String DEFAULT_LOCATION_POINT= Joiner.on(',').join(DEFAULT_LATITUDE,DEFAULT_LONGITUDE);
-
-  @Override
-  public void logAccess(CacheKey value) {
-
-  }
-
-  @Override
-  public JSONObject enrich(CacheKey cache ) {
-    JSONObject enriched = new JSONObject();
-    enriched.put("locID", DEFAULT_LOC_ID);
-    enriched.put("country", DEFAULT_COUNTRY);
-    enriched.put("city", DEFAULT_CITY);
-    enriched.put("postalCode", DEFAULT_POSTAL_CODE);
-    enriched.put("latitude", DEFAULT_LATITUDE);
-    enriched.put("longitude", DEFAULT_LONGITUDE);
-    enriched.put("dmaCode", DEFAULT_DMACODE);
-    enriched.put("location_point", DEFAULT_LOCATION_POINT);
-    return enriched;
-  }
-
-  @Override
-  public boolean initializeAdapter() {
-    return true;
-  }
-
-  @Override
-  public void cleanup() {
-
-  }
-
-  @Override
-  public String getOutputPrefix(CacheKey value) {
-    return value.getField();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/fa6f3df5/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/GeoEnrichmentFunctionsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/GeoEnrichmentFunctionsTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/GeoEnrichmentFunctionsTest.java
new file mode 100644
index 0000000..c87449d
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/GeoEnrichmentFunctionsTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.enrichment.stellar;
+
+import com.google.common.collect.ImmutableMap;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.StellarFunctions;
+import org.apache.metron.common.stellar.StellarProcessor;
+import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
+import org.apache.metron.test.utils.UnitTestHelper;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+public class GeoEnrichmentFunctionsTest {
+  private static Context context;
+  private static File geoHdfsFile;
+
+  /**
+   * {
+   * "locID":"6252001",
+   * "country":"US",
+   * "city":"Milton",
+   * "postalCode":"98354",
+   * "latitude":"47.2513",
+   * "longitude":"-122.3149",
+   * "dmaCode":"819",
+   * "location_point":"47.2513,-122.3149"
+   * }
+   */
+  @Multiline
+  private static String expectedMessageString;
+
+  private static JSONObject expectedMessage;
+
+  /**
+   * {
+   * "country":"US",
+   * "city":"Milton",
+   * "dmaCode":"819",
+   * "location_point":"47.2513,-122.3149"
+   * }
+   */
+  @Multiline
+  private static String expectedSubsetString;
+
+  private static JSONObject expectedSubsetMessage;
+
+  @BeforeClass
+  public static void setupOnce() throws ParseException {
+    JSONParser jsonParser = new JSONParser();
+    expectedMessage = (JSONObject) jsonParser.parse(expectedMessageString);
+
+    expectedSubsetMessage = (JSONObject) jsonParser.parse(expectedSubsetString);
+
+    String baseDir = UnitTestHelper.findDir("GeoLite");
+    geoHdfsFile = new File(new File(baseDir), "GeoIP2-City-Test.mmdb.gz");
+  }
+
+  @Before
+  public void setup() throws Exception {
+    context = new Context.Builder().with(Context.Capabilities.GLOBAL_CONFIG
+            , () -> ImmutableMap.of(GeoLiteDatabase.GEO_HDFS_FILE, geoHdfsFile.getAbsolutePath())
+    )
+            .build();
+  }
+
+  public Object run(String rule, Map<String, Object> variables) throws Exception {
+    StellarProcessor processor = new StellarProcessor();
+    Assert.assertTrue(rule + " not valid.", processor.validate(rule, context));
+    return processor.parse(rule, x -> variables.get(x), StellarFunctions.FUNCTION_RESOLVER(), context);
+  }
+
+  @Test
+  public void testGetNull() throws Exception {
+    String stellar = "GEO_GET()";
+    Object result = run(stellar, ImmutableMap.of());
+    Assert.assertEquals("Null IP should return null", null, result);
+  }
+
+  @Test
+  public void testGetEmptyString() throws Exception {
+    String stellar = "GEO_GET('  ')";
+    Object result = run(stellar, ImmutableMap.of());
+    Assert.assertEquals("Empty IP should return null", null, result);
+  }
+
+  @Test
+  public void testGetLocal() throws Exception {
+    String stellar = "GEO_GET('192.168.0.1')";
+    Object result = run(stellar, ImmutableMap.of());
+    Assert.assertEquals("Local IP should return empty map", new HashMap<String, String>(), result);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testGetRemote() throws Exception {
+    String stellar = "GEO_GET('216.160.83.56')";
+    Object result = run(stellar, ImmutableMap.of());
+    Assert.assertEquals("Remote Local IP should return result based on DB", expectedMessage, result);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testGetRemoteSingleField() throws Exception {
+    String stellar = "GEO_GET('216.160.83.56', ['country'])";
+    Object result = run(stellar, ImmutableMap.of());
+    Assert.assertEquals("Remote Local IP should return country result based on DB", "US", result);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testGetRemoteMultipleFields() throws Exception {
+    String stellar = "GEO_GET('216.160.83.56', ['country', 'city', 'dmaCode', 'location_point'])";
+    Object result = run(stellar, ImmutableMap.of());
+    Assert.assertEquals("Remote Local IP should return country result based on DB", expectedSubsetMessage, result);
+  }
+
+  @Test(expected=org.apache.metron.common.dsl.ParseException.class)
+  @SuppressWarnings("unchecked")
+  public void testGetTooManyParams() throws Exception {
+    String stellar = "GEO_GET('216.160.83.56', ['country', 'city', 'dmaCode', 'location_point'], 'garbage')";
+    run(stellar, ImmutableMap.of());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/fa6f3df5/metron-platform/metron-enrichment/src/test/resources/GeoLite/GeoIP2-City-Test-2.mmdb.gz
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/resources/GeoLite/GeoIP2-City-Test-2.mmdb.gz b/metron-platform/metron-enrichment/src/test/resources/GeoLite/GeoIP2-City-Test-2.mmdb.gz
new file mode 100644
index 0000000..406c656
Binary files /dev/null and b/metron-platform/metron-enrichment/src/test/resources/GeoLite/GeoIP2-City-Test-2.mmdb.gz differ

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/fa6f3df5/metron-platform/metron-enrichment/src/test/resources/GeoLite/GeoIP2-City-Test.mmdb.gz
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/resources/GeoLite/GeoIP2-City-Test.mmdb.gz b/metron-platform/metron-enrichment/src/test/resources/GeoLite/GeoIP2-City-Test.mmdb.gz
new file mode 100644
index 0000000..406c656
Binary files /dev/null and b/metron-platform/metron-enrichment/src/test/resources/GeoLite/GeoIP2-City-Test.mmdb.gz differ

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/fa6f3df5/metron-platform/metron-enrichment/src/test/resources/GeoMysqlAdapterTest.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/resources/GeoMysqlAdapterTest.properties b/metron-platform/metron-enrichment/src/test/resources/GeoMysqlAdapterTest.properties
deleted file mode 100644
index ef7126f..0000000
--- a/metron-platform/metron-enrichment/src/test/resources/GeoMysqlAdapterTest.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-
-mysql.ip=172.30.9.120
-mysql.port=3306
-mysql.username=test
-mysql.password=123123
-
-#GeoEnrichment
-bolt.enrichment.geo.enrichment_tag=geo
-bolt.enrichment.geo.adapter.table=GEO
-bolt.enrichment.geo.MAX_CACHE_SIZE_OBJECTS_NUM=10000
-bolt.enrichment.geo.MAX_TIME_RETAIN_MINUTES=10
-bolt.enrichment.geo.source=ip_src_addr,ip_dst_addr

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/fa6f3df5/metron-platform/metron-enrichment/src/test/resources/TestSchemas/GeoMySqlSchema.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/resources/TestSchemas/GeoMySqlSchema.json b/metron-platform/metron-enrichment/src/test/resources/TestSchemas/GeoMySqlSchema.json
deleted file mode 100644
index c4f2a82..0000000
--- a/metron-platform/metron-enrichment/src/test/resources/TestSchemas/GeoMySqlSchema.json
+++ /dev/null
@@ -1,42 +0,0 @@
-{
-"title": "GeoMySql Schema",
-"type": "object",
-"properties": {
-
-         "city"    : {
-					   "type": "string"
-				  },
-		 "country" : {
-						"type": "string"
-					},
-		 "dmaCode" :
-		 			 {
-						"type": "string"
-					},
-	     "geoHash" : 
-	     			{
-						"type": "string"
-					},
-		 "latitude" : 
-		 			{
-						"type": "string"
-				   },
-		 "locID" : 
-		 			{
-					   "type": "string"
-				   },
-		 "location_point" : 
-		 			{
-					   "type": "string"
-				    },
-		 "longitude" : 
-		 			{
-						"type": "string"
-					},
-		 "postalCode" : 
-		 			{
-						"type": "string"
-					}
-   },
-   "required": ["city", "country", "dmaCode","latitude","locID","location_point","postalCode"]
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/fa6f3df5/metron-platform/metron-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/pom.xml b/metron-platform/metron-hbase/pom.xml
index 5b302f5..834c197 100644
--- a/metron-platform/metron-hbase/pom.xml
+++ b/metron-platform/metron-hbase/pom.xml
@@ -24,7 +24,6 @@
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-        <mysql.version>5.1.31</mysql.version>
         <slf4j.version>1.7.7</slf4j.version>
         <storm.hdfs.version>0.1.2</storm.hdfs.version>
         <guava.version>${global_hbase_guava_version}</guava.version>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/fa6f3df5/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json b/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json
index 5ac0579..8d3005f 100644
--- a/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json
+++ b/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json
@@ -21,5 +21,7 @@
   "profiler.client.hbase.table": "profiler",
   "profiler.client.hbase.column.family": "P",
   "profiler.client.salt.divisor": "1000",
-  "hbase.provider.impl": "org.apache.metron.hbase.HTableProvider"
+  "hbase.provider.impl": "org.apache.metron.hbase.HTableProvider",
+
+  "geo.hdfs.file": "src/test/resources/GeoLite/GeoIP2-City-Test.mmdb.gz"
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/fa6f3df5/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index abbd9d8..416d3ed 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.parsers.bolt;
 
+import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -72,6 +73,8 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
     super.prepare(stormConf, context, collector);
+    String hdfsFile = (String) getConfigurations().getGlobalConfig().get(GeoLiteDatabase.GEO_HDFS_FILE);
+    GeoLiteDatabase.INSTANCE.update(hdfsFile);
     this.collector = collector;
     initializeStellar();
     if(getSensorParserConfig() != null && filter == null) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/fa6f3df5/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index cf90178..de98a64 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -19,6 +19,8 @@ package org.apache.metron.parsers.bolt;
 
 import org.apache.metron.common.configuration.*;
 
+import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
+import org.apache.metron.test.utils.UnitTestHelper;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.tuple.Tuple;
 import com.google.common.collect.ImmutableList;
@@ -44,6 +46,7 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mock;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.*;
 import java.util.stream.Collectors;
@@ -135,6 +138,9 @@ public class ParserBoltTest extends BaseBoltTest {
       }
 
     };
+
+    buildGlobalConfig(parserBolt);
+
     parserBolt.setCuratorFramework(client);
     parserBolt.setTreeCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
@@ -172,6 +178,9 @@ public class ParserBoltTest extends BaseBoltTest {
       }
 
     };
+
+    buildGlobalConfig(parserBolt);
+
     parserBolt.setCuratorFramework(client);
     parserBolt.setTreeCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
@@ -228,6 +237,9 @@ public void testImplicitBatchOfOne() throws Exception {
       };
     }
   };
+
+  buildGlobalConfig(parserBolt);
+
   parserBolt.setCuratorFramework(client);
   parserBolt.setTreeCache(cache);
   parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
@@ -273,6 +285,9 @@ public void testImplicitBatchOfOne() throws Exception {
         }
       }
     };
+
+    buildGlobalConfig(parserBolt);
+
     parserBolt.setCuratorFramework(client);
     parserBolt.setTreeCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
@@ -309,6 +324,9 @@ public void testImplicitBatchOfOne() throws Exception {
         }
       }
     };
+
+    buildGlobalConfig(parserBolt);
+
     parserBolt.setCuratorFramework(client);
     parserBolt.setTreeCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
@@ -377,6 +395,9 @@ public void testImplicitBatchOfOne() throws Exception {
         }
       }
     };
+
+    buildGlobalConfig(parserBolt);
+
     parserBolt.setCuratorFramework(client);
     parserBolt.setTreeCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
@@ -387,8 +408,6 @@ public void testImplicitBatchOfOne() throws Exception {
     Assert.assertEquals(expected, recordingWriter.getRecords().get(0).get("timestamp"));
   }
 
-
-
   @Test
   public void testBatchOfOne() throws Exception {
 
@@ -412,6 +431,9 @@ public void testImplicitBatchOfOne() throws Exception {
         };
       }
     };
+
+    buildGlobalConfig(parserBolt);
+
     parserBolt.setCuratorFramework(client);
     parserBolt.setTreeCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
@@ -450,6 +472,9 @@ public void testImplicitBatchOfOne() throws Exception {
         };
       }
     };
+
+    buildGlobalConfig(parserBolt);
+
     parserBolt.setCuratorFramework(client);
     parserBolt.setTreeCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
@@ -498,6 +523,9 @@ public void testImplicitBatchOfOne() throws Exception {
         };
       }
     };
+
+    buildGlobalConfig(parserBolt);
+
     parserBolt.setCuratorFramework(client);
     parserBolt.setTreeCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
@@ -522,6 +550,13 @@ public void testImplicitBatchOfOne() throws Exception {
 
   }
 
+  protected void buildGlobalConfig(ParserBolt parserBolt) {
+    HashMap<String, Object> globalConfig = new HashMap<>();
+    String baseDir = UnitTestHelper.findDir("GeoLite");
+    File geoHdfsFile = new File(new File(baseDir), "GeoIP2-City-Test.mmdb.gz");
+    globalConfig.put(GeoLiteDatabase.GEO_HDFS_FILE, geoHdfsFile.getAbsolutePath());
+    parserBolt.getConfigurations().updateGlobalConfig(globalConfig);
+  }
 
   private static void writeNonBatch(OutputCollector collector, ParserBolt bolt, Tuple t) {
     bolt.execute(t);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/fa6f3df5/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
index f37b1fc..eaeaeb5 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
@@ -61,6 +61,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
   }
   /**
    {
+   "geo.hdfs.file": "src/test/resources/GeoLite/GeoIP2-City-Test.mmdb.gz",
     "fieldValidations" : [
         {
           "validation" : "org.apache.metron.writers.integration.WriterBoltIntegrationTest$MockValidator"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/fa6f3df5/metron-platform/metron-parsers/src/test/resources/GeoLite/GeoIP2-City-Test.mmdb.gz
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/resources/GeoLite/GeoIP2-City-Test.mmdb.gz b/metron-platform/metron-parsers/src/test/resources/GeoLite/GeoIP2-City-Test.mmdb.gz
new file mode 100644
index 0000000..406c656
Binary files /dev/null and b/metron-platform/metron-parsers/src/test/resources/GeoLite/GeoIP2-City-Test.mmdb.gz differ


Mime
View raw message