metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject incubator-metron git commit: METRON-672: SolrIndexingIntegrationTest fails intermittently closes apache/incubator-metron#424
Date Thu, 26 Jan 2017 13:49:13 GMT
Repository: incubator-metron
Updated Branches:
  refs/heads/master 5a5d42b85 -> 0219e56a9


METRON-672: SolrIndexingIntegrationTest fails intermittently closes apache/incubator-metron#424


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/0219e56a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/0219e56a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/0219e56a

Branch: refs/heads/master
Commit: 0219e56a9eb67adf10ece1c2201041f91008a43a
Parents: 5a5d42b
Author: cstella <cestella@gmail.com>
Authored: Thu Jan 26 08:49:02 2017 -0500
Committer: cstella <cestella@gmail.com>
Committed: Thu Jan 26 08:49:02 2017 -0500

----------------------------------------------------------------------
 .../components/ConfigUploadComponent.java       | 47 +++++++++++++
 .../integration/IndexingIntegrationTest.java    | 41 +++++++++++-
 .../metron/integration/BaseIntegrationTest.java | 10 +--
 .../components/ZKServerComponent.java           | 69 ++++++++++----------
 .../PcapTopologyIntegrationTest.java            | 11 +---
 5 files changed, 126 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0219e56a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
index a819105..2896512 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
@@ -27,6 +27,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 import static org.apache.metron.common.configuration.ConfigurationsUtils.*;
 
@@ -38,6 +40,7 @@ public class ConfigUploadComponent implements InMemoryComponent {
   private String enrichmentConfigsPath;
   private String indexingConfigsPath;
   private String profilerConfigPath;
+  private Optional<Consumer<ConfigUploadComponent>> postStartCallback = Optional.empty();
   private Optional<String> globalConfig = Optional.empty();
   private Map<String, SensorParserConfig> parserSensorConfigs = new HashMap<>();
   public ConfigUploadComponent withTopologyProperties(Properties topologyProperties) {
@@ -78,6 +81,47 @@ public class ConfigUploadComponent implements InMemoryComponent {
     return this;
   }
 
+  public ConfigUploadComponent withPostStartCallback(Consumer<ConfigUploadComponent> f) {
+        this.postStartCallback = Optional.ofNullable(f);
+        return this;
+  }
+
+  public Properties getTopologyProperties() {
+    return topologyProperties;
+  }
+
+  public String getGlobalConfigPath() {
+    return globalConfigPath;
+  }
+
+  public String getParserConfigsPath() {
+    return parserConfigsPath;
+  }
+
+  public String getEnrichmentConfigsPath() {
+    return enrichmentConfigsPath;
+  }
+
+  public String getIndexingConfigsPath() {
+    return indexingConfigsPath;
+  }
+
+  public String getProfilerConfigPath() {
+    return profilerConfigPath;
+  }
+
+  public Optional<Consumer<ConfigUploadComponent>> getPostStartCallback() {
+    return postStartCallback;
+  }
+
+  public Optional<String> getGlobalConfig() {
+    return globalConfig;
+  }
+
+  public Map<String, SensorParserConfig> getParserSensorConfigs() {
+    return parserSensorConfigs;
+  }
+
   @Override
   public void start() throws UnableToStartException {
     try {
@@ -99,6 +143,9 @@ public class ConfigUploadComponent implements InMemoryComponent {
       if(globalConfig.isPresent()) {
         writeGlobalConfigToZookeeper(globalConfig.get().getBytes(), zookeeperUrl);
       }
+      if(postStartCallback.isPresent()) {
+        postStartCallback.get().accept(this);
+      }
 
     } catch (Exception e) {
       throw new UnableToStartException(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0219e56a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
index f54d791..03ae9ff 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
@@ -21,8 +21,10 @@ package org.apache.metron.indexing.integration;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.metron.TestConstants;
 import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.interfaces.FieldNameConverter;
 import org.apache.metron.common.spout.kafka.SpoutConfig;
 import org.apache.metron.common.utils.JSONUtils;
@@ -37,13 +39,18 @@ import org.apache.metron.integration.components.KafkaComponent;
 import org.apache.metron.integration.components.ZKServerComponent;
 import org.apache.metron.integration.utils.TestUtils;
 import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
+import org.apache.zookeeper.KeeperException;
 import org.junit.Assert;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.metron.common.configuration.ConfigurationsUtils.getClient;
 
 public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
   protected String hdfsDir = "target/indexingIntegrationTest/hdfs";
@@ -139,12 +146,22 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
       inputDocs.add(m);
 
     }
+    final AtomicBoolean isLoaded = new AtomicBoolean(false);
     ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
             .withTopologyProperties(topologyProperties)
             .withGlobalConfigsPath(TestConstants.SAMPLE_CONFIG_PATH)
             .withEnrichmentConfigsPath(TestConstants.SAMPLE_CONFIG_PATH)
             .withIndexingConfigsPath(TestConstants.SAMPLE_CONFIG_PATH)
-            ;
+            .withPostStartCallback(component -> {
+              try {
+                waitForIndex(component.getTopologyProperties().getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY));
+              } catch (Exception e) {
+                e.printStackTrace();
+              }
+              isLoaded.set(true);
+              }
+            );
+
     FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
             .withTopologyLocation(new File(fluxPath))
             .withTopologyName("test")
@@ -166,10 +183,11 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
     runner.start();
 
     try {
+      while(!isLoaded.get()) {
+        Thread.sleep(100);
+      }
       fluxComponent.submitTopology();
-
       kafkaComponent.writeMessages(Constants.INDEXING_TOPIC, inputMessages);
-      StringBuffer buffer = new StringBuffer();
       List<Map<String, Object>> docs = cleanDocs(runner.process(getProcessor(inputMessages)));
       Assert.assertEquals(docs.size(), inputMessages.size());
       //assert that our input docs are equivalent to the output docs, converting the input docs keys based
@@ -184,6 +202,23 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
     }
   }
 
+  private void waitForIndex(String zookeeperQuorum) throws Exception {
+    try(CuratorFramework client = getClient(zookeeperQuorum)) {
+      client.start();
+      byte[] bytes = null;
+      do {
+        try {
+          bytes = ConfigurationsUtils.readSensorIndexingConfigBytesFromZookeeper(testSensorType, client);
+          Thread.sleep(1000);
+        }
+        catch(KeeperException.NoNodeException nne) {
+          //kindly ignore because the path might not exist just yet.
+        }
+      }
+      while(bytes == null || bytes.length == 0);
+    }
+  }
+
   public List<Map<String, Object>> cleanDocs(ProcessorResult<List<Map<String, Object>>> result) {
     List<Map<String,Object>> docs = result.getResult();
     StringBuffer buffer = new StringBuffer();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0219e56a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
index 7207d7a..7ae373b 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
@@ -33,13 +33,7 @@ public abstract class BaseIntegrationTest {
 
     protected static ZKServerComponent getZKServerComponent(final Properties topologyProperties) {
         return new ZKServerComponent()
-                .withPostStartCallback(new Function<ZKServerComponent, Void>() {
-                    @Nullable
-                    @Override
-                    public Void apply(@Nullable ZKServerComponent zkComponent) {
-                        topologyProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY, zkComponent.getConnectionString());
-                        return null;
-                    }
-                });
+                .withPostStartCallback((zkComponent) -> topologyProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY, zkComponent.getConnectionString())
+                );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0219e56a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ZKServerComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ZKServerComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ZKServerComponent.java
index a52ad42..57d814b 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ZKServerComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ZKServerComponent.java
@@ -23,40 +23,43 @@ import org.apache.metron.integration.InMemoryComponent;
 import org.apache.metron.integration.UnableToStartException;
 import org.apache.curator.test.TestingServer;
 import java.util.Map;
-public class ZKServerComponent implements InMemoryComponent{
-    public static final String ZOOKEEPER_PROPERTY = "kafka.zk";
-    private TestingServer testZkServer;
-    private String zookeeperUrl = null;
-    private Map<String,String> properties = null;
-    private Function<ZKServerComponent, Void> postStartCallback;
-    public String getConnectionString()
-    {
-        return this.zookeeperUrl;
-    }
-    public ZKServerComponent withPostStartCallback(Function<ZKServerComponent, Void> f) {
-        postStartCallback = f;
-        return this;
-    }
+import java.util.Optional;
+import java.util.function.Consumer;
 
-    @Override
-    public void start() throws UnableToStartException {
-        try {
-            testZkServer = new TestingServer(true);
-            zookeeperUrl = testZkServer.getConnectString();
-            if(postStartCallback != null) {
-                postStartCallback.apply(this);
-            }
-        }catch(Exception e){
-            throw new UnableToStartException("Unable to start TestingServer",e);
-        }
-    }
+public class ZKServerComponent implements InMemoryComponent {
+  public static final String ZOOKEEPER_PROPERTY = "kafka.zk";
+  private TestingServer testZkServer;
+  private String zookeeperUrl = null;
+  private Map<String,String> properties = null;
+  private Optional<Consumer<ZKServerComponent>> postStartCallback = Optional.empty();
+  public String getConnectionString()
+  {
+    return this.zookeeperUrl;
+  }
+  public ZKServerComponent withPostStartCallback(Consumer<ZKServerComponent> f) {
+    postStartCallback = Optional.ofNullable(f);
+    return this;
+  }
 
-    @Override
-    public void stop() {
-        try {
-            if (testZkServer != null) {
-                testZkServer.close();
-            }
-        }catch(Exception e){}
+  @Override
+  public void start() throws UnableToStartException {
+    try {
+      testZkServer = new TestingServer(true);
+      zookeeperUrl = testZkServer.getConnectString();
+      if(postStartCallback.isPresent()) {
+        postStartCallback.get().accept(this);
+      }
+    }catch(Exception e){
+      throw new UnableToStartException("Unable to start TestingServer",e);
     }
+  }
+
+  @Override
+  public void stop() {
+    try {
+      if (testZkServer != null) {
+        testZkServer.close();
+      }
+    }catch(Exception e){}
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0219e56a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index cc7f827..00cc62d 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -215,14 +215,9 @@ public class PcapTopologyIntegrationTest {
     }};
     updatePropertiesCallback.apply(topologyProperties);
 
-    final ZKServerComponent zkServerComponent = new ZKServerComponent().withPostStartCallback(new Function<ZKServerComponent, Void>() {
-      @Nullable
-      @Override
-      public Void apply(@Nullable ZKServerComponent zkComponent) {
-        topologyProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY, zkComponent.getConnectionString());
-        return null;
-      }
-    });
+    final ZKServerComponent zkServerComponent = new ZKServerComponent().withPostStartCallback(
+            (zkComponent) -> topologyProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY, zkComponent.getConnectionString())
+    );
     final KafkaComponent kafkaComponent = new KafkaComponent().withTopics(new ArrayList<KafkaComponent.Topic>() {{
       add(new KafkaComponent.Topic(KAFKA_TOPIC, 1));
     }}).withTopologyProperties(topologyProperties);


Mime
View raw message