metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From o...@apache.org
Subject [01/21] metron git commit: METRON-1379: Add an OBJECT_GET stellar function closes apache/incubator-metron#880
Date Thu, 25 Jan 2018 14:05:24 GMT
Repository: metron
Updated Branches:
  refs/heads/feature/METRON-1211-extensions-parsers-gradual e82139189 -> 40411d4bd


METRON-1379: Add an OBJECT_GET stellar function closes apache/incubator-metron#880


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/c559ed7e
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/c559ed7e
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/c559ed7e

Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: c559ed7e1838ec71344eae3d9e37771db2641635
Parents: e821391
Author: cstella <cestella@gmail.com>
Authored: Tue Jan 9 15:28:47 2018 -0500
Committer: cstella <cestella@gmail.com>
Committed: Tue Jan 9 15:28:47 2018 -0500

----------------------------------------------------------------------
 .../metron/enrichment/stellar/ObjectGet.java    | 156 +++++++++++++++++++
 .../enrichment/stellar/ObjectGetTest.java       |  90 +++++++++++
 metron-stellar/stellar-common/README.md         |   9 ++
 3 files changed, 255 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/c559ed7e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/ObjectGet.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/ObjectGet.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/ObjectGet.java
new file mode 100644
index 0000000..ebb94da
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/ObjectGet.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.enrichment.stellar;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.common.utils.SerDeUtils;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.apache.metron.stellar.dsl.Stellar;
+import org.apache.metron.stellar.dsl.StellarFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+@Stellar(namespace="OBJECT"
+        ,name="GET"
+        ,description="Retrieve and deserialize a serialized object from HDFS.  " +
+        "The cache can be specified via two properties in the global config: " +
+        "\"" + ObjectGet.OBJECT_CACHE_SIZE_KEY + "\" (default " + ObjectGet.OBJECT_CACHE_SIZE_DEFAULT
+ ")," +
+        "\"" + ObjectGet.OBJECT_CACHE_EXPIRATION_KEY+ "\" (default 1440).  Note, if these
are changed in global config, " +
+        "topology restart is required."
+        , params = {
+            "path - The path in HDFS to the serialized object"
+          }
+        , returns="The deserialized object."
+)
+public class ObjectGet implements StellarFunction {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  public static final String OBJECT_CACHE_SIZE_KEY = "object.cache.size";
+  public static final String OBJECT_CACHE_EXPIRATION_KEY = "object.cache.expiration.minutes";
+  public static final int OBJECT_CACHE_SIZE_DEFAULT = 1000;
+  public static final long OBJECT_CACHE_EXPIRATION_MIN_DEFAULT = TimeUnit.HOURS.toMinutes(24);
+  protected static LoadingCache<String, Object> cache;
+  private static ReadWriteLock lock = new ReentrantReadWriteLock();
+
+  public static class Loader extends CacheLoader<String, Object> {
+    FileSystem fs;
+    public Loader(Configuration hadoopConfig) throws IOException {
+      this.fs = FileSystem.get(hadoopConfig);
+    }
+    @Override
+    public Object load(String s) throws Exception {
+      if(StringUtils.isEmpty(s)) {
+        return null;
+      }
+      Path p = new Path(s);
+      if(fs.exists(p)) {
+        try(InputStream is = new BufferedInputStream(fs.open(p))) {
+          byte[] serialized = IOUtils.toByteArray(is);
+          if(serialized.length > 0) {
+            Object ret = SerDeUtils.fromBytes(serialized, Object.class);
+            return ret;
+          }
+        }
+      }
+      return null;
+    }
+  }
+
+  @Override
+  public Object apply(List<Object> args, Context context) throws ParseException {
+    if(!isInitialized()) {
+      return null;
+    }
+    if(args.size() < 1) {
+      return null;
+    }
+    Object o = args.get(0);
+    if(o == null) {
+      return null;
+    }
+    if(o instanceof String) {
+      try {
+        return cache.get((String)o);
+      } catch (ExecutionException e) {
+        throw new IllegalStateException("Unable to retrieve " + o + " because " + e.getMessage(),
e);
+      }
+    }
+    else {
+      throw new IllegalStateException("Unable to retrieve " + o + " as it is not a path");
+    }
+  }
+
+  @Override
+  public void initialize(Context context) {
+    try {
+      lock.writeLock().lock();
+      Map<String, Object> config = getConfig(context);
+      long size = ConversionUtils.convert(config.getOrDefault(OBJECT_CACHE_SIZE_KEY, OBJECT_CACHE_SIZE_DEFAULT),
Long.class);
+      long expiryMin = ConversionUtils.convert(config.getOrDefault(OBJECT_CACHE_EXPIRATION_KEY,
OBJECT_CACHE_EXPIRATION_MIN_DEFAULT), Long.class);
+      cache = setupCache(size, expiryMin);
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to initialize: " + e.getMessage(), e);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public boolean isInitialized() {
+    try {
+      lock.readLock().lock();
+      return cache != null;
+    }
+    finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  protected LoadingCache<String, Object> setupCache(long size, long expiryMin) throws
IOException {
+    return CacheBuilder.newBuilder()
+                       .maximumSize(size)
+                       .expireAfterAccess(expiryMin, TimeUnit.MINUTES)
+                       .build(new Loader(new Configuration()));
+  }
+
+  protected Map<String, Object> getConfig(Context context) {
+      return (Map<String, Object>) context.getCapability(Context.Capabilities.GLOBAL_CONFIG,
false).orElse(new HashMap<>());
+    }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/c559ed7e/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetTest.java
new file mode 100644
index 0000000..400dfb8
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.enrichment.stellar;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.common.utils.SerDeUtils;
+import org.apache.metron.stellar.common.utils.StellarProcessorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Tests for the {@code OBJECT_GET} Stellar function, reading from the default (local) Hadoop file
+ * system under {@code target/ogt}.
+ */
+public class ObjectGetTest {
+  FileSystem fs;
+  List<String> data;
+
+  @Before
+  public void setup() throws IOException {
+    fs = FileSystem.get(new Configuration());
+    data = new ArrayList<>();
+    data.add("apache");
+    data.add("metron");
+    data.add("is");
+    data.add("great");
+  }
+
+  @Test
+  public void test() throws Exception {
+    String filename = "target/ogt/test.ser";
+    Assert.assertTrue(ObjectGet.cache == null || !ObjectGet.cache.asMap().containsKey(filename));
+    assertDataIsReadCorrectly(filename);
+  }
+
+  /**
+   * Writes {@link #data} to {@code filename}, then asserts that OBJECT_GET returns it and that the
+   * path ends up in the shared cache.
+   */
+  public void assertDataIsReadCorrectly(String filename) throws IOException {
+    writeData(filename);
+    assertDataIsRead(filename);
+  }
+
+  /** Serializes {@link #data} to {@code filename}, overwriting any existing file. */
+  private void writeData(String filename) throws IOException {
+    try(BufferedOutputStream bos = new BufferedOutputStream(fs.create(new Path(filename), true))) {
+      IOUtils.write(SerDeUtils.toBytes(data), bos);
+    }
+  }
+
+  /** Asserts OBJECT_GET on {@code filename} returns {@link #data} and caches the path. */
+  @SuppressWarnings("unchecked")
+  private void assertDataIsRead(String filename) {
+    List<String> readData = (List<String>) StellarProcessorUtils.run("OBJECT_GET(loc)", ImmutableMap.of("loc", filename));
+    Assert.assertEquals(data, readData);
+    Assert.assertTrue(ObjectGet.cache.asMap().containsKey(filename));
+  }
+
+  @Test
+  public void testMultithreaded() throws Exception {
+    String filename = "target/ogt/testmulti.ser";
+    Assert.assertTrue(ObjectGet.cache == null || !ObjectGet.cache.asMap().containsKey(filename));
+    // Write once up front. Re-creating the file from every thread lets one thread truncate it
+    // while another thread is reading it.
+    writeData(filename);
+    Thread[] ts = new Thread[10];
+    // One slot per thread; Thread.join() below makes each thread's write visible here.
+    Throwable[] failures = new Throwable[ts.length];
+    for(int i = 0;i < ts.length;++i) {
+      final int idx = i;
+      ts[i] = new Thread(() -> {
+        try {
+          assertDataIsRead(filename);
+        } catch (Throwable t) {
+          failures[idx] = t;
+        }
+      });
+      ts[i].start();
+    }
+    for(Thread t : ts) {
+      t.join();
+    }
+    // Exceptions and assertion errors on worker threads never reach JUnit, so surface them here.
+    for(Throwable failure : failures) {
+      if(failure != null) {
+        throw new AssertionError("Worker thread failed: " + failure.getMessage(), failure);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/c559ed7e/metron-stellar/stellar-common/README.md
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/README.md b/metron-stellar/stellar-common/README.md
index 6c420b6..2ef81e8 100644
--- a/metron-stellar/stellar-common/README.md
+++ b/metron-stellar/stellar-common/README.md
@@ -221,6 +221,7 @@ Where:
 | [ `MULTISET_MERGE`](#multiset_merge)                                                  
            |
 | [ `MULTISET_REMOVE`](#multiset_remove)                                                
            |
 | [ `MULTISET_TO_SET`](#multiset_to_set)                                                
            |
+| [ `OBJECT_GET`](#object_get)                                                                      |
 | [ `PREPEND_IF_MISSING`](#prepend_if_missing)                                          
            |
 | [ `PROFILE_GET`](#profile_get)                                                        
            |
 | [ `PROFILE_FIXED`](#profile_fixed)                                                    
            |
@@ -805,6 +806,14 @@ Where:
     * multiset - The multiset to convert.
   * Returns: The set of objects in the multiset ignoring multiplicity
 
+### `OBJECT_GET`
+  * Description: Retrieve and deserialize a serialized object from HDFS.  The cache can be specified via two properties
+  in the global config: "object.cache.size" (default 1000), "object.cache.expiration.minutes" (default 1440).  Note, if
+  these are changed in global config, topology restart is required.
+  * Input:
+    * path - The path in HDFS to the serialized object
+  * Returns: The deserialized object.
+
 ### `PREPEND_IF_MISSING`
   * Description: Prepends the prefix to the start of the string if the string does not already
start with any of the prefixes.
   * Input:


Mime
View raw message