metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject [1/3] metron git commit: METRON-1241: Enable the REST API to use a cache for the zookeeper config similar to the Bolts closes apache/incubator-metron#795
Date Fri, 20 Oct 2017 21:20:19 GMT
Repository: metron
Updated Branches:
  refs/heads/master aee018476 -> cc111ec98


http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index 49f111d..3316b32 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -17,10 +17,12 @@
  */
 package org.apache.metron.parsers.bolt;
 
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.*;
 
 import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
 import org.apache.metron.test.error.MetronErrorJSONMatcher;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -115,13 +117,35 @@ public class ParserBoltTest extends BaseBoltTest {
     }
   }
 
+  private static ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+    return new ConfigurationsUpdater<ParserConfigurations>(null, null) {
+      @Override
+      public void update(CuratorFramework client, String path, byte[] data) throws IOException { }
+
+      @Override
+      public void delete(CuratorFramework client, String path, byte[] data) throws IOException { }
+
+      @Override
+      public ConfigurationType getType() {
+        return ConfigurationType.PARSER;
+      }
+
+      @Override
+      public void update(String name, byte[] data) throws IOException { }
+
+      @Override
+      public void delete(String name) { }
+
+      @Override
+      public Class<ParserConfigurations> getConfigurationClass() {
+        return ParserConfigurations.class;
+      }
 
-  @Test
-  public void testEmpty() throws Exception {
-    String sensorType = "yaf";
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) {
       @Override
-      protected ParserConfigurations defaultConfigurations() {
+      public void forceUpdate(CuratorFramework client) { }
+
+      @Override
+      public ParserConfigurations defaultConfigurations() {
         return new ParserConfigurations() {
           @Override
           public SensorParserConfig getSensorParserConfig(String sensorType) {
@@ -135,11 +159,22 @@ public class ParserBoltTest extends BaseBoltTest {
           }
         };
       }
+    };
+  }
+
 
+  @Test
+  public void testEmpty() throws Exception {
+    String sensorType = "yaf";
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) {
+      @Override
+      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+        return ParserBoltTest.createUpdater();
+      }
     };
 
     parserBolt.setCuratorFramework(client);
-    parserBolt.setTreeCache(cache);
+    parserBolt.setZKCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
     verify(parser, times(1)).init();
     verify(writer, times(1)).init();
@@ -165,28 +200,15 @@ public class ParserBoltTest extends BaseBoltTest {
     String sensorType = "yaf";
     ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) {
       @Override
-      protected ParserConfigurations defaultConfigurations() {
-        return new ParserConfigurations() {
-          @Override
-          public SensorParserConfig getSensorParserConfig(String sensorType) {
-            return new SensorParserConfig() {
-              @Override
-              public Map<String, Object> getParserConfig() {
-                return new HashMap<String, Object>() {{
-                }};
-              }
-
-
-            };
-          }
-        };
+      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+        return ParserBoltTest.createUpdater();
       }
     };
 
     buildGlobalConfig(parserBolt);
 
     parserBolt.setCuratorFramework(client);
-    parserBolt.setTreeCache(cache);
+    parserBolt.setZKCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
     byte[] sampleBinary = "some binary message".getBytes();
 
@@ -218,23 +240,13 @@ public class ParserBoltTest extends BaseBoltTest {
     String sensorType = "yaf";
     ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) {
       @Override
-      protected ParserConfigurations defaultConfigurations() {
-        return new ParserConfigurations() {
-          @Override
-          public SensorParserConfig getSensorParserConfig(String sensorType) {
-            return new SensorParserConfig() {
-              @Override
-              public Map<String, Object> getParserConfig() {
-                return new HashMap<String, Object>() {{
-                }};
-              }
-            };
-          }
-        };
+      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+        return ParserBoltTest.createUpdater();
       }
+
     };
     parserBolt.setCuratorFramework(client);
-    parserBolt.setTreeCache(cache);
+    parserBolt.setZKCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
     verify(parser, times(1)).init();
     verify(writer, times(1)).init();
@@ -274,24 +286,13 @@ public void testImplicitBatchOfOne() throws Exception {
 
   ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
     @Override
-    protected ParserConfigurations defaultConfigurations() {
-      return new ParserConfigurations() {
-        @Override
-        public SensorParserConfig getSensorParserConfig(String sensorType) {
-          return new SensorParserConfig() {
-            @Override
-            public Map<String, Object> getParserConfig() {
-              return new HashMap<String, Object>() {{
-              }};
-            }
-          };
-        }
-      };
+    protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+      return ParserBoltTest.createUpdater();
     }
   };
 
   parserBolt.setCuratorFramework(client);
-  parserBolt.setTreeCache(cache);
+  parserBolt.setZKCache(cache);
   parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
   verify(parser, times(1)).init();
   verify(batchWriter, times(1)).init(any(), any(), any());
@@ -334,10 +335,14 @@ public void testImplicitBatchOfOne() throws Exception {
           throw new RuntimeException(e);
         }
       }
+      @Override
+      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+        return ParserBoltTest.createUpdater();
+      }
     };
 
     parserBolt.setCuratorFramework(client);
-    parserBolt.setTreeCache(cache);
+    parserBolt.setZKCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
     verify(parser, times(1)).init();
     verify(batchWriter, times(1)).init(any(), any(), any());
@@ -371,10 +376,15 @@ public void testImplicitBatchOfOne() throws Exception {
           throw new RuntimeException(e);
         }
       }
+
+      @Override
+      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+        return ParserBoltTest.createUpdater();
+      }
     };
 
     parserBolt.setCuratorFramework(client);
-    parserBolt.setTreeCache(cache);
+    parserBolt.setZKCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
     verify(parser, times(1)).init();
     verify(batchWriter, times(1)).init(any(), any(), any());
@@ -440,10 +450,15 @@ public void testImplicitBatchOfOne() throws Exception {
           throw new RuntimeException(e);
         }
       }
+
+      @Override
+      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+        return ParserBoltTest.createUpdater();
+      }
     };
 
     parserBolt.setCuratorFramework(client);
-    parserBolt.setTreeCache(cache);
+    parserBolt.setZKCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
     when(t1.getBinary(0)).thenReturn(new byte[] {});
     parserBolt.execute(t1);
@@ -459,25 +474,13 @@ public void testImplicitBatchOfOne() throws Exception {
 
     ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
       @Override
-      protected ParserConfigurations defaultConfigurations() {
-        return new ParserConfigurations() {
-          @Override
-          public SensorParserConfig getSensorParserConfig(String sensorType) {
-            return new SensorParserConfig() {
-              @Override
-              public Map<String, Object> getParserConfig() {
-                return new HashMap<String, Object>() {{
-                  put(IndexingConfigurations.BATCH_SIZE_CONF, "1");
-                }};
-              }
-            };
-          }
-        };
+      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+        return ParserBoltTest.createUpdater();
       }
     };
 
     parserBolt.setCuratorFramework(client);
-    parserBolt.setTreeCache(cache);
+    parserBolt.setZKCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
     verify(parser, times(1)).init();
     verify(batchWriter, times(1)).init(any(), any(), any());
@@ -498,25 +501,13 @@ public void testImplicitBatchOfOne() throws Exception {
 
     ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
       @Override
-      protected ParserConfigurations defaultConfigurations() {
-        return new ParserConfigurations() {
-          @Override
-          public SensorParserConfig getSensorParserConfig(String sensorType) {
-            return new SensorParserConfig() {
-              @Override
-              public Map<String, Object> getParserConfig() {
-                return new HashMap<String, Object>() {{
-                  put(IndexingConfigurations.BATCH_SIZE_CONF, 5);
-                }};
-              }
-            };
-          }
-        };
+      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+        return ParserBoltTest.createUpdater();
       }
     };
 
     parserBolt.setCuratorFramework(client);
-    parserBolt.setTreeCache(cache);
+    parserBolt.setZKCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
     verify(parser, times(1)).init();
     verify(batchWriter, times(1)).init(any(), any(), any());
@@ -541,31 +532,20 @@ public void testImplicitBatchOfOne() throws Exception {
 
 
   }
+
   @Test
   public void testBatchOfFiveWithError() throws Exception {
 
     String sensorType = "yaf";
     ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
       @Override
-      protected ParserConfigurations defaultConfigurations() {
-        return new ParserConfigurations() {
-          @Override
-          public SensorParserConfig getSensorParserConfig(String sensorType) {
-            return new SensorParserConfig() {
-              @Override
-              public Map<String, Object> getParserConfig() {
-                return new HashMap<String, Object>() {{
-                  put(IndexingConfigurations.BATCH_SIZE_CONF, 5);
-                }};
-              }
-            };
-          }
-        };
+      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+        return ParserBoltTest.createUpdater();
       }
     };
 
     parserBolt.setCuratorFramework(client);
-    parserBolt.setTreeCache(cache);
+    parserBolt.setZKCache(cache);
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
     verify(parser, times(1)).init();
     verify(batchWriter, times(1)).init(any(), any(), any());

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-test-utilities/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/pom.xml b/metron-platform/metron-test-utilities/pom.xml
index 2502760..38f8a38 100644
--- a/metron-platform/metron-test-utilities/pom.xml
+++ b/metron-platform/metron-test-utilities/pom.xml
@@ -28,6 +28,11 @@
   </properties>
   <dependencies>
     <dependency>
+      <groupId>org.apache.metron</groupId>
+      <artifactId>metron-zookeeper</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+    <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
       <version>1.2.17</version>
@@ -127,11 +132,7 @@
         </exclusion>
       </exclusions>
     </dependency>
-    <dependency>
-      <groupId>org.apache.curator</groupId>
-      <artifactId>curator-recipes</artifactId>
-      <version>${global_curator_version}</version>
-    </dependency>
+
     <dependency>
       <groupId>org.apache.curator</groupId>
       <artifactId>curator-test</artifactId>

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseBoltTest.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseBoltTest.java
index 75f999a..ac64b6a 100644
--- a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseBoltTest.java
+++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseBoltTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.test.bolt;
 
+import org.apache.metron.zookeeper.ZKCache;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -54,7 +55,7 @@ public abstract class BaseBoltTest {
   protected CuratorFramework client;
 
   @Mock
-  protected TreeCache cache;
+  protected ZKCache cache;
 
   @Before
   public void initMocks() {

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-zookeeper/pom.xml b/metron-platform/metron-zookeeper/pom.xml
new file mode 100644
index 0000000..e02cafd
--- /dev/null
+++ b/metron-platform/metron-zookeeper/pom.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- 
+  Licensed to the Apache Software 
+  Foundation (ASF) under one or more contributor license agreements. See the 
+  NOTICE file distributed with this work for additional information regarding 
+  copyright ownership. The ASF licenses this file to You under the Apache License, 
+  Version 2.0 (the "License"); you may not use this file except in compliance 
+  with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

+  Unless required by applicable law or agreed to in writing, software distributed 
+  under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+  OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+  the specific language governing permissions and limitations under the License. 
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.metron</groupId>
+    <artifactId>metron-platform</artifactId>
+    <version>0.4.1</version>
+  </parent>
+  <artifactId>metron-zookeeper</artifactId>
+  <name>metron-zookeeper</name>
+  <url>https://metron.apache.org/</url>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+  </properties>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-client</artifactId>
+      <version>${global_curator_version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-recipes</artifactId>
+      <version>${global_curator_version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${global_guava_version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-zookeeper/src/main/java/org/apache/metron/zookeeper/SimpleEventListener.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-zookeeper/src/main/java/org/apache/metron/zookeeper/SimpleEventListener.java b/metron-platform/metron-zookeeper/src/main/java/org/apache/metron/zookeeper/SimpleEventListener.java
new file mode 100644
index 0000000..1078f1e
--- /dev/null
+++ b/metron-platform/metron-zookeeper/src/main/java/org/apache/metron/zookeeper/SimpleEventListener.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.zookeeper;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+
+/**
+ * This is a simple convenience implementation of a TreeCacheListener.
+ * It allows multiple callbacks to be called with one listener.
+ */
+public class SimpleEventListener implements TreeCacheListener {
+
+  private static final Logger LOG =  LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * The callback interface.  This is to be implemented for all callbacks bound to a SimpleEventListener
+   */
+  public interface Callback {
+    /**
+     * Called upon an event.
+     * @param client The zookeeper client
+     * @param path The zookeeper path changed
+     * @param data The data that changed.
+     * @throws IOException
+     */
+    void apply(CuratorFramework client, String path, byte[] data) throws IOException;
+  }
+
+  /**
+   * Builder to create a SimpleEventListener
+   */
+  public static class Builder {
+    private EnumMap<TreeCacheEvent.Type, List<Callback>> callbacks = new EnumMap<>(TreeCacheEvent.Type.class);
+
+    /**
+     * Add a callback bound to one or more TreeCacheEvent.Type.
+     * @param callback The callback to be called when an event of each of types happens
+     * @param types The zookeeper event types to bind to
+     * @return The Builder
+     */
+    public Builder with(Callback callback, TreeCacheEvent.Type... types) {
+      return with(ImmutableList.of(callback), types);
+    }
+
+    /**
+     * Add a callback bound to one or more TreeCacheEvent.Type.
+     * @param callback The iterable of callbacks to be called when an event of each of types happens
+     * @param types The zookeeper event types to bind to
+     * @return The Builder
+     */
+    public Builder with(Iterable<? extends Callback> callback, TreeCacheEvent.Type... types) {
+      for(TreeCacheEvent.Type t : types) {
+        List<Callback> cbs = callbacks.get(t);
+        if(cbs == null) {
+          cbs = new ArrayList<>();
+        }
+        Iterables.addAll(cbs, callback);
+        callbacks.put(t, cbs);
+      }
+      return this;
+    }
+
+    /**
+     * Create the listener.
+     * @return The SimpleEventListener
+     */
+    public SimpleEventListener build() {
+      return new SimpleEventListener(callbacks);
+    }
+
+  }
+
+  EnumMap<TreeCacheEvent.Type, List<Callback>> callbacks;
+
+  private SimpleEventListener(EnumMap<TreeCacheEvent.Type, List<Callback>> callbacks) {
+    this.callbacks = callbacks;
+  }
+
+  @Override
+  public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
+    String path = null;
+    byte[] data = null;
+    if(event != null && event.getData() != null) {
+      path = event.getData().getPath();
+      data = event.getData().getData();
+    }
+    LOG.debug("Type: {}, Path: {}, Data: {}", event.getType(), (path == null?"":path) , (data == null?"":new String(data)));
+    List<Callback> callback = callbacks.get(event.getType());
+    if(callback != null) {
+      for(Callback cb : callback) {
+        cb.apply(client, path, data);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-zookeeper/src/main/java/org/apache/metron/zookeeper/ZKCache.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-zookeeper/src/main/java/org/apache/metron/zookeeper/ZKCache.java b/metron-platform/metron-zookeeper/src/main/java/org/apache/metron/zookeeper/ZKCache.java
new file mode 100644
index 0000000..58a6329
--- /dev/null
+++ b/metron-platform/metron-zookeeper/src/main/java/org/apache/metron/zookeeper/ZKCache.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.zookeeper;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * A zookeeper cache that composes the Curator TreeCache.  This is the basic point of
+ * abstraction to interact with metron configuration in Zookeeper.
+ */
+public class ZKCache implements AutoCloseable{
+  private static final Logger LOG =  LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  public static final int DEFAULT_CLIENT_SLEEP_MS = 1000;
+  public static final int DEFAULT_MAX_RETRIES = 3;
+
+
+  /**
+   * Build a ZKCache instance.
+   */
+  public static class Builder {
+    private Optional<CuratorFramework> client = Optional.empty();
+    private boolean ownClient = false;
+    private List<TreeCacheListener> listener = new ArrayList<>();
+    private String zkRoot;
+
+    public Builder() { }
+
+    /**
+     * Specify your own client.  If you specify this, closing will not close your Client.
+     * If a client is not passed in, then one is created and will be closed when the ZKCache
+     * closes.
+     * @param client The CuratorFramework client.
+     * @return The Builder
+     */
+    public Builder withClient(CuratorFramework client) {
+      this.client = Optional.ofNullable(client);
+      ownClient = false;
+      return this;
+    }
+
+    /**
+     * Specify your own zookeeper URL.  If you pass this in, the ZKCache will own the client
+     * and it will be closed when the ZKCache is closed.
+     *
+     * @param zookeeperUrl The zookeeper quorum
+     * @return The Builder
+     */
+    public Builder withClient(String zookeeperUrl) {
+      this.client = Optional.ofNullable(createClient(zookeeperUrl, Optional.empty()));
+      ownClient = true;
+      return this;
+    }
+
+    /**
+     * Specify your own zookeeper URL.  If you pass this in, the ZKCache will own the client
+     * and it will be closed when the ZKCache is closed.
+     *
+     * @param zookeeperUrl The zookeeper quorum
+     * @param retryPolicy The RetryPolicy to use
+     * @return The Builder
+     */
+    public Builder withClient(String zookeeperUrl, RetryPolicy retryPolicy) {
+      this.client = Optional.ofNullable(createClient(zookeeperUrl, Optional.ofNullable(retryPolicy)));
+      ownClient = true;
+      return this;
+    }
+
+    /**
+     * Specify the treecache listener, which will be called when changes happen to the zookeeper root.
+     *
+     * @param listener The callback which is called when changes happen in zookeeper.
+     * @return The Builder
+     */
+    public Builder withListener(TreeCacheListener listener) {
+      this.listener.add(listener);
+      return this;
+    }
+
+    /**
+     * Specify the root in zookeeper to monitor.
+     * @param zkRoot The root path in zookeeper
+     * @return The Builder
+     */
+    public Builder withRoot(String zkRoot) {
+      this.zkRoot = zkRoot;
+      return this;
+    }
+
+    /**
+     * Create the ZKCache object based on the config passed in the Builder.
+     * @return The ZKCache
+     */
+    public ZKCache build() {
+      if(!client.isPresent()) {
+        throw new IllegalArgumentException("Zookeeper client must be specified.");
+      }
+      if(listener.isEmpty()) {
+        LOG.warn("Zookeeper listener is null or empty, which is very likely an error.");
+      }
+      if(zkRoot == null) {
+        throw new IllegalArgumentException("Zookeeper root must not be null.");
+      }
+      return new ZKCache(client.get(), listener, zkRoot, ownClient);
+    }
+
+  }
+
+  private CuratorFramework client;
+  private List<TreeCacheListener> listeners;
+  private TreeCache cache;
+  private String zkRoot;
+  private boolean ownClient = false;
+
+  private ZKCache(CuratorFramework client, List<TreeCacheListener> listeners, String zkRoot, boolean ownClient) {
+    this.client = client;
+    this.listeners = listeners;
+    this.ownClient = ownClient;
+    if(zkRoot == null) {
+      throw new IllegalArgumentException("Zookeeper root must not be null.");
+    }
+    this.zkRoot = zkRoot;
+  }
+
+  /**
+   * Return the client used.
+   * NOTE: DO NOT CLOSE THIS CLIENT OUT OF BAND.
+   * @return The Curator Client
+   */
+  public CuratorFramework getClient() {
+    return client;
+  }
+
+
+  /**
+   * Start the cache.
+   * @throws Exception If unable to be started.
+   */
+  public void start() throws Exception {
+    if(cache == null) {
+      if(ownClient) {
+        client.start();
+      }
+      TreeCache.Builder builder = TreeCache.newBuilder(client, zkRoot);
+      builder.setCacheData(true);
+      cache = builder.build();
+      for(TreeCacheListener l : listeners) {
+        cache.getListenable().addListener(l);
+      }
+      cache.start();
+    }
+  }
+
+  /**
+   * Close the cache, which closes the client if it's owned by the ZKCache.
+   */
+  @Override
+  public void close() {
+    cache.close();
+    if(ownClient) {
+      client.close();
+    }
+  }
+
+  public static CuratorFramework createClient(String zookeeperUrl, Optional<RetryPolicy> retryPolicy) {
+    return CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy.orElse(new ExponentialBackoffRetry(DEFAULT_CLIENT_SLEEP_MS, DEFAULT_MAX_RETRIES)));
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/pom.xml b/metron-platform/pom.xml
index 93ced81..3e72d78 100644
--- a/metron-platform/pom.xml
+++ b/metron-platform/pom.xml
@@ -60,6 +60,7 @@
 		<module>metron-elasticsearch</module>
 		<module>metron-storm-kafka</module>
 		<module>metron-storm-kafka-override</module>
+		<module>metron-zookeeper</module>
 	</modules>
 	<dependencies>
 		<dependency>

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-stellar/stellar-common/src/main/scripts/stellar
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/scripts/stellar b/metron-stellar/stellar-common/src/main/scripts/stellar
index a93d09e..2f1cdbe 100644
--- a/metron-stellar/stellar-common/src/main/scripts/stellar
+++ b/metron-stellar/stellar-common/src/main/scripts/stellar
@@ -33,4 +33,4 @@ export METRON_VERSION=${project.version}
 export METRON_HOME=/usr/metron/$METRON_VERSION
 export STELLAR_LIB=$(find $METRON_HOME/lib/ -name metron-parsers*.jar)
 export MANAGEMENT_LIB=$(find $METRON_HOME/lib/ -name metron-management*.jar)
-java $JVMFLAGS -cp "$HBASE_CONFIGS:${CONTRIB:-$METRON_HOME/contrib}:$STELLAR_LIB:$MANAGEMENT_LIB" org.apache.metron.stellar.common.shell.StellarShell "$@"
+java $JVMFLAGS -cp "${CONTRIB:-$METRON_HOME/contrib}:$STELLAR_LIB:$MANAGEMENT_LIB:$HBASE_CONFIGS" org.apache.metron.stellar.common.shell.StellarShell "$@"


Mime
View raw message