From commits-return-4153-archive-asf-public=cust-asf.ponee.io@metron.apache.org Tue Dec 4 15:27:40 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id D1142180652 for ; Tue, 4 Dec 2018 15:27:39 +0100 (CET) Received: (qmail 1430 invoked by uid 500); 4 Dec 2018 14:27:39 -0000 Mailing-List: contact commits-help@metron.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@metron.apache.org Delivered-To: mailing list commits@metron.apache.org Received: (qmail 1421 invoked by uid 99); 4 Dec 2018 14:27:39 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Dec 2018 14:27:39 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B7A5EE1395; Tue, 4 Dec 2018 14:27:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mmiklavcic@apache.org To: commits@metron.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: metron git commit: METRON-1889: Add any missing timestamp fields to unified enrichment topology (mmiklavc via mmiklavc) closes apache/metron#1286 Date: Tue, 4 Dec 2018 14:27:38 +0000 (UTC) Repository: metron Updated Branches: refs/heads/master b4d76f98e -> 4ef65e09e METRON-1889: Add any missing timestamp fields to unified enrichment topology (mmiklavc via mmiklavc) closes apache/metron#1286 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/4ef65e09 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/4ef65e09 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/4ef65e09 Branch: refs/heads/master Commit: 4ef65e09ea4a1eac8abf89521e5a999faeca1f37 Parents: b4d76f9 Author: mmiklavc Authored: Tue Dec 4 07:27:33 2018 -0700 Committer: Michael Miklavcic Committed: Tue Dec 4 07:27:33 2018 -0700 ---------------------------------------------------------------------- .../enrichment/parallel/ParallelEnricher.java | 10 +- .../enrichment/utils/EnrichmentUtils.java | 13 +-- .../parallel/ParallelEnricherTest.java | 104 ++++++++++++------- 3 files changed, 77 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/4ef65e09/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java index b10c148..1de8945 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java @@ -157,6 +157,7 @@ public class ParallelEnricher { throw new IllegalStateException("Unable to find an adapter for " + task.getKey() + ", possible adapters are: " + Joiner.on(",").join(enrichmentsByType.keySet())); } + message.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".begin.ts", "" + System.currentTimeMillis()); for(JSONObject m : task.getValue()) { /* now for each unit of work (each of these only has one element in them) * the key is the field name and the value is value associated with that field. @@ -171,6 +172,7 @@ public class ParallelEnricher { String field = (String) o; Object value = m.get(o); if(value == null) { + message.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".end.ts", "" + System.currentTimeMillis()); continue; } CacheKey cacheKey = new CacheKey(field, value, config); @@ -182,7 +184,10 @@ public class ParallelEnricher { ret = new JSONObject(); } //each enrichment has their own unique prefix to use to adjust the keys for the enriched fields. - return EnrichmentUtils.adjustKeys(new JSONObject(), ret, cacheKey.getField(), prefix); + JSONObject adjustedKeys = EnrichmentUtils + .adjustKeys(new JSONObject(), ret, cacheKey.getField(), prefix); + adjustedKeys.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".end.ts", "" + System.currentTimeMillis()); + return adjustedKeys; } catch (Throwable e) { JSONObject errorMessage = new JSONObject(); errorMessage.putAll(m); @@ -197,11 +202,12 @@ public class ParallelEnricher { } } if(taskList.isEmpty()) { + message.put(getClass().getSimpleName().toLowerCase() + ".enrich.end.ts", "" + System.currentTimeMillis()); return new EnrichmentResult(message, errors); } EnrichmentResult ret = new EnrichmentResult(all(taskList, message, (left, right) -> join(left, right)).get(), errors); - message.put(getClass().getSimpleName().toLowerCase() + ".enrich.end.ts", "" + System.currentTimeMillis()); + ret.getResult().put(getClass().getSimpleName().toLowerCase() + ".enrich.end.ts", "" + System.currentTimeMillis()); if(perfLog != null) { String key = message.get(Constants.GUID) + ""; perfLog.log("enrich", "key={}, elapsed time to enrich", key); http://git-wip-us.apache.org/repos/asf/metron/blob/4ef65e09/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java index 63d39c5..9a36a87 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java @@ -21,21 +21,18 @@ import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; +import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.metron.common.configuration.enrichment.EnrichmentConfig; -import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; +import org.apache.metron.enrichment.converter.EnrichmentKey; import org.apache.metron.enrichment.lookup.EnrichmentLookup; import org.apache.metron.enrichment.lookup.handler.KeyWithContext; import org.apache.metron.hbase.TableProvider; -import org.apache.metron.enrichment.converter.EnrichmentKey; import org.json.simple.JSONObject; -import sun.management.Sensor; - -import javax.annotation.Nullable; -import java.lang.reflect.InvocationTargetException; -import java.util.HashMap; -import java.util.Map; public class EnrichmentUtils { http://git-wip-us.apache.org/repos/asf/metron/blob/4ef65e09/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java index d4fcdf4..a6832d6 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java @@ -65,51 +65,57 @@ public class ParallelEnricherTest { private static Context stellarContext; private static AtomicInteger numAccesses = new AtomicInteger(0); private static Map> enrichmentsByType; - @BeforeClass - public static void setup() { - ConcurrencyContext infrastructure = new ConcurrencyContext(); - infrastructure.initialize(5, 100, 10, null, null, false); - stellarContext = new Context.Builder() - .build(); - StellarFunctions.initialize(stellarContext); - StellarAdapter adapter = new StellarAdapter(){ - @Override - public void logAccess(CacheKey value) { - numAccesses.incrementAndGet(); - } - }.ofType("ENRICHMENT"); - adapter.initializeAdapter(new HashMap<>()); - EnrichmentAdapter dummy = new EnrichmentAdapter() { - @Override - public void logAccess(CacheKey value) { + // Declaring explicit class bc getClass().getSimpleName() returns "" for anon classes + public static class DummyEnrichmentAdapter implements EnrichmentAdapter { + @Override + public void logAccess(CacheKey value) { - } + } - @Override - public JSONObject enrich(CacheKey value) { - return null; - } + @Override + public JSONObject enrich(CacheKey value) { + return null; + } - @Override - public boolean initializeAdapter(Map config) { - return false; - } + @Override + public boolean initializeAdapter(Map config) { + return false; + } - @Override - public void updateAdapter(Map config) { + @Override + public void updateAdapter(Map config) { - } + } - @Override - public void cleanup() { + @Override + public void cleanup() { - } + } - @Override - public String getOutputPrefix(CacheKey value) { - return null; - } - }; + @Override + public String getOutputPrefix(CacheKey value) { + return null; + } + } + + // Declaring explicit class bc getClass().getSimpleName() returns "" for anon classes + public static class AccessLoggingStellarAdapter extends StellarAdapter { + @Override + public void logAccess(CacheKey value) { + numAccesses.incrementAndGet(); + } + } + + @BeforeClass + public static void setup() { + ConcurrencyContext infrastructure = new ConcurrencyContext(); + infrastructure.initialize(5, 100, 10, null, null, false); + stellarContext = new Context.Builder() + .build(); + StellarFunctions.initialize(stellarContext); + StellarAdapter adapter = new AccessLoggingStellarAdapter().ofType("ENRICHMENT"); + adapter.initializeAdapter(new HashMap<>()); + EnrichmentAdapter dummy = new DummyEnrichmentAdapter(); enrichmentsByType = ImmutableMap.of("stellar", adapter, "dummy", dummy); enricher = new ParallelEnricher(enrichmentsByType, infrastructure, false); @@ -139,13 +145,19 @@ public class ParallelEnricherTest { }}; ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null); JSONObject ret = result.getResult(); - Assert.assertEquals("Got the wrong result count: " + ret, 8, ret.size()); + Assert.assertEquals("Got the wrong result count: " + ret, 11, ret.size()); Assert.assertEquals(1, ret.get("map.blah")); Assert.assertEquals("test", ret.get("source.type")); Assert.assertEquals(1, ret.get("one")); Assert.assertEquals(2, ret.get("foo")); Assert.assertEquals("TEST", ret.get("ALL_CAPS")); Assert.assertEquals(0, result.getEnrichmentErrors().size()); + Assert.assertTrue(result.getResult().containsKey("adapter.accessloggingstellaradapter.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("adapter.accessloggingstellaradapter.end.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.end.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.end.ts")); } /** * { @@ -170,7 +182,13 @@ public class ParallelEnricherTest { }}; ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null); JSONObject ret = result.getResult(); - Assert.assertEquals("Got the wrong result count: " + ret, 4, ret.size()); + Assert.assertEquals("Got the wrong result count: " + ret, 7, ret.size()); + Assert.assertTrue(result.getResult().containsKey("adapter.dummyenrichmentadapter.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("adapter.dummyenrichmentadapter.end.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.end.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.end.ts")); } /** @@ -208,13 +226,19 @@ public class ParallelEnricherTest { }}; ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null); JSONObject ret = result.getResult(); - Assert.assertEquals(ret + " is not what I expected", 8, ret.size()); + Assert.assertEquals(ret + " is not what I expected", 11, ret.size()); Assert.assertEquals(1, ret.get("map.blah")); Assert.assertEquals("test", ret.get("source.type")); Assert.assertEquals(1, ret.get("one")); Assert.assertEquals(2, ret.get("foo")); Assert.assertEquals("TEST", ret.get("ALL_CAPS")); Assert.assertEquals(1, result.getEnrichmentErrors().size()); + Assert.assertTrue(result.getResult().containsKey("adapter.accessloggingstellaradapter.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("adapter.accessloggingstellaradapter.end.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.end.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.end.ts")); } /**