metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject [1/3] metron git commit: METRON-1004 Travis CI - Job Exceeded Maximum Time Limit (justinleet) closes apache/metron#624
Date Fri, 30 Jun 2017 13:25:21 GMT
Repository: metron
Updated Branches:
  refs/heads/master 095be23dc -> df94ed405


http://git-wip-us.apache.org/repos/asf/metron/blob/df94ed40/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
index 2896512..5901d9f 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
@@ -124,6 +124,10 @@ public class ConfigUploadComponent implements InMemoryComponent {
 
   @Override
   public void start() throws UnableToStartException {
+    update();
+  }
+
+  public void update() throws UnableToStartException {
     try {
       final String zookeeperUrl = topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY);
 
@@ -152,6 +156,7 @@ public class ConfigUploadComponent implements InMemoryComponent {
     }
   }
 
+
   public SensorParserConfig getSensorParserConfig(String sensorType) {
     SensorParserConfig sensorParserConfig = new SensorParserConfig();
     CuratorFramework client = getClient(topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY));

http://git-wip-us.apache.org/repos/asf/metron/blob/df94ed40/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
index 27544c0..1849745 100644
--- a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
+++ b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
@@ -20,13 +20,12 @@
 
 package org.apache.metron.hbase.client;
 
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
 import org.apache.storm.tuple.Tuple;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.hbase.Widget;
 import org.apache.metron.hbase.WidgetMapper;
@@ -40,6 +39,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -61,8 +61,9 @@ public class HBaseClientTest {
   private static final String tableName = "widgets";
 
   private static HBaseTestingUtility util;
-  private HBaseClient client;
-  private HTableInterface table;
+  private static HBaseClient client;
+  private static HTableInterface table;
+  private static Admin admin;
   private Tuple tuple1;
   private Tuple tuple2;
   byte[] rowKey1;
@@ -80,17 +81,36 @@ public class HBaseClientTest {
     config.set("hbase.regionserver.hostname", "localhost");
     util = new HBaseTestingUtility(config);
     util.startMiniCluster();
+    admin = util.getHBaseAdmin();
+    // create the table
+    table = util.createTable(Bytes.toBytes(tableName), WidgetMapper.CF);
+    util.waitTableEnabled(table.getName());
+    // setup the client
+    client = new HBaseClient((c,t) -> table, table.getConfiguration(), tableName);
   }
 
   @AfterClass
   public static void stopHBase() throws Exception {
+    util.deleteTable(tableName);
     util.shutdownMiniCluster();
     util.cleanupTestDir();
   }
 
+  @After
+  public void clearTable() throws Exception {
+    List<Delete> deletions = new ArrayList<>();
+    for(Result r : table.getScanner(new Scan())) {
+      deletions.add(new Delete(r.getRow()));
+    }
+    table.delete(deletions);
+  }
+
   @Before
   public void setupTuples() throws Exception {
 
+    // create a mapper
+    mapper = new WidgetMapper();
+
     // setup the first tuple
     widget1 = new Widget("widget1", 100);
     tuple1 = mock(Tuple.class);
@@ -108,25 +128,6 @@ public class HBaseClientTest {
     cols2 = mapper.columns(tuple2);
   }
 
-  @Before
-  public void setup() throws Exception {
-
-    // create a mapper
-    mapper = new WidgetMapper();
-
-    // create the table
-    table = util.createTable(Bytes.toBytes(tableName), WidgetMapper.CF);
-    util.waitTableEnabled(table.getName());
-
-    // setup the client
-    client = new HBaseClient((c,t) -> table, table.getConfiguration(), tableName);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    util.deleteTable(tableName);
-  }
-
   /**
    * Should be able to read/write a single Widget.
    */

http://git-wip-us.apache.org/repos/asf/metron/blob/df94ed40/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
index da46d93..9e20b39 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
@@ -196,6 +196,8 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
       // on the field name converter
       assertInputDocsMatchOutputs(inputDocs, docs, getFieldNameConverter());
       assertInputDocsMatchOutputs(inputDocs, readDocsFromDisk(hdfsDir), x -> x);
+    } catch(Throwable e) {
+      e.printStackTrace();
     }
     finally {
       if(runner != null) {

http://git-wip-us.apache.org/repos/asf/metron/blob/df94ed40/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ComponentRunner.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ComponentRunner.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ComponentRunner.java
index ce7cab8..4641e48 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ComponentRunner.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/ComponentRunner.java
@@ -26,6 +26,7 @@ public class ComponentRunner {
         LinkedHashMap<String, InMemoryComponent> components;
         String[] startupOrder;
         String[] shutdownOrder;
+        String[] resetOrder;
         long timeBetweenAttempts = 1000;
         int numRetries = 5;
         long maxTimeMS = 120000;
@@ -56,6 +57,10 @@ public class ComponentRunner {
             this.shutdownOrder = shutdownOrder;
             return this;
         }
+        public Builder withCustomResetOrder(String[] resetOrder) {
+            this.resetOrder = resetOrder;
+            return this;
+        }
         public Builder withMillisecondsBetweenAttempts(long timeBetweenAttempts) {
             this.timeBetweenAttempts = timeBetweenAttempts;
             return this;
@@ -75,7 +80,15 @@ public class ComponentRunner {
             if(startupOrder == null) {
                 startupOrder = toOrderedList(components);
             }
-            return new ComponentRunner(components, startupOrder, shutdownOrder, timeBetweenAttempts, numRetries, maxTimeMS);
+            if(resetOrder == null) {
+                // Reset in the order of shutdown, if no reset is defined. Otherwise, just order them.
+                if (shutdownOrder != null) {
+                    resetOrder = shutdownOrder;
+                } else {
+                    resetOrder = toOrderedList(components);
+                }
+            }
+            return new ComponentRunner(components, startupOrder, shutdownOrder, resetOrder, timeBetweenAttempts, numRetries, maxTimeMS);
         }
 
     }
@@ -83,12 +96,14 @@ public class ComponentRunner {
     LinkedHashMap<String, InMemoryComponent> components;
     String[] startupOrder;
     String[] shutdownOrder;
+    String[] resetOrder;
     long timeBetweenAttempts;
     int numRetries;
     long maxTimeMS;
     public ComponentRunner( LinkedHashMap<String, InMemoryComponent> components
                           , String[] startupOrder
                           , String[] shutdownOrder
+                          , String[] resetOrder
                           , long timeBetweenAttempts
                           , int numRetries
                           , long maxTimeMS
@@ -97,6 +112,7 @@ public class ComponentRunner {
         this.components = components;
         this.startupOrder = startupOrder;
         this.shutdownOrder = shutdownOrder;
+        this.resetOrder = resetOrder;
         this.timeBetweenAttempts = timeBetweenAttempts;
         this.numRetries = numRetries;
         this.maxTimeMS = maxTimeMS;
@@ -120,6 +136,11 @@ public class ComponentRunner {
             components.get(componentName).stop();
         }
     }
+    public void reset() {
+        for(String componentName : resetOrder) {
+            components.get(componentName).reset();
+        }
+    }
 
     public <T> ProcessorResult<T> process(Processor<T> successState) {
         int retryCount = 0;

http://git-wip-us.apache.org/repos/asf/metron/blob/df94ed40/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/InMemoryComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/InMemoryComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/InMemoryComponent.java
index 8a9ee96..90a8615 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/InMemoryComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/InMemoryComponent.java
@@ -18,6 +18,7 @@
 package org.apache.metron.integration;
 
 public interface InMemoryComponent {
-    public void start() throws UnableToStartException;
-    public void stop();
+    void start() throws UnableToStartException;
+    void stop();
+    default void reset() {}
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/df94ed40/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
index d34ff08..779db37 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,12 +20,11 @@ package org.apache.metron.integration.components;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
+import org.apache.storm.generated.KillOptions;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.generated.TopologyInfo;
 import org.apache.metron.integration.InMemoryComponent;
 import org.apache.metron.integration.UnableToStartException;
 import org.apache.storm.flux.FluxBuilder;
@@ -47,7 +46,6 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Comparator;
 import java.util.Properties;
-import java.util.concurrent.TimeUnit;
 
 public class FluxTopologyComponent implements InMemoryComponent {
 
@@ -156,12 +154,15 @@ public class FluxTopologyComponent implements InMemoryComponent {
     if (stormCluster != null) {
       try {
           try {
+            // Kill the topology directly instead of sitting through the wait period
+            killTopology();
             stormCluster.shutdown();
           } catch (IllegalStateException ise) {
             if (!(ise.getMessage().contains("It took over") && ise.getMessage().contains("to shut down slot"))) {
               throw ise;
             }
             else {
+              LOG.error("Attempting to assassinate slots");
               assassinateSlots();
               LOG.error("Storm slots didn't shut down entirely cleanly *sigh*.  " +
                       "I gave them the old one-two-skadoo and killed the slots with prejudice.  " +
@@ -178,17 +179,39 @@ public class FluxTopologyComponent implements InMemoryComponent {
     }
   }
 
+  @Override
+  public void reset() {
+    if (stormCluster != null) {
+      killTopology();
+    }
+  }
+
+  protected void killTopology() {
+    KillOptions ko = new KillOptions();
+    ko.set_wait_secs(0);
+    stormCluster.killTopologyWithOpts(topologyName, ko);
+    try {
+      // Actually wait for it to die.
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
+      // Do nothing
+    }
+  }
+
   public static void assassinateSlots() {
     /*
     You might be wondering why I'm not just casting to slot here, but that's because the Slot class moved locations
     and we're supporting multiple versions of storm.
      */
+    LOG.error("During slot assassination, all candidate threads: " + Thread.getAllStackTraces().keySet());
     Thread.getAllStackTraces().keySet().stream().filter(t -> t instanceof AutoCloseable && t.getName().toLowerCase().contains("slot")).forEach(t -> {
-      AutoCloseable slot = (AutoCloseable) t;
+      LOG.error("Attempting to close thread: " + t + " with state: " + t.getState());
+      // With extreme prejudice.  Safety doesn't matter
       try {
-        slot.close();
-      } catch (Exception e) {
-        LOG.error("Tried to kill " + t.getName() + " but.." + e.getMessage(), e);
+        t.stop();
+        LOG.error("Called thread.stop() on " + t.getName() + ". State is: " + t.getState());
+      } catch(Exception e) {
+        // Just swallow anything arising from the threads being killed.
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/df94ed40/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
index e55b317..6ec1314 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
@@ -179,15 +179,48 @@ public class KafkaComponent implements InMemoryComponent {
   public void stop() {
     shutdownConsumer();
     shutdownProducers();
+
     if(kafkaServer != null) {
-      kafkaServer.shutdown();
-      kafkaServer.awaitShutdown();
+      try {
+        kafkaServer.shutdown();
+        kafkaServer.awaitShutdown();
+      }
+      catch(Throwable fnf) {
+        if(!fnf.getMessage().contains("Error writing to highwatermark file")) {
+          throw fnf;
+        }
+      }
     }
     if(zkClient != null) {
+      // Delete data in ZK to avoid startup interference.
+      for(Topic topic : topics) {
+        zkClient.deleteRecursive(ZkUtils.getTopicPath(topic.name));
+      }
+
+      zkClient.deleteRecursive(ZkUtils.BrokerIdsPath());
+      zkClient.deleteRecursive(ZkUtils.BrokerTopicsPath());
+      zkClient.deleteRecursive(ZkUtils.ConsumersPath());
+      zkClient.deleteRecursive(ZkUtils.ControllerPath());
+      zkClient.deleteRecursive(ZkUtils.ControllerEpochPath());
+      zkClient.deleteRecursive(ZkUtils.ReassignPartitionsPath());
+      zkClient.deleteRecursive(ZkUtils.DeleteTopicsPath());
+      zkClient.deleteRecursive(ZkUtils.PreferredReplicaLeaderElectionPath());
+      zkClient.deleteRecursive(ZkUtils.BrokerSequenceIdPath());
+      zkClient.deleteRecursive(ZkUtils.IsrChangeNotificationPath());
+      zkClient.deleteRecursive(ZkUtils.EntityConfigPath());
+      zkClient.deleteRecursive(ZkUtils.EntityConfigChangesPath());
       zkClient.close();
     }
   }
 
+  @Override
+  public void reset() {
+    // Unfortunately, there's no clean way to (quickly) purge or delete a topic.
+    // At least without killing and restarting broker anyway.
+    stop();
+    start();
+  }
+
   public List<byte[]> readMessages(String topic) {
     SimpleConsumer consumer = new SimpleConsumer("localhost", 6667, 100000, 64 * 1024, "consumer");
     FetchRequest req = new FetchRequestBuilder()

http://git-wip-us.apache.org/repos/asf/metron/blob/df94ed40/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ZKServerComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ZKServerComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ZKServerComponent.java
index 57d814b..cc85d5f 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ZKServerComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ZKServerComponent.java
@@ -18,7 +18,8 @@
 
 package org.apache.metron.integration.components;
 
-import com.google.common.base.Function;
+import java.io.IOException;
+import org.apache.commons.io.FileUtils;
 import org.apache.metron.integration.InMemoryComponent;
 import org.apache.metron.integration.UnableToStartException;
 import org.apache.curator.test.TestingServer;
@@ -60,6 +61,19 @@ public class ZKServerComponent implements InMemoryComponent {
       if (testZkServer != null) {
         testZkServer.close();
       }
-    }catch(Exception e){}
+    }catch(Exception e){
+      // Do nothing
+    }
+  }
+
+  @Override
+  public void reset() {
+    if (testZkServer != null) {
+      try {
+        FileUtils.deleteDirectory(testZkServer.getTempDirectory());
+      } catch (IOException e) {
+        // Do nothing
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/df94ed40/metron-platform/metron-management/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/pom.xml b/metron-platform/metron-management/pom.xml
index 638d65f..4184668 100644
--- a/metron-platform/metron-management/pom.xml
+++ b/metron-platform/metron-management/pom.xml
@@ -62,6 +62,10 @@
             <scope>provided</scope>
             <exclusions>
                 <exclusion>
+                    <artifactId>commons-lang3</artifactId>
+                    <groupId>org.apache.commons</groupId>
+                </exclusion>
+                <exclusion>
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-log4j12</artifactId>
                 </exclusion>

http://git-wip-us.apache.org/repos/asf/metron/blob/df94ed40/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
index ee6a362..972eed7 100644
--- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
+++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
@@ -32,6 +32,7 @@ import org.json.simple.parser.JSONParser;
 import org.json.simple.JSONObject;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -42,14 +43,14 @@ import static org.apache.metron.management.utils.FileUtils.slurp;
 import static org.apache.metron.common.utils.StellarProcessorUtils.run;
 
 public class ConfigurationFunctionsTest {
-  private TestingServer testZkServer;
-  private CuratorFramework client;
-  private String zookeeperUrl;
+  private static TestingServer testZkServer;
+  private static CuratorFramework client;
+  private static String zookeeperUrl;
   private Context context = new Context.Builder()
             .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
             .build();
-  @Before
-  public void setup() throws Exception {
+  @BeforeClass
+  public static void setup() throws Exception {
     testZkServer = new TestingServer(true);
     zookeeperUrl = testZkServer.getConnectString();
     client = ConfigurationsUtils.getClient(zookeeperUrl);
@@ -61,7 +62,7 @@ public class ConfigurationFunctionsTest {
 
   }
 
-  private void pushConfigs(String inputPath) throws Exception {
+  private static void pushConfigs(String inputPath) throws Exception {
     String[] args = new String[]{
             "-z", zookeeperUrl
             , "--mode", "PUSH"

http://git-wip-us.apache.org/repos/asf/metron/blob/df94ed40/metron-platform/metron-management/src/test/java/org/apache/metron/management/FileSystemFunctionsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/FileSystemFunctionsTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/FileSystemFunctionsTest.java
index 88eabe0..e0bad79 100644
--- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/FileSystemFunctionsTest.java
+++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/FileSystemFunctionsTest.java
@@ -21,10 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.metron.common.dsl.Context;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.*;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -37,8 +34,11 @@ import java.util.*;
 public class FileSystemFunctionsTest {
   private FileSystemFunctions.FS_TYPE type;
   private FileSystemFunctions.FileSystemGetter fsGetter = null;
-  private File baseDir;
-  private MiniDFSCluster hdfsCluster;
+  private static File hdfsBaseDir;
+  private static File localBaseDir;
+  private static MiniDFSCluster hdfsCluster;
+  private static String hdfsPrefix;
+  private static String localPrefix;
   private String prefix;
   private Context context = null;
   private FileSystemFunctions.FileSystemGet get;
@@ -59,25 +59,34 @@ public class FileSystemFunctionsTest {
     });
   }
 
-  @Before
-  public void setup() throws IOException {
-    if(type == FileSystemFunctions.FS_TYPE.HDFS) {
-      baseDir = Files.createTempDirectory("test_hdfs").toFile().getAbsoluteFile();
+  @BeforeClass
+  public static void setupFS() throws IOException {
+    {
+      hdfsBaseDir = Files.createTempDirectory("test_hdfs").toFile().getAbsoluteFile();
       Configuration conf = new Configuration();
-      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsBaseDir.getAbsolutePath());
       MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
       hdfsCluster = builder.build();
+      hdfsPrefix = "/";
+    }
+    {
+      localPrefix = "target/fsTest/";
+      if (new File(localPrefix).exists()) {
+        new File(localPrefix).delete();
+      }
+      new File(localPrefix).mkdirs();
+    }
+  }
 
+  @Before
+  public void setup() throws IOException {
+    if(type == FileSystemFunctions.FS_TYPE.HDFS) {
+      prefix=hdfsPrefix;
       fsGetter = () -> hdfsCluster.getFileSystem();
-      prefix = "/";
     }
     else {
+      prefix=localPrefix;
       fsGetter = FileSystemFunctions.FS_TYPE.LOCAL;
-      prefix = "target/fsTest/";
-      if(new File(prefix).exists()) {
-        new File(prefix).delete();
-      }
-      new File(prefix).mkdirs();
     }
 
     get = new FileSystemFunctions.FileSystemGet(fsGetter);
@@ -92,14 +101,14 @@ public class FileSystemFunctionsTest {
     rm.initialize(null);
   }
 
-  @After
-  public void teardown() {
-    if(type == FileSystemFunctions.FS_TYPE.HDFS) {
+  @AfterClass
+  public static void teardown() {
+    {
       hdfsCluster.shutdown();
-      FileUtil.fullyDelete(baseDir);
+      FileUtil.fullyDelete(hdfsBaseDir);
     }
-    else {
-      new File(prefix).delete();
+    {
+      new File(localPrefix).delete();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/df94ed40/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
index 28d9489..97cfb65 100644
--- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
+++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
@@ -26,6 +26,7 @@ import org.apache.metron.integration.BaseIntegrationTest;
 import org.apache.metron.integration.ComponentRunner;
 import org.apache.metron.integration.components.KafkaComponent;
 import org.apache.metron.integration.components.ZKServerComponent;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -93,10 +94,15 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest {
   }
 
   @AfterClass
-  public static void tearDown() throws Exception {
+  public static void tearDownAfterClass() throws Exception {
     runner.stop();
   }
 
+  @After
+  public void tearDown() {
+    runner.reset();
+  }
+
   /**
    * Write one message, read one message.
    */

http://git-wip-us.apache.org/repos/asf/metron/blob/df94ed40/metron-platform/metron-parsers/parser-testing.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/parser-testing.md b/metron-platform/metron-parsers/parser-testing.md
new file mode 100644
index 0000000..e30a7b7
--- /dev/null
+++ b/metron-platform/metron-parsers/parser-testing.md
@@ -0,0 +1,70 @@
+# Parser Contribution and Testing
+
+So you want to contribute a parser to Apache Metron.  First off, on behalf
+of the community, thank you very much!  Now that you have implemented a parser
+by writing a java class which implements `org.apache.metron.parsers.interfaces.MessageParser`
+what are the testing expectations for a new parser?
+
+It is expected that a new parser have two tests:
+* A JUnit test directly testing your parser class.
+* An Integration test validating that your parser class can parse messages 
+inside the `ParserBolt`.
+
+## The JUnit Test
+
+The JUnit Test should be focused on testing your Parser directly.  You
+should feel free to use mocks or stubs or whatever else you need to completely
+test that unit of functionality.
+
+## The Integration Test
+
+Integration tests are more structured.  The intent is that the parser that
+you have implemented can be driven successfully from `org.apache.metron.parsers.bolt.ParserBolt`.
+
+The procedure for creating a new test is as follows:
+* Create an integration test that extends `org.apache.metron.parsers.integration.ParserIntegrationTest`
+  * Override `getSensorType()` to return the sensor type to be used in the test (referred to as `${sensor_type}` at times)
+  * Override `getValidations()` to indicate how you want the output of the parser to be validated (more on validations later)
+  * Optionally `readSensorConfig(String sensorType)` to read the sensor config
+    * By default, we will pull this from `metron-parsers/src/main/config/zookeeper/parsers/${sensor_type}`.  Override if you want to provide your own
+  * Optionally `readGlobalConfig()` to return the global config
+    * By default, we will pull this from `metron-integration-test/src/main/config/zookeeper/global.json`.  Override if you want to provide your own
+* Place sample input data in `metron-integration-test/src/main/sample/data/${sensor_type}/raw`
+  * It should be one line per input record.
+* Place expected output based on sample data in `metron-integration-test/src/main/sample/data/${sensor_type}/parsed`
+  * Line `k` in the expected data should match with line `k` of the raw input data
+
+The way these tests function is by creating a `ParserBolt` instance with your specified global configuration and
+sensor configuration.  It will then send your specified sample input data in line-by-line.  It will then
+perform some basic sanity validation:
+* Ensure no errors were logged
+* Execute your specified validation methods
+
+### Validations
+
+Validations are functions which indicate how one should validate the parsed messages.  The basic one which is sufficient
+for most cases is `org.apache.metron.parsers.integration.validation.SampleDataValidation`.  This will read the expected results
+from `metron-integration-test/src/main/sample/data/${sensor_type}/parsed` and validate that the actual parsed message
+conforms (excluding timestamp).
+
+If you have special validations required, you may implement your own and return an instance of that in the `getValidations()`
+method of your Integration Test.
+
+### Sample Integration Test
+
+A sample integration test for the `snort` parser is as follows:
+```
+public class SnortIntegrationTest extends ParserIntegrationTest {
+  @Override
+  String getSensorType() {
+    return "snort";
+  }
+
+  @Override
+  List<ParserValidation> getValidations() {
+    return new ArrayList<ParserValidation>() {{
+      add(new SampleDataValidation());
+    }};
+  }
+}
+```

http://git-wip-us.apache.org/repos/asf/metron/blob/df94ed40/metron-platform/metron-parsers/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/pom.xml b/metron-platform/metron-parsers/pom.xml
index cce975a..b0d77cd 100644
--- a/metron-platform/metron-parsers/pom.xml
+++ b/metron-platform/metron-parsers/pom.xml
@@ -166,6 +166,10 @@
                     <artifactId>slf4j-log4j12</artifactId>
                     <groupId>org.slf4j</groupId>
                 </exclusion>
+                <exclusion>
+                    <artifactId>commons-lang3</artifactId>
+                    <groupId>org.apache.commons</groupId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/metron/blob/df94ed40/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index 2c43c23..56506a7 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -79,6 +79,10 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
     return this;
   }
 
+  public MessageParser<JSONObject> getParser() {
+    return parser;
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
@@ -170,16 +174,20 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
         collector.ack(tuple);
       }
     } catch (Throwable ex) {
-      MetronError error = new MetronError()
-              .withErrorType(Constants.ErrorType.PARSER_ERROR)
-              .withThrowable(ex)
-              .withSensorType(getSensorType())
-              .addRawMessage(originalMessage);
-      ErrorUtils.handleError(collector, error);
-      collector.ack(tuple);
+      handleError(originalMessage, tuple, ex, collector);
     }
   }
 
+  protected void handleError(byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) {
+    MetronError error = new MetronError()
+            .withErrorType(Constants.ErrorType.PARSER_ERROR)
+            .withThrowable(ex)
+            .withSensorType(getSensorType())
+            .addRawMessage(originalMessage);
+    ErrorUtils.handleError(collector, error);
+    collector.ack(tuple);
+  }
+
   private List<FieldValidator> getFailedValidators(JSONObject input, List<FieldValidator> validators) {
     List<FieldValidator> failedValidators = new ArrayList<>();
     for(FieldValidator validator : validators) {

http://git-wip-us.apache.org/repos/asf/metron/blob/df94ed40/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
new file mode 100644
index 0000000..b844104
--- /dev/null
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers.integration;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.ImmutableList;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.FieldValidator;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.utils.ReflectionUtils;
+import org.apache.metron.common.writer.MessageWriter;
+import org.apache.metron.integration.ProcessorResult;
+import org.apache.metron.parsers.bolt.ParserBolt;
+import org.apache.metron.parsers.bolt.WriterHandler;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.task.GeneralTopologyContext;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.json.simple.JSONObject;
+import org.mockito.Matchers;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ParserDriver {
+  private static final Logger LOG = LoggerFactory.getLogger(ParserBolt.class);
+  public static class CollectingWriter implements MessageWriter<JSONObject>{
+    List<byte[]> output;
+    public CollectingWriter(List<byte[]> output) {
+      this.output = output;
+    }
+
+    @Override
+    public void init() {
+
+    }
+
+    @Override
+    public void write(String sensorType, WriterConfiguration configurations, Tuple tuple, JSONObject message) throws Exception {
+      output.add(message.toJSONString().getBytes());
+    }
+
+    @Override
+    public String getName() {
+      return "collecting";
+    }
+
+    @Override
+    public void close() throws Exception {
+    }
+
+    public List<byte[]> getOutput() {
+      return output;
+    }
+  }
+
+  private class ShimParserBolt extends ParserBolt {
+    List<byte[]> output;
+    List<byte[]> errors = new ArrayList<>();
+
+    public ShimParserBolt(List<byte[]> output) {
+      super(null
+           , sensorType == null?config.getSensorTopic():sensorType
+           , ReflectionUtils.createInstance(config.getParserClassName())
+           , new WriterHandler( new CollectingWriter(output))
+      );
+      this.output = output;
+      getParser().configure(config.getParserConfig());
+    }
+
+    @Override
+    public ParserConfigurations getConfigurations() {
+      return new ParserConfigurations() {
+        @Override
+        public SensorParserConfig getSensorParserConfig(String sensorType) {
+          return config;
+        }
+
+        @Override
+        public Map<String, Object> getGlobalConfig() {
+          return globalConfig;
+        }
+
+        @Override
+        public List<FieldValidator> getFieldValidations() {
+          return new ArrayList<>();
+        }
+      };
+    }
+
+    @Override
+    protected void prepCache() {
+    }
+
+    @Override
+    protected void handleError(byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) {
+      errors.add(originalMessage);
+      LOG.error("Error parsing message: " + ex.getMessage(), ex);
+    }
+
+    public ProcessorResult<List<byte[]>> getResults() {
+      return new ProcessorResult.Builder<List<byte[]>>().withProcessErrors(errors)
+                                                        .withResult(output)
+                                                        .build();
+
+    }
+  }
+
+
+  private SensorParserConfig config;
+  private Map<String, Object> globalConfig;
+  private String sensorType;
+
+  public ParserDriver(String sensorType, String parserConfig, String globalConfig) throws IOException {
+    config = SensorParserConfig.fromBytes(parserConfig.getBytes());
+    this.sensorType = sensorType;
+    this.globalConfig = JSONUtils.INSTANCE.load(globalConfig, new TypeReference<Map<String, Object>>() {
+    });
+  }
+
+  public ProcessorResult<List<byte[]>> run(List<byte[]> in) {
+    ShimParserBolt bolt = new ShimParserBolt(new ArrayList<>());
+    OutputCollector collector = mock(OutputCollector.class);
+    bolt.prepare(null, null, collector);
+    for(byte[] record : in) {
+      bolt.execute(toTuple(record));
+    }
+    return bolt.getResults();
+  }
+
+  public Tuple toTuple(byte[] record) {
+    Tuple ret = mock(Tuple.class);
+    when(ret.getBinary(eq(0))).thenReturn(record);
+    return ret;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/df94ed40/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
index b20445e..cd3d005 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
@@ -30,77 +30,59 @@ import org.apache.metron.integration.utils.TestUtils;
 import org.apache.metron.parsers.integration.components.ParserTopologyComponent;
 import org.apache.metron.test.TestDataType;
 import org.apache.metron.test.utils.SampleDataUtils;
+import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.util.*;
 
 public abstract class ParserIntegrationTest extends BaseIntegrationTest {
-  protected static final String ERROR_TOPIC = "parser_error";
   protected List<byte[]> inputMessages;
-  @Test
-  public void test() throws Exception {
-    final String sensorType = getSensorType();
-    inputMessages = TestUtils.readSampleData(SampleDataUtils.getSampleDataPath(sensorType, TestDataType.RAW));
 
-    final Properties topologyProperties = new Properties();
-    final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{
-      add(new KafkaComponent.Topic(sensorType, 1));
-      add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
-      add(new KafkaComponent.Topic(ERROR_TOPIC,1));
-    }});
-    topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
-
-    ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties);
+  protected String readGlobalConfig() throws IOException {
+    File configsRoot = new File(TestConstants.SAMPLE_CONFIG_PATH);
+    return new String(Files.readAllBytes(new File(configsRoot, "global.json").toPath()));
+  }
 
-    ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
-            .withTopologyProperties(topologyProperties)
-            .withGlobalConfigsPath(TestConstants.SAMPLE_CONFIG_PATH)
-            .withParserConfigsPath(TestConstants.PARSER_CONFIGS_PATH);
+  protected String readSensorConfig(String sensorType) throws IOException {
+    File configsRoot = new File(TestConstants.PARSER_CONFIGS_PATH);
+    File parsersRoot = new File(configsRoot, "parsers");
+    return new String(Files.readAllBytes(new File(parsersRoot, sensorType + ".json").toPath()));
+  }
 
-    ParserTopologyComponent parserTopologyComponent = new ParserTopologyComponent.Builder()
-            .withSensorType(sensorType)
-            .withTopologyProperties(topologyProperties)
-            .withOutputTopic(Constants.ENRICHMENT_TOPIC)
-            .withBrokerUrl(kafkaComponent.getBrokerList()).build();
+  @Test
+  public void test() throws Exception {
+    String sensorType = getSensorType();
+    ParserDriver driver = new ParserDriver(sensorType, readSensorConfig(sensorType), readGlobalConfig());
+    inputMessages = TestUtils.readSampleData(SampleDataUtils.getSampleDataPath(sensorType, TestDataType.RAW));
 
-    //UnitTestHelper.verboseLogging();
-    ComponentRunner runner = new ComponentRunner.Builder()
-            .withComponent("zk", zkServerComponent)
-            .withComponent("kafka", kafkaComponent)
-            .withComponent("config", configUploadComponent)
-            .withComponent("org/apache/storm", parserTopologyComponent)
-            .withMillisecondsBetweenAttempts(5000)
-            .withNumRetries(10)
-            .withCustomShutdownOrder(new String[] {"org/apache/storm","config","kafka","zk"})
-            .build();
-    try {
-      runner.start();
-      kafkaComponent.writeMessages(sensorType, inputMessages);
-      ProcessorResult<List<byte[]>> result = runner.process(getProcessor());
-      List<byte[]> outputMessages = result.getResult();
-      StringBuffer buffer = new StringBuffer();
-      if (result.failed()){
-        result.getBadResults(buffer);
-        buffer.append(String.format("%d Valid Messages Processed", outputMessages.size())).append("\n");
+    ProcessorResult<List<byte[]>> result = driver.run(inputMessages);
+    List<byte[]> outputMessages = result.getResult();
+    StringBuffer buffer = new StringBuffer();
+    if (result.failed()){
+      result.getBadResults(buffer);
+      buffer.append(String.format("%d Valid Messages Processed", outputMessages.size())).append("\n");
+      dumpParsedMessages(outputMessages,buffer);
+      Assert.fail(buffer.toString());
+    } else {
+      List<ParserValidation> validations = getValidations();
+      if (validations == null || validations.isEmpty()) {
+        buffer.append("No validations configured for sensorType " + sensorType + ".  Dumping parsed messages").append("\n");
         dumpParsedMessages(outputMessages,buffer);
         Assert.fail(buffer.toString());
       } else {
-        List<ParserValidation> validations = getValidations();
-        if (validations == null || validations.isEmpty()) {
-          buffer.append("No validations configured for sensorType " + sensorType + ".  Dumping parsed messages").append("\n");
-          dumpParsedMessages(outputMessages,buffer);
-          Assert.fail(buffer.toString());
-        } else {
-          for (ParserValidation validation : validations) {
-            System.out.println("Running " + validation.getName() + " on sensorType " + sensorType);
-            validation.validate(sensorType, outputMessages);
-          }
+        for (ParserValidation validation : validations) {
+          System.out.println("Running " + validation.getName() + " on sensorType " + sensorType);
+          validation.validate(sensorType, outputMessages);
         }
       }
-    } finally {
-      runner.stop();
     }
   }
 
@@ -110,28 +92,6 @@ public abstract class ParserIntegrationTest extends BaseIntegrationTest {
     }
   }
 
-  @SuppressWarnings("unchecked")
-  private KafkaProcessor<List<byte[]>> getProcessor(){
-
-    return new KafkaProcessor<>()
-            .withKafkaComponentName("kafka")
-            .withReadTopic(Constants.ENRICHMENT_TOPIC)
-            .withErrorTopic(ERROR_TOPIC)
-            .withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() {
-              @Nullable
-              @Override
-              public Boolean apply(@Nullable KafkaMessageSet messageSet) {
-                return (messageSet.getMessages().size() + messageSet.getErrors().size() == inputMessages.size());
-              }
-            })
-            .withProvideResult(new Function<KafkaMessageSet,List<byte[]>>(){
-              @Nullable
-              @Override
-              public List<byte[]> apply(@Nullable KafkaMessageSet messageSet) {
-                  return messageSet.getMessages();
-              }
-            });
-  }
   abstract String getSensorType();
   abstract List<ParserValidation> getValidations();
 

http://git-wip-us.apache.org/repos/asf/metron/blob/df94ed40/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
index 6ad7427..b556411 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
@@ -17,9 +17,9 @@
  */
 package org.apache.metron.parsers.integration.components;
 
-import com.google.common.collect.ImmutableMap;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
+import org.apache.storm.generated.KillOptions;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.metron.integration.InMemoryComponent;
 import org.apache.metron.integration.UnableToStartException;
@@ -27,12 +27,6 @@ import org.apache.metron.parsers.topology.ParserTopologyBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.FileVisitOption;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.*;
 
 import static org.apache.metron.integration.components.FluxTopologyComponent.assassinateSlots;
@@ -82,6 +76,9 @@ public class ParserTopologyComponent implements InMemoryComponent {
     this.outputTopic = outputTopic;
   }
 
+  public void updateSensorType(String sensorType) {
+    this.sensorType = sensorType;
+  }
 
   @Override
   public void start() throws UnableToStartException {
@@ -113,6 +110,8 @@ public class ParserTopologyComponent implements InMemoryComponent {
     if (stormCluster != null) {
       try {
         try {
+          // Kill the topology directly instead of sitting through the wait period
+          killTopology();
           stormCluster.shutdown();
         } catch (IllegalStateException ise) {
           if (!(ise.getMessage().contains("It took over") && ise.getMessage().contains("to shut down slot"))) {
@@ -135,4 +134,23 @@ public class ParserTopologyComponent implements InMemoryComponent {
 
     }
   }
+
+  @Override
+  public void reset() {
+    if (stormCluster != null) {
+      killTopology();
+    }
+  }
+
+  protected void killTopology() {
+    KillOptions ko = new KillOptions();
+    ko.set_wait_secs(0);
+    stormCluster.killTopologyWithOpts(sensorType, ko);
+    try {
+      // Actually wait for it to die.
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
+      // Do nothing
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/df94ed40/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index 7d1dba8..e988c30 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -64,6 +64,7 @@ import org.apache.metron.spout.pcap.deserializer.Deserializers;
 import org.apache.metron.test.utils.UnitTestHelper;
 import org.json.simple.JSONObject;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class PcapTopologyIntegrationTest {
@@ -89,6 +90,8 @@ public class PcapTopologyIntegrationTest {
     }).length;
   }
 
+  // This will eventually be completely deprecated.  As it takes a significant amount of time, the test is being disabled.
+  @Ignore
   @Test
   public void testTimestampInPacket() throws Exception {
     testTopology(new Function<Properties, Void>() {
@@ -106,6 +109,7 @@ public class PcapTopologyIntegrationTest {
     , true
                );
   }
+
   @Test
   public void testTimestampInKey() throws Exception {
     testTopology(new Function<Properties, Void>() {

http://git-wip-us.apache.org/repos/asf/metron/blob/df94ed40/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/components/SolrComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/components/SolrComponent.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/components/SolrComponent.java
index 02fbc4d..58976a3 100644
--- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/components/SolrComponent.java
+++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/components/SolrComponent.java
@@ -109,6 +109,15 @@ public class SolrComponent implements InMemoryComponent {
     }
   }
 
+  @Override
+  public void reset() {
+    try {
+      miniSolrCloudCluster.deleteCollection("metron");
+    } catch (SolrServerException | IOException e) {
+      // Do nothing
+    }
+  }
+
   public MetronSolrClient getSolrClient() {
     return new MetronSolrClient(getZookeeperUrl());
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/df94ed40/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/StormKafkaSpout.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/StormKafkaSpout.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/StormKafkaSpout.java
index 030348f..514a21d 100644
--- a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/StormKafkaSpout.java
+++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/StormKafkaSpout.java
@@ -18,6 +18,9 @@
 
 package org.apache.metron.storm.kafka.flux;
 
+import java.util.Arrays;
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.log4j.Logger;
 import org.apache.storm.kafka.spout.KafkaSpout;
@@ -33,6 +36,9 @@ public class StormKafkaSpout<K, V> extends KafkaSpout<K, V> {
   private static final Logger LOG = Logger.getLogger(StormKafkaSpout.class);
   protected KafkaSpoutConfig<K,V> _spoutConfig;
   protected String _topic;
+
+  protected AtomicBoolean isShutdown = new AtomicBoolean(false);
+
   public StormKafkaSpout(SimpleStormKafkaBuilder<K,V> builder) {
     super(builder.build());
     this._topic = builder.getTopic();
@@ -48,12 +54,18 @@ public class StormKafkaSpout<K, V> extends KafkaSpout<K, V> {
       //see https://issues.apache.org/jira/browse/STORM-2184
       LOG.warn("You can generally ignore these, as per https://issues.apache.org/jira/browse/STORM-2184 -- " + we.getMessage(), we);
     }
+    finally {
+      isShutdown.set(true);
+    }
   }
 
   @Override
   public void close() {
     try {
-      super.close();
+      if(!isShutdown.get()) {
+        super.close();
+        isShutdown.set(true);
+      }
     }
     catch(WakeupException we) {
       //see https://issues.apache.org/jira/browse/STORM-2184

http://git-wip-us.apache.org/repos/asf/metron/blob/df94ed40/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
index 631b24b..0403d1b 100644
--- a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
+++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/mock/MockHTable.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -479,6 +478,13 @@ public class MockHTable implements HTableInterface {
     }
   }
 
+  public void clear() {
+    synchronized (putLog) {
+      putLog.clear();
+    }
+    data.clear();
+  }
+
   @Override
   public void put(Put put) throws IOException {
     addToPutLog(put);

http://git-wip-us.apache.org/repos/asf/metron/blob/df94ed40/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 16b2499..af97e83 100644
--- a/pom.xml
+++ b/pom.xml
@@ -108,6 +108,7 @@
         <global_jar_version>3.0.2</global_jar_version>
         <global_surefire_version>2.18</global_surefire_version>
         <global_maven_version>[3.3.1,)</global_maven_version>
+        <argLine></argLine>
     </properties>
 
     <profiles>


Mime
View raw message