metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject [53/89] [abbrv] incubator-metron git commit: Move all com/apache folders to org/apache
Date Tue, 26 Jan 2016 14:18:31 GMT
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/TopologyRunner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/TopologyRunner.java b/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/TopologyRunner.java
deleted file mode 100644
index da32025..0000000
--- a/metron-streaming/Metron-Topologies/src/main/java/com/apache/metron/topology/runner/TopologyRunner.java
+++ /dev/null
@@ -1,1048 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.apache.metron.topology.runner;
-
-import java.lang.reflect.Constructor;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Stack;
-
-import oi.thekraken.grok.api.Grok;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.lang.StringUtils;
-import org.apache.storm.hdfs.bolt.HdfsBolt;
-import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
-import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
-import org.apache.storm.hdfs.bolt.format.FileNameFormat;
-import org.apache.storm.hdfs.bolt.format.RecordFormat;
-import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
-import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
-import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
-import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
-import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
-import org.apache.storm.hdfs.common.rotation.MoveFileAction;
-import org.json.simple.JSONObject;
-
-import storm.kafka.BrokerHosts;
-import storm.kafka.KafkaSpout;
-import storm.kafka.SpoutConfig;
-import storm.kafka.ZkHosts;
-import storm.kafka.bolt.KafkaBolt;
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.Grouping;
-import backtype.storm.spout.RawScheme;
-import backtype.storm.spout.SchemeAsMultiScheme;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-
-import com.esotericsoftware.kryo.serializers.FieldSerializer;
-import com.esotericsoftware.kryo.serializers.MapSerializer;
-
-
-
-import com.apache.metron.alerts.TelemetryAlertsBolt;
-import com.apache.metron.alerts.adapters.HbaseWhiteAndBlacklistAdapter;
-import com.apache.metron.alerts.interfaces.AlertsAdapter;
-import com.apache.metron.enrichment.adapters.cif.CIFHbaseAdapter;
-import com.apache.metron.enrichment.adapters.geo.GeoMysqlAdapter;
-import com.apache.metron.enrichment.adapters.host.HostFromPropertiesFileAdapter;
-import com.apache.metron.enrichment.adapters.whois.WhoisHBaseAdapter;
-import com.apache.metron.enrichment.adapters.threat.ThreatHbaseAdapter;
-import com.apache.metron.enrichment.common.GenericEnrichmentBolt;
-import com.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-import com.apache.metron.hbase.HBaseBolt;
-import com.apache.metron.hbase.HBaseStreamPartitioner;
-import com.apache.metron.hbase.TupleTableConfig;
-import com.apache.metron.helpers.topology.Cli;
-import com.apache.metron.helpers.topology.SettingsLoader;
-import com.apache.metron.index.interfaces.IndexAdapter;
-import com.apache.metron.indexing.TelemetryIndexingBolt;
-import com.apache.metron.json.serialization.JSONKryoSerializer;
-
-public abstract class TopologyRunner {
-
-	protected Configuration config;
-	protected TopologyBuilder builder;
-	protected Config conf;
-	protected boolean local_mode = true;
-	protected boolean debug = true;
-	protected String config_path = null;
-	protected String default_config_path = "Metron_Configs";
-	protected boolean success = false;
-	protected Stack<String> messageComponents = new Stack<String>();
-	protected Stack<String> errorComponents = new Stack<String>();
-	protected Stack<String> alertComponents = new Stack<String>();
-	protected Stack<String> dataComponents = new Stack<String>();
-	protected Stack<String> terminalComponents = new Stack<String>();
-
-	public void initTopology(String args[], String subdir)
-			throws Exception {
-		Cli command_line = new Cli(args);
-		command_line.parse();
-
-		System.out.println("[Metron] Starting topology deployment...");
-
-		debug = command_line.isDebug();
-		System.out.println("[Metron] Debug mode set to: " + debug);
-
-		local_mode = command_line.isLocal_mode();
-		System.out.println("[Metron] Local mode set to: " + local_mode);
-
-		if (command_line.getPath() != null) {
-			config_path = command_line.getPath();
-			System.out
-					.println("[Metron] Setting config path to external config path: "
-							+ config_path);
-		} else {
-			config_path = default_config_path;
-			System.out
-					.println("[Metron] Initializing from default internal config path: "
-							+ config_path);
-		}
-
-		String topology_conf_path = config_path + "/topologies/" + subdir
-				+ "/topology.conf";
-
-		String environment_identifier_path = config_path
-				+ "/topologies/environment_identifier.conf";
-		String topology_identifier_path = config_path + "/topologies/" + subdir
-				+ "/topology_identifier.conf";
-
-		System.out.println("[Metron] Looking for environment identifier: "
-				+ environment_identifier_path);
-		System.out.println("[Metron] Looking for topology identifier: "
-				+ topology_identifier_path);
-		System.out.println("[Metron] Looking for topology config: "
-				+ topology_conf_path);
-
-		config = new PropertiesConfiguration(topology_conf_path);
-
-		JSONObject environment_identifier = SettingsLoader
-				.loadEnvironmentIdnetifier(environment_identifier_path);
-		JSONObject topology_identifier = SettingsLoader
-				.loadTopologyIdnetifier(topology_identifier_path);
-
-		String topology_name = SettingsLoader.generateTopologyName(
-				environment_identifier, topology_identifier);
-
-		System.out.println("[Metron] Initializing Topology: " + topology_name);
-
-		builder = new TopologyBuilder();
-
-		conf = new Config();
-		conf.registerSerialization(JSONObject.class, MapSerializer.class);
-		conf.setDebug(debug);
-
-		System.out.println("[Metron] Initializing Spout: " + topology_name);
-
-		if (command_line.isGenerator_spout()) {
-			String component_name = config.getString("spout.test.name",
-					"DefaultTopologySpout");
-			success = initializeTestingSpout(component_name);
-			messageComponents.add(component_name);
-
-			System.out.println("[Metron] ------Component " + component_name
-					+ " initialized with the following settings:");
-
-			SettingsLoader.printConfigOptions((PropertiesConfiguration) config,
-					"spout.test");
-		}
-
-		if (!command_line.isGenerator_spout()) {
-			String component_name = config.getString("spout.kafka.name",
-					"DefaultTopologyKafkaSpout");
-
-			success = initializeKafkaSpout(component_name);
-			messageComponents.add(component_name);
-
-			System.out.println("[Metron] ------Component " + component_name
-					+ " initialized with the following settings:");
-
-			SettingsLoader.printConfigOptions((PropertiesConfiguration) config,
-					"spout.kafka");
-		}
-
-		if (config.getBoolean("bolt.parser.enabled", true)) {
-			String component_name = config.getString("bolt.parser.name",
-					"DefaultTopologyParserBot");
-
-			success = initializeParsingBolt(topology_name, component_name);
-			messageComponents.add(component_name);
-			errorComponents.add(component_name);
-
-			dataComponents.add(component_name);
-
-			System.out.println("[Metron] ------Component " + component_name
-					+ " initialized with the following settings:");
-
-			SettingsLoader.printConfigOptions((PropertiesConfiguration) config,
-					"bolt.parser");
-		}
-
-		if (config.getBoolean("bolt.enrichment.geo.enabled", false)) {
-			String component_name = config.getString(
-					"bolt.enrichment.geo.name", "DefaultGeoEnrichmentBolt");
-
-			success = initializeGeoEnrichment(topology_name, component_name);
-			messageComponents.add(component_name);
-			errorComponents.add(component_name);
-
-			System.out.println("[Metron] ------Component " + component_name
-					+ " initialized with the following settings:");
-
-			SettingsLoader.printConfigOptions((PropertiesConfiguration) config,
-					"bolt.enrichment.geo");
-			SettingsLoader.printConfigOptions((PropertiesConfiguration) config,
-					"mysql");
-		}
-
-		if (config.getBoolean("bolt.enrichment.host.enabled", false)) {
-			String component_name = config.getString(
-					"bolt.enrichment.host.name", "DefaultHostEnrichmentBolt");
-
-			success = initializeHostsEnrichment(topology_name, component_name,
-					"Metron_Configs/etc/whitelists/known_hosts.conf");
-			messageComponents.add(component_name);
-			errorComponents.add(component_name);
-
-			System.out.println("[Metron] ------Component " + component_name
-					+ " initialized with the following settings:");
-
-			SettingsLoader.printConfigOptions((PropertiesConfiguration) config,
-					"bolt.enrichment.host");
-		}
-
-		if (config.getBoolean("bolt.enrichment.whois.enabled", false)) {
-			String component_name = config.getString(
-					"bolt.enrichment.whois.name", "DefaultWhoisEnrichmentBolt");
-
-			success = initializeWhoisEnrichment(topology_name, component_name);
-			messageComponents.add(component_name);
-			errorComponents.add(component_name);
-
-			System.out.println("[Metron] ------Component " + component_name
-					+ " initialized with the following settings:");
-
-			SettingsLoader.printConfigOptions((PropertiesConfiguration) config,
-					"bolt.enrichment.whois");
-		}
-
-		if (config.getBoolean("bolt.enrichment.cif.enabled", false)) {
-			String component_name = config.getString(
-					"bolt.enrichment.cif.name", "DefaultCIFEnrichmentBolt");
-
-			success = initializeCIFEnrichment(topology_name, component_name);
-			messageComponents.add(component_name);
-			errorComponents.add(component_name);
-
-			System.out.println("[Metron] ------Component " + component_name
-					+ " initialized with the following settings:");
-
-			SettingsLoader.printConfigOptions((PropertiesConfiguration) config,
-					"bolt.enrichment.cif");
-		}
-		
-		if (config.getBoolean("bolt.enrichment.threat.enabled", false)) {
-			String component_name = config.getString(
-					"bolt.enrichment.threat.name", "DefaultThreatEnrichmentBolt");
-
-			success = initializeThreatEnrichment(topology_name, component_name);
-			messageComponents.add(component_name);
-			errorComponents.add(component_name);
-
-			System.out.println("[Metron] ------Component " + component_name
-					+ " initialized with the following settings:");
-
-			SettingsLoader.printConfigOptions((PropertiesConfiguration) config,
-					"bolt.enrichment.threat");
-		}
-
-		if (config.getBoolean("bolt.alerts.enabled", false)) {
-			String component_name = config.getString("bolt.alerts.name",
-					"DefaultAlertsBolt");
-
-			success = initializeAlerts(topology_name, component_name,
-					config_path + "/topologies/" + subdir + "/alerts.xml",
-					environment_identifier, topology_identifier);
-
-			messageComponents.add(component_name);
-			errorComponents.add(component_name);
-			alertComponents.add(component_name);
-
-			System.out.println("[Metron] ------Component " + component_name
-					+ " initialized with the following settings:");
-
-			SettingsLoader.printConfigOptions((PropertiesConfiguration) config,
-					"bolt.alerts");
-		}
-
-		if (config.getBoolean("bolt.alerts.indexing.enabled") && config.getBoolean("bolt.alerts.enabled")) {
-
-			String component_name = config.getString(
-					"bolt.alerts.indexing.name", "DefaultAlertsBolt");
-
-			success = initializeAlertIndexing(component_name);
-			terminalComponents.add(component_name);
-
-			System.out.println("[Metron] ------Component " + component_name
-					+ " initialized with the following settings:");
-
-			SettingsLoader.printConfigOptions((PropertiesConfiguration) config,
-					"bolt.alerts.indexing");
-		}
-
-		if (config.getBoolean("bolt.kafka.enabled", false)) {
-			String component_name = config.getString("bolt.kafka.name",
-					"DefaultKafkaBolt");
-
-			success = initializeKafkaBolt(component_name);
-			terminalComponents.add(component_name);
-
-			System.out.println("[Metron] Component " + component_name
-					+ " initialized");
-
-			System.out.println("[Metron] ------Component " + component_name
-					+ " initialized with the following settings:");
-
-			SettingsLoader.printConfigOptions((PropertiesConfiguration) config,
-					"bolt.kafka");
-		}
-
-		if (config.getBoolean("bolt.indexing.enabled", true)) {
-			String component_name = config.getString("bolt.indexing.name",
-					"DefaultIndexingBolt");
-
-			success = initializeIndexingBolt(component_name);
-			errorComponents.add(component_name);
-			terminalComponents.add(component_name);
-
-			System.out.println("[Metron] ------Component " + component_name
-					+ " initialized with the following settings:");
-
-			SettingsLoader.printConfigOptions((PropertiesConfiguration) config,
-					"bolt.indexing");
-		}
-
-		if (config.getBoolean("bolt.hdfs.enabled", false)) {
-			String component_name = config.getString("bolt.hdfs.name",
-					"DefaultHDFSBolt");
-
-			success = initializeHDFSBolt(topology_name, component_name);
-			terminalComponents.add(component_name);
-
-			System.out.println("[Metron] ------Component " + component_name
-					+ " initialized with the following settings:");
-
-			SettingsLoader.printConfigOptions((PropertiesConfiguration) config,
-					"bolt.hdfs");
-		}
-
-		if (config.getBoolean("bolt.error.indexing.enabled")) {
-			String component_name = config.getString(
-					"bolt.error.indexing.name", "DefaultErrorIndexingBolt");
-
-			success = initializeErrorIndexBolt(component_name);
-			terminalComponents.add(component_name);
-
-			System.out.println("[Metron] ------Component " + component_name
-					+ " initialized with the following settings:");
-
-			SettingsLoader.printConfigOptions((PropertiesConfiguration) config,
-					"bolt.error");
-		}
-
-		if (config.containsKey("bolt.hbase.enabled")
-				&& config.getBoolean("bolt.hbase.enabled")) {
-			String component_name = config.getString("bolt.hbase.name",
-					"DefaultHbaseBolt");
-
-			String shuffleType = config.getString("bolt.hbase.shuffle.type",
-					"direct");
-			success = initializeHbaseBolt(component_name, shuffleType);
-			terminalComponents.add(component_name);
-
-			System.out.println("[Metron] ------Component " + component_name
-					+ " initialized with the following settings:");
-
-			SettingsLoader.printConfigOptions((PropertiesConfiguration) config,
-					"bolt.hbase");
-		}
-
-		System.out.println("[Metron] Topology Summary: ");
-		System.out.println("[Metron] Message Stream: "
-				+ printComponentStream(messageComponents));
-		System.out.println("[Metron] Alerts Stream: "
-				+ printComponentStream(alertComponents));
-		System.out.println("[Metron] Error Stream: "
-				+ printComponentStream(errorComponents));
-		System.out.println("[Metron] Data Stream: "
-				+ printComponentStream(dataComponents));
-		System.out.println("[Metron] Terminal Components: "
-				+ printComponentStream(terminalComponents));
-
-		if (local_mode) {
-			conf.setNumWorkers(config.getInt("num.workers"));
-			conf.setMaxTaskParallelism(1);
-			LocalCluster cluster = new LocalCluster();
-			cluster.submitTopology(topology_name, conf,
-					builder.createTopology());
-		} else {
-
-			conf.setNumWorkers(config.getInt("num.workers"));
-			conf.setNumAckers(config.getInt("num.ackers"));
-			StormSubmitter.submitTopology(topology_name, conf,
-					builder.createTopology());
-		}
-
-	}
-
-	private String printComponentStream(List<String> messageComponents) {
-		StringBuilder print_string = new StringBuilder();
-
-		for (String component : messageComponents) {
-			print_string.append(component + " -> ");
-		}
-
-		print_string.append("[TERMINAL COMPONENT]");
-
-		return print_string.toString();
-	}
-
-	public boolean initializeHbaseBolt(String name, String shuffleType) {
-
-		try {
-
-			String messageUpstreamComponent = dataComponents.get(dataComponents
-					.size()-1);
-
-			System.out.println("[Metron] ------" + name
-					+ " is initializing from " + messageUpstreamComponent);
-
-			String tableName = config.getString("bolt.hbase.table.name")
-					.toString();
-			TupleTableConfig hbaseBoltConfig = new TupleTableConfig(tableName,
-					config.getString("bolt.hbase.table.key.tuple.field.name")
-							.toString(), config.getString(
-							"bolt.hbase.table.timestamp.tuple.field.name")
-							.toString());
-
-			String allColumnFamiliesColumnQualifiers = config.getString(
-					"bolt.hbase.table.fields").toString();
-			// This is expected in the form
-			// "<cf1>:<cq11>,<cq12>,<cq13>|<cf2>:<cq21>,<cq22>|......."
-			String[] tokenizedColumnFamiliesWithColumnQualifiers = StringUtils
-					.split(allColumnFamiliesColumnQualifiers, "\\|");
-			for (String tokenizedColumnFamilyWithColumnQualifiers : tokenizedColumnFamiliesWithColumnQualifiers) {
-				String[] cfCqTokens = StringUtils.split(
-						tokenizedColumnFamilyWithColumnQualifiers, ":");
-				String columnFamily = cfCqTokens[0];
-				String[] columnQualifiers = StringUtils.split(cfCqTokens[1],
-						",");
-				for (String columnQualifier : columnQualifiers) {
-					hbaseBoltConfig.addColumn(columnFamily, columnQualifier);
-				}
-
-				// hbaseBoltConfig.setDurability(Durability.valueOf(conf.get(
-				// "storm.topology.pcap.bolt.hbase.durability").toString()));
-
-				hbaseBoltConfig.setBatch(Boolean.valueOf(config.getString(
-						"bolt.hbase.enable.batching").toString()));
-
-				HBaseBolt hbase_bolt = new HBaseBolt(hbaseBoltConfig,
-						config.getString("kafka.zk.list"),
-						config.getString("kafka.zk.port"));
-				hbase_bolt.setAutoAck(true);
-
-				BoltDeclarer declarer = builder.setBolt(name, hbase_bolt,
-						config.getInt("bolt.hbase.parallelism.hint"))
-						.setNumTasks(config.getInt("bolt.hbase.num.tasks"));
-
-				if (Grouping._Fields.CUSTOM_OBJECT.toString().equalsIgnoreCase(
-						shuffleType)) {
-					declarer.customGrouping(
-							messageUpstreamComponent,
-							"pcap_data_stream",
-							new HBaseStreamPartitioner(
-									hbaseBoltConfig.getTableName(),
-									0,
-									Integer.parseInt(conf
-											.get("bolt.hbase.partitioner.region.info.refresh.interval.mins")
-											.toString())));
-				} else if (Grouping._Fields.DIRECT.toString().equalsIgnoreCase(
-						shuffleType)) {
-					declarer.fieldsGrouping(messageUpstreamComponent,
-							"pcap_data_stream", new Fields("pcap_id"));
-				}
-
-			}
-		} catch (Exception e) {
-			e.printStackTrace();
-			System.exit(0);
-		}
-		return true;
-	}
-
-	private boolean initializeErrorIndexBolt(String component_name) {
-		try {
-			
-			Class loaded_class = Class.forName(config.getString("bolt.error.indexing.adapter"));
-			IndexAdapter adapter = (IndexAdapter) loaded_class.newInstance();
-
-			String dateFormat = "yyyy.MM";
-			if (config.containsKey("bolt.alerts.indexing.timestamp")) {
-				dateFormat = config.getString("bolt.alerts.indexing.timestamp");
-			}
-			
-			TelemetryIndexingBolt indexing_bolt = new TelemetryIndexingBolt()
-					.withIndexIP(config.getString("es.ip"))
-					.withIndexPort(config.getInt("es.port"))
-					.withClusterName(config.getString("es.clustername"))
-					.withIndexName(
-							config.getString("bolt.error.indexing.indexname"))
-					.withDocumentName(
-							config.getString("bolt.error.indexing.documentname"))
-					.withIndexTimestamp(dateFormat)
-					.withBulk(config.getInt("bolt.error.indexing.bulk"))
-					.withIndexAdapter(adapter)
-					.withMetricConfiguration(config);
-
-			BoltDeclarer declarer = builder
-					.setBolt(
-							component_name,
-							indexing_bolt,
-							config.getInt("bolt.error.indexing.parallelism.hint"))
-					.setNumTasks(config.getInt("bolt.error.indexing.num.tasks"));
-
-			for (String component : errorComponents)
-				declarer.shuffleGrouping(component, "error");
-
-			return true;
-		} catch (Exception e) {
-			e.printStackTrace();
-			return false;
-		}
-
-	}
-
-	private boolean initializeKafkaSpout(String name) {
-		try {
-
-			BrokerHosts zk = new ZkHosts(config.getString("kafka.zk"));
-			String input_topic = config.getString("spout.kafka.topic");
-			SpoutConfig kafkaConfig = new SpoutConfig(zk, input_topic, "",
-					input_topic);
-			kafkaConfig.scheme = new SchemeAsMultiScheme(new RawScheme());
-			kafkaConfig.forceFromStart = Boolean.valueOf("True");
-			kafkaConfig.startOffsetTime = -1;
-
-			builder.setSpout(name, new KafkaSpout(kafkaConfig),
-					config.getInt("spout.kafka.parallelism.hint")).setNumTasks(
-					config.getInt("spout.kafka.num.tasks"));
-
-		} catch (Exception e) {
-			e.printStackTrace();
-			System.exit(0);
-		}
-
-		return true;
-	}
-
-	abstract boolean initializeParsingBolt(String topology_name, String name);
-
-	abstract boolean initializeTestingSpout(String name);
-
-	private boolean initializeGeoEnrichment(String topology_name, String name) {
-
-		try {
-			String messageUpstreamComponent = messageComponents
-					.get(messageComponents.size() - 1);
-
-			System.out.println("[Metron] ------" + name
-					+ " is initializing from " + messageUpstreamComponent);
-
-			
-			String[] keys_from_settings = config.getStringArray("bolt.enrichment.geo.fields");
-			List<String> geo_keys = new ArrayList<String>(Arrays.asList(keys_from_settings));
-			
-			GeoMysqlAdapter geo_adapter = new GeoMysqlAdapter(
-					config.getString("mysql.ip"), config.getInt("mysql.port"),
-					config.getString("mysql.username"),
-					config.getString("mysql.password"),
-					config.getString("bolt.enrichment.geo.adapter.table"));
-
-			GenericEnrichmentBolt geo_enrichment = new GenericEnrichmentBolt()
-					.withEnrichmentTag(
-							config.getString("bolt.enrichment.geo.enrichment_tag"))
-					.withOutputFieldName(topology_name)
-					.withAdapter(geo_adapter)
-					.withMaxTimeRetain(
-							config.getInt("bolt.enrichment.geo.MAX_TIME_RETAIN_MINUTES"))
-					.withMaxCacheSize(
-							config.getInt("bolt.enrichment.geo.MAX_CACHE_SIZE_OBJECTS_NUM"))
-					.withKeys(geo_keys).withMetricConfiguration(config);
-
-			builder.setBolt(name, geo_enrichment,
-					config.getInt("bolt.enrichment.geo.parallelism.hint"))
-					.fieldsGrouping(messageUpstreamComponent, "message",
-							new Fields("key"))
-					.setNumTasks(config.getInt("bolt.enrichment.geo.num.tasks"));
-
-		} catch (Exception e) {
-			e.printStackTrace();
-			System.exit(0);
-		}
-
-		return true;
-	}
-
-	private boolean initializeHostsEnrichment(String topology_name,
-			String name, String hosts_path) {
-
-		try {
-
-			String messageUpstreamComponent = messageComponents
-					.get(messageComponents.size() - 1);
-
-			System.out.println("[Metron] ------" + name
-					+ " is initializing from " + messageUpstreamComponent);
-
-			List<String> hosts_keys = new ArrayList<String>();
-			hosts_keys.add(config.getString("source.ip"));
-			hosts_keys.add(config.getString("dest.ip"));
-
-			Map<String, JSONObject> known_hosts = SettingsLoader
-					.loadKnownHosts(hosts_path);
-
-			HostFromPropertiesFileAdapter host_adapter = new HostFromPropertiesFileAdapter(
-					known_hosts);
-
-			GenericEnrichmentBolt host_enrichment = new GenericEnrichmentBolt()
-					.withEnrichmentTag(
-							config.getString("bolt.enrichment.host.enrichment_tag"))
-					.withAdapter(host_adapter)
-					.withMaxTimeRetain(
-							config.getInt("bolt.enrichment.host.MAX_TIME_RETAIN_MINUTES"))
-					.withMaxCacheSize(
-							config.getInt("bolt.enrichment.host.MAX_CACHE_SIZE_OBJECTS_NUM"))
-					.withOutputFieldName(topology_name).withKeys(hosts_keys)
-					.withMetricConfiguration(config);
-
-			builder.setBolt(name, host_enrichment,
-					config.getInt("bolt.enrichment.host.parallelism.hint"))
-					.fieldsGrouping(messageUpstreamComponent, "message",
-							new Fields("key"))
-					.setNumTasks(
-							config.getInt("bolt.enrichment.host.num.tasks"));
-
-		} catch (Exception e) {
-			e.printStackTrace();
-			System.exit(0);
-		}
-
-		return true;
-	}
-
-	@SuppressWarnings("rawtypes")
-	private boolean initializeAlerts(String topology_name, String name,
-			String alerts_path, JSONObject environment_identifier,
-			JSONObject topology_identifier) {
-		try {
-			
-			Class loaded_class = Class.forName(config.getString("bolt.alerts.adapter"));
-			Constructor constructor = loaded_class.getConstructor(new Class[] { Map.class});
-			
-			Map<String, String> settings = SettingsLoader.getConfigOptions((PropertiesConfiguration)config, config.getString("bolt.alerts.adapter") + ".");
-			
-			System.out.println("Adapter Settings: ");
-			SettingsLoader.printOptionalSettings(settings);
-			
-			AlertsAdapter alerts_adapter = (AlertsAdapter) constructor.newInstance(settings);
-			
-	
-
-			String messageUpstreamComponent = messageComponents
-					.get(messageComponents.size() - 1);
-
-			System.out.println("[Metron] ------" + name
-					+ " is initializing from " + messageUpstreamComponent);
-
-			JSONObject alerts_identifier = SettingsLoader
-					.generateAlertsIdentifier(environment_identifier,
-							topology_identifier);
-
-			 
-
-			TelemetryAlertsBolt alerts_bolt = new TelemetryAlertsBolt()
-					.withIdentifier(alerts_identifier).withMaxCacheSize(1000)
-					.withMaxTimeRetain(3600).withAlertsAdapter(alerts_adapter)
-					.withOutputFieldName("message")
-					.withMetricConfiguration(config);
-
-			builder.setBolt(name, alerts_bolt,
-					config.getInt("bolt.alerts.parallelism.hint"))
-					.fieldsGrouping(messageUpstreamComponent, "message",
-							new Fields("key"))
-					.setNumTasks(config.getInt("bolt.alerts.num.tasks"));
-
-		} catch (Exception e) {
-			e.printStackTrace();
-			System.exit(0);
-		}
-		return true;
-	}
-
-	private boolean initializeAlertIndexing(String name) {
-		
-		try{
-		String messageUpstreamComponent = alertComponents.get(alertComponents
-				.size() - 1);
-
-		System.out.println("[Metron] ------" + name + " is initializing from "
-				+ messageUpstreamComponent);
-		
-		Class loaded_class = Class.forName(config.getString("bolt.alerts.indexing.adapter"));
-		IndexAdapter adapter = (IndexAdapter) loaded_class.newInstance();
-
-		String dateFormat = "yyyy.MM.dd";
-		if (config.containsKey("bolt.alerts.indexing.timestamp")) {
-			dateFormat = config.getString("bolt.alerts.indexing.timestamp");
-		}
-		TelemetryIndexingBolt indexing_bolt = new TelemetryIndexingBolt()
-				.withIndexIP(config.getString("es.ip"))
-				.withIndexPort(config.getInt("es.port"))
-				.withClusterName(config.getString("es.clustername"))
-				.withIndexName(
-						config.getString("bolt.alerts.indexing.indexname"))
-				.withDocumentName(
-						config.getString("bolt.alerts.indexing.documentname"))
-				.withIndexTimestamp(dateFormat)
-				.withBulk(config.getInt("bolt.alerts.indexing.bulk"))
-				.withIndexAdapter(adapter)
-				.withMetricConfiguration(config);
-
-		String alerts_name = config.getString("bolt.alerts.indexing.name");
-		builder.setBolt(alerts_name, indexing_bolt,
-				config.getInt("bolt.indexing.parallelism.hint"))
-				.shuffleGrouping(messageUpstreamComponent, "alert")
-				.setNumTasks(config.getInt("bolt.indexing.num.tasks"));
-		}
-		catch(Exception e)
-		{
-			e.printStackTrace();
-			return false;
-		}
-
-		return true;
-	}
-
-	private boolean initializeKafkaBolt(String name) {
-		try {
-
-			String messageUpstreamComponent = messageComponents
-					.get(messageComponents.size() - 1);
-
-			System.out.println("[Metron] ------" + name
-					+ " is initializing from " + messageUpstreamComponent);
-
-			Map<String, String> kafka_broker_properties = new HashMap<String, String>();
-			kafka_broker_properties.put("zk.connect",
-					config.getString("kafka.zk"));
-			kafka_broker_properties.put("metadata.broker.list",
-					config.getString("kafka.br"));
-
-			kafka_broker_properties.put("serializer.class",
-					"com.apache.metron.json.serialization.JSONKafkaSerializer");
-
-			kafka_broker_properties.put("key.serializer.class",
-					"kafka.serializer.StringEncoder");
-
-			String output_topic = config.getString("bolt.kafka.topic");
-
-			conf.put("kafka.broker.properties", kafka_broker_properties);
-			conf.put("topic", output_topic);
-
-			builder.setBolt(name, new KafkaBolt<String, JSONObject>(),
-					config.getInt("bolt.kafka.parallelism.hint"))
-					.shuffleGrouping(messageUpstreamComponent, "message")
-					.setNumTasks(config.getInt("bolt.kafka.num.tasks"));
-		} catch (Exception e) {
-			e.printStackTrace();
-			System.exit(0);
-		}
-		return true;
-	}
-
-	private boolean initializeWhoisEnrichment(String topology_name, String name) {
-		try {
-
-			String messageUpstreamComponent = messageComponents
-					.get(messageComponents.size() - 1);
-
-			System.out.println("[Metron] ------" + name
-					+ " is initializing from " + messageUpstreamComponent);
-
-			String[] keys_from_settings = config.getString("bolt.enrichment.whois.fields").split(",");
-			List<String> whois_keys = new ArrayList<String>(Arrays.asList(keys_from_settings));
-
-			EnrichmentAdapter whois_adapter = new WhoisHBaseAdapter(
-					config.getString("bolt.enrichment.whois.hbase.table.name"),
-					config.getString("kafka.zk.list"),
-					config.getString("kafka.zk.port"));
-
-			GenericEnrichmentBolt whois_enrichment = new GenericEnrichmentBolt()
-					.withEnrichmentTag(
-							config.getString("bolt.enrichment.whois.enrichment_tag"))
-					.withOutputFieldName(topology_name)
-					.withAdapter(whois_adapter)
-					.withMaxTimeRetain(
-							config.getInt("bolt.enrichment.whois.MAX_TIME_RETAIN_MINUTES"))
-					.withMaxCacheSize(
-							config.getInt("bolt.enrichment.whois.MAX_CACHE_SIZE_OBJECTS_NUM"))
-					.withKeys(whois_keys).withMetricConfiguration(config);
-
-			builder.setBolt(name, whois_enrichment,
-					config.getInt("bolt.enrichment.whois.parallelism.hint"))
-					.fieldsGrouping(messageUpstreamComponent, "message",
-							new Fields("key"))
-					.setNumTasks(
-							config.getInt("bolt.enrichment.whois.num.tasks"));
-
-		} catch (Exception e) {
-			e.printStackTrace();
-			System.exit(0);
-		}
-
-		return true;
-	}
-
-	private boolean initializeIndexingBolt(String name) {
-		try {
-
-			String messageUpstreamComponent = messageComponents
-					.get(messageComponents.size() - 1);
-
-			System.out.println("[Metron] ------" + name
-					+ " is initializing from " + messageUpstreamComponent);
-			
-			Class loaded_class = Class.forName(config.getString("bolt.indexing.adapter"));
-			IndexAdapter adapter = (IndexAdapter) loaded_class.newInstance();
-			
-			Map<String, String> settings = SettingsLoader.getConfigOptions((PropertiesConfiguration)config, "optional.settings.bolt.index.search.");
-			
-			if(settings != null && settings.size() > 0)
-			{
-				adapter.setOptionalSettings(settings);
-				System.out.println("[Metron] Index Bolt picket up optional settings:");
-				SettingsLoader.printOptionalSettings(settings);			
-			}
-
-			// dateFormat defaults to hourly if not specified
-			String dateFormat = "yyyy.MM.dd.hh";
-			if (config.containsKey("bolt.indexing.timestamp")) {
-				dateFormat = config.getString("bolt.indexing.timestamp");
-			}
-			TelemetryIndexingBolt indexing_bolt = new TelemetryIndexingBolt()
-					.withIndexIP(config.getString("es.ip"))
-					.withIndexPort(config.getInt("es.port"))
-					.withClusterName(config.getString("es.clustername"))
-					.withIndexName(config.getString("bolt.indexing.indexname"))
-					.withIndexTimestamp(dateFormat)
-					.withDocumentName(
-							config.getString("bolt.indexing.documentname"))
-					.withBulk(config.getInt("bolt.indexing.bulk"))
-					.withIndexAdapter(adapter)
-					.withMetricConfiguration(config);
-
-			builder.setBolt(name, indexing_bolt,
-					config.getInt("bolt.indexing.parallelism.hint"))
-					.fieldsGrouping(messageUpstreamComponent, "message",
-							new Fields("key"))
-					.setNumTasks(config.getInt("bolt.indexing.num.tasks"));
-
-		} catch (Exception e) {
-			e.printStackTrace();
-			System.exit(0);
-		}
-
-		return true;
-	}
-	
-	
-	private boolean initializeThreatEnrichment(String topology_name, String name) {
-		try {
-
-			String messageUpstreamComponent = messageComponents
-					.get(messageComponents.size() - 1);
-
-			System.out.println("[Metron] ------" + name
-					+ " is initializing from " + messageUpstreamComponent);
-
-			String[] fields = config.getStringArray("bolt.enrichment.threat.fields");
-			List<String> threat_keys = new ArrayList<String>(Arrays.asList(fields));
-
-			GenericEnrichmentBolt threat_enrichment = new GenericEnrichmentBolt()
-					.withEnrichmentTag(
-							config.getString("bolt.enrichment.threat.enrichment_tag"))
-					.withAdapter(
-							new ThreatHbaseAdapter(config
-									.getString("kafka.zk.list"), config
-									.getString("kafka.zk.port"), config
-									.getString("bolt.enrichment.threat.tablename")))
-					.withOutputFieldName(topology_name)
-					.withEnrichmentTag(config.getString("bolt.enrichment.threat.enrichment_tag"))
-					.withKeys(threat_keys)
-					.withMaxTimeRetain(
-							config.getInt("bolt.enrichment.threat.MAX_TIME_RETAIN_MINUTES"))
-					.withMaxCacheSize(
-							config.getInt("bolt.enrichment.threat.MAX_CACHE_SIZE_OBJECTS_NUM"))
-					.withMetricConfiguration(config);
-
-			builder.setBolt(name, threat_enrichment,
-					config.getInt("bolt.enrichment.threat.parallelism.hint"))
-					.fieldsGrouping(messageUpstreamComponent, "message",
-							new Fields("key"))
-					.setNumTasks(config.getInt("bolt.enrichment.threat.num.tasks"));
-
-		} catch (Exception e) {
-			e.printStackTrace();
-			System.exit(0);
-		}
-
-		return true;
-	}
-
-	private boolean initializeCIFEnrichment(String topology_name, String name) {
-		try {
-
-			String messageUpstreamComponent = messageComponents
-					.get(messageComponents.size() - 1);
-
-			System.out.println("[Metron] ------" + name
-					+ " is initializing from " + messageUpstreamComponent);
-
-			List<String> cif_keys = new ArrayList<String>();
-
-			String[] ipFields = config.getStringArray("bolt.enrichment.cif.fields.ip");
-			cif_keys.addAll(Arrays.asList(ipFields));
-			
-			String[] hostFields = config.getStringArray("bolt.enrichment.cif.fields.host");
-			cif_keys.addAll(Arrays.asList(hostFields));
-			
-			String[] emailFields = config.getStringArray("bolt.enrichment.cif.fields.email");
-			cif_keys.addAll(Arrays.asList(emailFields));
-			
-			GenericEnrichmentBolt cif_enrichment = new GenericEnrichmentBolt()
-					.withEnrichmentTag(
-							config.getString("bolt.enrichment.cif.enrichment_tag"))
-					.withAdapter(
-							new CIFHbaseAdapter(config
-									.getString("kafka.zk.list"), config
-									.getString("kafka.zk.port"), config
-									.getString("bolt.enrichment.cif.tablename")))
-					.withOutputFieldName(topology_name)
-					.withKeys(cif_keys)
-					.withMaxTimeRetain(
-							config.getInt("bolt.enrichment.cif.MAX_TIME_RETAIN_MINUTES"))
-					.withMaxCacheSize(
-							config.getInt("bolt.enrichment.cif.MAX_CACHE_SIZE_OBJECTS_NUM"))
-					.withMetricConfiguration(config);
-
-			builder.setBolt(name, cif_enrichment,
-					config.getInt("bolt.enrichment.cif.parallelism.hint"))
-					.fieldsGrouping(messageUpstreamComponent, "message",
-							new Fields("key"))
-					.setNumTasks(config.getInt("bolt.enrichment.cif.num.tasks"));
-
-		} catch (Exception e) {
-			e.printStackTrace();
-			System.exit(0);
-		}
-
-		return true;
-	}
-
-	private boolean initializeHDFSBolt(String topology_name, String name) {
-		try {
-
-			String messageUpstreamComponent = messageComponents
-					.get(messageComponents.size() - 1);
-
-			System.out.println("[Metron] ------" + name
-					+ " is initializing from " + messageUpstreamComponent);
-
-			RecordFormat format = new DelimitedRecordFormat()
-					.withFieldDelimiter(
-							config.getString("bolt.hdfs.field.delimiter")
-									.toString()).withFields(
-							new Fields("message"));
-
-			// sync the file system after every x number of tuples
-			SyncPolicy syncPolicy = new CountSyncPolicy(Integer.valueOf(config
-					.getString("bolt.hdfs.batch.size").toString()));
-
-			// rotate files when they reach certain size
-			FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(
-					Float.valueOf(config.getString(
-							"bolt.hdfs.file.rotation.size.in.mb").toString()),
-					Units.MB);
-
-			FileNameFormat fileNameFormat = new DefaultFileNameFormat()
-					.withPath(config.getString("bolt.hdfs.wip.file.path")
-							.toString());
-
-			// Post rotate action
-			MoveFileAction moveFileAction = (new MoveFileAction())
-					.toDestination(config.getString(
-							"bolt.hdfs.finished.file.path").toString());
-
-			HdfsBolt hdfsBolt = new HdfsBolt()
-					.withFsUrl(
-							config.getString("bolt.hdfs.file.system.url")
-									.toString())
-					.withFileNameFormat(fileNameFormat)
-					.withRecordFormat(format)
-					.withRotationPolicy(rotationPolicy)
-					.withSyncPolicy(syncPolicy)
-					.addRotationAction(moveFileAction);
-			if (config.getString("bolt.hdfs.compression.codec.class") != null) {
-				hdfsBolt.withCompressionCodec(config.getString(
-						"bolt.hdfs.compression.codec.class").toString());
-			}
-
-			builder.setBolt(name, hdfsBolt,
-					config.getInt("bolt.hdfs.parallelism.hint"))
-					.shuffleGrouping(messageUpstreamComponent, "message")
-					.setNumTasks(config.getInt("bolt.hdfs.num.tasks"));
-
-		} catch (Exception e) {
-			e.printStackTrace();
-			System.exit(0);
-		}
-
-		return true;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/bolts/PrintingBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/bolts/PrintingBolt.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/bolts/PrintingBolt.java
new file mode 100644
index 0000000..4938585
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/bolts/PrintingBolt.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.metron.test.bolts;
+
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+
+@SuppressWarnings("serial")
+public class PrintingBolt extends BaseRichBolt {
+
+	@SuppressWarnings("rawtypes")
+	public void prepare(Map stormConf, TopologyContext context,
+			OutputCollector collector) {
+		// TODO Auto-generated method stub
+		
+	}
+
+	public void execute(Tuple input) {
+		System.out.println("---------[RECEIVED] " + input);
+		
+	}
+
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		// TODO Auto-generated method stub
+		
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/filereaders/FileReader.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/filereaders/FileReader.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/filereaders/FileReader.java
new file mode 100644
index 0000000..886baca
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/filereaders/FileReader.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.metron.test.filereaders;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.LinkedList;
+import java.util.List;
+
+public class FileReader {
+	public List<String> readFromFile(String filename) throws IOException 
+	{
+		
+		System.out.println("Reading stream from " + filename);
+
+		List<String> lines = new LinkedList<String>();
+
+		InputStream stream = Thread.currentThread().getContextClassLoader()
+				.getResourceAsStream(filename);
+
+		DataInputStream in = new DataInputStream(stream);
+		BufferedReader br = new BufferedReader(new InputStreamReader(in));
+		String strLine;
+		while ((strLine = br.readLine()) != null) 
+		{
+			//System.out.println("-----------------I READ: " + strLine);
+			lines.add(strLine);
+		}
+		//br.close();
+
+		return lines;
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java
new file mode 100644
index 0000000..0f52e26
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.metron.test.spouts;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import com.apache.metron.test.filereaders.FileReader;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+
+public class GenericInternalTestSpout extends BaseRichSpout {
+
+	
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = -2379344923143372543L;
+
+	List<String> jsons;
+	
+	private String _filename;
+	private int _delay = 100;
+	private boolean _repeating = true;
+	
+	private SpoutOutputCollector _collector;
+	private FileReader Reader;
+	private int cnt = 0;
+	
+	public GenericInternalTestSpout withFilename(String filename)
+	{
+		_filename = filename;
+		return this;
+	}
+	public GenericInternalTestSpout withMilisecondDelay(int delay)
+	{
+		_delay = delay;
+		return this;
+	}
+	
+	public GenericInternalTestSpout withRepeating(boolean repeating)
+	{
+		_repeating = repeating;
+		return this;
+	}
+
+
+	@SuppressWarnings("rawtypes") 
+	public void open(Map conf, TopologyContext context,
+			SpoutOutputCollector collector) {
+		
+		_collector = collector;
+		try {
+			Reader =  new FileReader();
+			jsons = Reader.readFromFile(_filename);
+
+			
+		} catch (IOException e) 
+		{
+			System.out.println("Could not read sample JSONs");
+			e.printStackTrace();
+		}
+		
+	}
+
+	public void nextTuple() {
+		Utils.sleep(_delay);
+		
+		if(cnt < jsons.size())
+		{
+			_collector.emit(new Values(jsons.get(cnt).getBytes()));
+		}
+		cnt ++;
+		
+		if(_repeating && cnt == jsons.size() -1 )
+			cnt = 0;
+	}
+
+	@Override
+	public void ack(Object id) {
+	}
+
+	@Override
+	public void fail(Object id) {
+	}
+
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields("message"));
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/PcapSimulatorSpout.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/PcapSimulatorSpout.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/PcapSimulatorSpout.java
new file mode 100644
index 0000000..6e47b95
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/PcapSimulatorSpout.java
@@ -0,0 +1,153 @@
+package com.apache.metron.test.spouts;
+
+import java.util.Map;
+import java.util.Random;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import com.apache.metron.pcap.PcapUtils;
+
+
+/**
+ * The Class PcapSimulatorSpout.
+ */
+public class PcapSimulatorSpout extends BaseRichSpout {
+
+  /** The Constant serialVersionUID. */
+  private static final long serialVersionUID = -5878104600899840638L;
+
+  /** The collector. */
+  private SpoutOutputCollector collector = null;
+
+  /** The Constant randomIpSegmentGenerator. */
+  private static final Random randomIpSegmentGenerator = new Random(255);
+
+  /** The Constant randomPortGenerator. */
+  private static final Random randomPortGenerator = new Random(64000);
+
+  /** The Constant randomJsonGenerator. */
+  private static final Random randomJsonGenerator = new Random(8);
+
+  /** The Constant randomProtocolGenerator. */
+  private static final Random randomProtocolGenerator = new Random(255);
+
+  /** The message size. */
+  private static int messageSize = 30000;
+
+  /** The pcap. */
+  private static byte[] pcap = new byte[messageSize];
+
+  /** The Constant randomPcapGenerator. */
+  private static final Random randomPcapGenerator = new Random();
+
+  /** The json doc. */
+  private static String jsonDoc;
+
+  /** The ts. */
+  private static long ts;
+
+  /** The group key. */
+  private static String groupKey;
+
+  /** The ip addr. */
+  StringBuffer ipAddr = new StringBuffer();
+
+  /** The Constant jsonDocs. */
+  private static final String[] jsonDocs = {
+      "{ \"header\": { \"IncLen\": 124,\"OrigLen\": 124,\"TsSec\": 1391740061,\"TsUsec\": 723610},\"ipv4header\": { \"Destination\": -1407317716,\"DestinationAddress\": \"172.30.9.44\",\"Flags\": 2,\"FragmentOffset\": 0,\"HeaderChecksum\": 22550,\"Id\": 30686,\"Ihl\": 20,\"Protocol\": 6,\"Source\": -1407317715,\"SourceAddress\": \"172.30.9.45\",\"Tos\": 0,\"TotalLength\": 110,\"Ttl\": 64,\"Version\": 4},\"tcpheader\": { \"Ack\": 1331776820,\"Checksum\": 21822,\"DataLength\": 58,\"DataOffset\": 8,\"DestinationAddress\": \"172.30.9.44\",\"DestinationPort\": 9092,\"Direction\": null,\"Flags\": 24,\"ReassembledLength \": 0,\"RelativeAck\": 0,\"RelativeSeq\": 0,\"Seq\": 337331842,\"SessionKey\": \"172.30.9.45:56412 -> 172.30.9.44:9092\",\"SourceAddress\": \"172.30.9.45\",\"SourcePort\": 56412,\"TotalLength\": 90,\"UrgentPointer\": 0,\"Window\": 115}}",
+      "{ \"header\": { \"IncLen\": 60,\"OrigLen\": 60,\"TsSec\": 1391743533,\"TsUsec\": 523808},\"ipv4header\": { \"Destination\": 202,\"DestinationAddress\": \"0.0.0.202\",\"Flags\": 0,\"FragmentOffset\": 572,\"HeaderChecksum\": 21631,\"Id\": 2,\"Ihl\": 8,\"Protocol\": 0,\"Source\": -285366020,\"SourceAddress\": \"238.253.168.252\",\"Tos\": 66,\"TotalLength\": 768,\"Ttl\": 128,\"Version\": 4}} ",
+      "{ \"header\": { \"IncLen\": 64,\"OrigLen\": 64,\"TsSec\": 1391729466,\"TsUsec\": 626286},\"ipv4header\": { \"Destination\": -55296,\"DestinationAddress\": \"255.255.40.0\",\"Flags\": 0,\"FragmentOffset\": 0,\"HeaderChecksum\": 28302,\"Id\": 62546,\"Ihl\": 0,\"Protocol\": 0,\"Source\": 151031295,\"SourceAddress\": \"9.0.141.255\",\"Tos\": 60,\"TotalLength\": 14875,\"Ttl\": 0,\"Version\": 0}}",
+      "{ \"header\": { \"IncLen\": 64,\"OrigLen\": 64,\"TsSec\": 1391729470,\"TsUsec\": 404175},\"ipv4header\": { \"Destination\": -55296,\"DestinationAddress\": \"255.255.40.0\",\"Flags\": 0,\"FragmentOffset\": 0,\"HeaderChecksum\": 53034,\"Id\": 62546,\"Ihl\": 0,\"Protocol\": 0,\"Source\": 100699647,\"SourceAddress\": \"6.0.141.255\",\"Tos\": 60,\"TotalLength\": 15899,\"Ttl\": 0,\"Version\": 0}}",
+      "{ \"header\": { \"IncLen\": 64,\"OrigLen\": 64,\"TsSec\": 1391729470,\"TsUsec\": 991207},\"ipv4header\": { \"Destination\": -55296,\"DestinationAddress\": \"255.255.40.0\",\"Flags\": 0,\"FragmentOffset\": 0,\"HeaderChecksum\": 59167,\"Id\": 62546,\"Ihl\": 0,\"Protocol\": 0,\"Source\": 251694591,\"SourceAddress\": \"15.0.141.255\",\"Tos\": 60,\"TotalLength\": 15899,\"Ttl\": 0,\"Version\": 0}}",
+      "{ \"header\": { \"IncLen\": 78,\"OrigLen\": 78,\"TsSec\": 1391743531,\"TsUsec\": 617746},\"ipv4header\": { \"Destination\": -1407317706,\"DestinationAddress\": \"172.30.9.54\",\"Flags\": 2,\"FragmentOffset\": 0,\"HeaderChecksum\": 12015,\"Id\": 41253,\"Ihl\": 20,\"Protocol\": 6,\"Source\": -1407317711,\"SourceAddress\": \"172.30.9.49\",\"Tos\": 0,\"TotalLength\": 64,\"Ttl\": 64,\"Version\": 4},\"tcpheader\": { \"Ack\": -854627611,\"Checksum\": 28439,\"DataLength\": 12,\"DataOffset\": 8,\"DestinationAddress\": \"172.30.9.54\",\"DestinationPort\": 43457,\"Direction\": null,\"Flags\": 24,\"ReassembledLength \": 0,\"RelativeAck\": 0,\"RelativeSeq\": 0,\"Seq\": -70750910,\"SessionKey\": \"172.30.9.49:9092 -> 172.30.9.54:43457\",\"SourceAddress\": \"172.30.9.49\",\"SourcePort\": 9092,\"TotalLength\": 44,\"UrgentPointer\": 0,\"Window\": 1453}}",
+      "{ \"header\": { \"IncLen\": 78,\"OrigLen\": 78,\"TsSec\": 1391743532,\"TsUsec\": 78633},\"ipv4header\": { \"Destination\": -1407317706,\"DestinationAddress\": \"172.30.9.54\",\"Flags\": 2,\"FragmentOffset\": 0,\"HeaderChecksum\": 26235,\"Id\": 27034,\"Ihl\": 20,\"Protocol\": 6,\"Source\": -1407317712,\"SourceAddress\": \"172.30.9.48\",\"Tos\": 0,\"TotalLength\": 64,\"Ttl\": 64,\"Version\": 4},\"tcpheader\": { \"Ack\": 965354559,\"Checksum\": 6890,\"DataLength\": 12,\"DataOffset\": 8,\"DestinationAddress\": \"172.30.9.54\",\"DestinationPort\": 37051,\"Direction\": null,\"Flags\": 24,\"ReassembledLength \": 0,\"RelativeAck\": 0,\"RelativeSeq\": 0,\"Seq\": -1654276327,\"SessionKey\": \"172.30.9.48:9092 -> 172.30.9.54:37051\",\"SourceAddress\": \"172.30.9.48\",\"SourcePort\": 9092,\"TotalLength\": 44,\"UrgentPointer\": 0,\"Window\": 1453}}",
+      "{ \"header\": { \"IncLen\": 78,\"OrigLen\": 78,\"TsSec\": 1391743634,\"TsUsec\": 784540},\"ipv4header\": { \"Destination\": -1407317710,\"DestinationAddress\": \"172.30.9.50\",\"Flags\": 2,\"FragmentOffset\": 0,\"HeaderChecksum\": 46490,\"Id\": 6784,\"Ihl\": 20,\"Protocol\": 6,\"Source\": -1407317713,\"SourceAddress\": \"172.30.9.47\",\"Tos\": 0,\"TotalLength\": 64,\"Ttl\": 64,\"Version\": 4},\"tcpheader\": { \"Ack\": -477288801,\"Checksum\": 60687,\"DataLength\": 12,\"DataOffset\": 8,\"DestinationAddress\": \"172.30.9.50\",\"DestinationPort\": 53561,\"Direction\": null,\"Flags\": 24,\"ReassembledLength \": 0,\"RelativeAck\": 0,\"RelativeSeq\": 0,\"Seq\": -1890443606,\"SessionKey\": \"172.30.9.47:9092 -> 172.30.9.50:53561\",\"SourceAddress\": \"172.30.9.47\",\"SourcePort\": 9092,\"TotalLength\": 44,\"UrgentPointer\": 0,\"Window\": 1453}}",
+      "{ \"header\": { \"IncLen\": 78,\"OrigLen\": 78,\"TsSec\": 1391743683,\"TsUsec\": 495234},\"ipv4header\": { \"Destination\": -1407317711,\"DestinationAddress\": \"172.30.9.49\",\"Flags\": 2,\"FragmentOffset\": 0,\"HeaderChecksum\": 48322,\"Id\": 4956,\"Ihl\": 20,\"Protocol\": 6,\"Source\": -1407317716,\"SourceAddress\": \"172.30.9.44\",\"Tos\": 0,\"TotalLength\": 64,\"Ttl\": 64,\"Version\": 4},\"tcpheader\": { \"Ack\": -1825947455,\"Checksum\": 27340,\"DataLength\": 12,\"DataOffset\": 8,\"DestinationAddress\": \"172.30.9.49\",\"DestinationPort\": 37738,\"Direction\": null,\"Flags\": 24,\"ReassembledLength \": 0,\"RelativeAck\": 0,\"RelativeSeq\": 0,\"Seq\": -496700614,\"SessionKey\": \"172.30.9.44:9092 -> 172.30.9.49:37738\",\"SourceAddress\": \"172.30.9.44\",\"SourcePort\": 9092,\"TotalLength\": 44,\"UrgentPointer\": 0,\"Window\": 1453}}",
+      "{ \"header\": { \"IncLen\": 78,\"OrigLen\": 78,\"TsSec\": 1391743772,\"TsUsec\": 719493},\"ipv4header\": { \"Destination\": -1407317715,\"DestinationAddress\": \"172.30.9.45\",\"Flags\": 2,\"FragmentOffset\": 0,\"HeaderChecksum\": 39105,\"Id\": 14173,\"Ihl\": 20,\"Protocol\": 6,\"Source\": -1407317712,\"SourceAddress\": \"172.30.9.48\",\"Tos\": 0,\"TotalLength\": 64,\"Ttl\": 64,\"Version\": 4},\"tcpheader\": { \"Ack\": 898627232,\"Checksum\": 57115,\"DataLength\": 12,\"DataOffset\": 8,\"DestinationAddress\": \"172.30.9.45\",\"DestinationPort\": 45629,\"Direction\": null,\"Flags\": 24,\"ReassembledLength \": 0,\"RelativeAck\": 0,\"RelativeSeq\": 0,\"Seq\": 1030775351,\"SessionKey\": \"172.30.9.48:9092 -> 172.30.9.45:45629\",\"SourceAddress\": \"172.30.9.48\",\"SourcePort\": 9092,\"TotalLength\": 44,\"UrgentPointer\": 0,\"Window\": 1453}}" };
+
+  /** The Constant protoCols. */
+  private static final String[] protoCols = { "TCP", "UDP", "SNMP" };
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see backtype.storm.spout.ISpout#open(java.util.Map,
+   * backtype.storm.task.TopologyContext,
+   * backtype.storm.spout.SpoutOutputCollector)
+   */
+  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+
+    System.out.println("Opening PcapSimulatorSpout");
+
+    this.collector = collector;
+
+    if (conf.containsKey("storm.topology.pcap.spout.pcap-kafka-simulator-spout.packet.size.in.bytes")) {
+
+      messageSize = Integer.valueOf(conf.get("storm.topology.pcap.spout.pcap-kafka-simulator-spout.packet.size.in.bytes").toString());
+      pcap = new byte[messageSize];
+
+      System.out.println("Using message size : " + messageSize);
+    }
+
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see backtype.storm.spout.ISpout#nextTuple()
+   */
+  public void nextTuple() {
+
+    // System.out.println("nextTuple of PcapSimulatorSpout");
+    ipAddr.setLength(0);
+    String srcAddr = ipAddr.append(randomIpSegmentGenerator.nextInt(255)).append('.').append(randomIpSegmentGenerator.nextInt(255))
+        .append('.').append(randomIpSegmentGenerator.nextInt(255)).append('.').append(randomIpSegmentGenerator.nextInt(255)).toString();
+    ipAddr.setLength(0);
+    String dstAddr = ipAddr.append(randomIpSegmentGenerator.nextInt(255)).append('.').append(randomIpSegmentGenerator.nextInt(255))
+        .append('.').append(randomIpSegmentGenerator.nextInt(255)).append('.').append(randomIpSegmentGenerator.nextInt(255)).toString();
+
+    String key = PcapUtils.getSessionKey(srcAddr, dstAddr, String.valueOf(randomProtocolGenerator.nextInt(255)),
+        String.valueOf(randomPortGenerator.nextInt(64000)), String.valueOf(randomPortGenerator.nextInt(64000)), "0", "0");
+
+    jsonDoc = jsonDocs[randomJsonGenerator.nextInt(8)];
+    ts = System.currentTimeMillis() + randomPortGenerator.nextInt();
+    randomPcapGenerator.nextBytes(pcap);
+
+    collector.emit(new Values(srcAddr, key.toString(), jsonDoc, ts, pcap));
+
+    collector.emit("pcap_index_stream", new Values(jsonDoc, key));
+    collector.emit("pcap_header_stream", new Values(jsonDoc, key));
+    collector.emit("pcap_data_stream", new Values(key.toString(), ts, pcap));
+
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology
+   * .OutputFieldsDeclarer)
+   */
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+    System.out.println("Declaring output fields of PcapSimulatorSpout");
+
+    declarer.declareStream("pcap_index_stream", new Fields("index_json"));
+    declarer.declareStream("pcap_header_stream", new Fields("header_json"));
+    declarer.declareStream("pcap_data_stream", new Fields("pcap_id", "timestamp", "pcap"));
+
+  }
+  
+  @Override
+  public void ack(Object id) {
+  }
+
+  @Override
+  public void fail(Object id) {
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/Asa.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/Asa.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/Asa.java
new file mode 100644
index 0000000..ed27764
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/Asa.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.apache.metron.topology;
+
+import org.apache.commons.configuration.ConfigurationException;
+
+import backtype.storm.generated.InvalidTopologyException;
+
+import com.apache.metron.topology.runner.AsaRunner;
+import com.apache.metron.topology.runner.TopologyRunner;
+
+
+/**
+ * Topology for processing Asa messages
+ *
+ */
+public class Asa{
+	
+	public static void main(String[] args) throws ConfigurationException, Exception, InvalidTopologyException {
+		
+		TopologyRunner runner = new AsaRunner();
+		runner.initTopology(args, "asa");
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/Bro.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/Bro.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/Bro.java
new file mode 100644
index 0000000..f246adb
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/Bro.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.metron.topology;
+
+import org.apache.commons.configuration.ConfigurationException;
+
+import backtype.storm.generated.InvalidTopologyException;
+
+import com.apache.metron.topology.runner.BroRunner;
+import com.apache.metron.topology.runner.TopologyRunner;
+
+public class Bro{
+	
+	public static void main(String[] args) throws ConfigurationException, Exception, InvalidTopologyException {
+		
+		TopologyRunner runner = new BroRunner();
+		runner.initTopology(args, "bro");
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/FireEye.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/FireEye.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/FireEye.java
new file mode 100644
index 0000000..2a7a294
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/FireEye.java
@@ -0,0 +1,21 @@
+package com.apache.metron.topology;
+
+import org.apache.commons.configuration.ConfigurationException;
+import backtype.storm.generated.InvalidTopologyException;
+import com.apache.metron.topology.runner.FireEyeRunner;
+import com.apache.metron.topology.runner.TopologyRunner;
+
+
+/**
+ * Topology for processing FireEye syslog messages
+ *
+ */
+public class FireEye {
+
+	public static void main(String[] args) throws ConfigurationException, Exception, InvalidTopologyException {
+		
+		TopologyRunner runner = new FireEyeRunner();
+		runner.initTopology(args, "fireeye");
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/Ise.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/Ise.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/Ise.java
new file mode 100644
index 0000000..85099a8
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/Ise.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.apache.metron.topology;
+
+import org.apache.commons.configuration.ConfigurationException;
+
+import backtype.storm.generated.InvalidTopologyException;
+
+import com.apache.metron.topology.runner.ISERunner;
+import com.apache.metron.topology.runner.TopologyRunner;
+
+/**
+ * Topology for processing Ise messages
+ *
+ */
+public class Ise{
+	
+	public static void main(String[] args) throws ConfigurationException, Exception, InvalidTopologyException {
+		
+		TopologyRunner runner = new ISERunner();
+		runner.initTopology(args, "ise");
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/Lancope.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/Lancope.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/Lancope.java
new file mode 100644
index 0000000..39cd646
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/Lancope.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.apache.metron.topology;
+
+import org.apache.commons.configuration.ConfigurationException;
+
+import backtype.storm.generated.InvalidTopologyException;
+
+import com.apache.metron.topology.runner.LancopeRunner;
+import com.apache.metron.topology.runner.TopologyRunner;
+
+
+/**
+ * Topology for processing Lancope messages
+ *
+ */
+public class Lancope{
+	
+	public static void main(String[] args) throws ConfigurationException, Exception, InvalidTopologyException {
+		
+		TopologyRunner runner = new LancopeRunner();
+		runner.initTopology(args, "lancope");
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/PaloAltoFirewall.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/PaloAltoFirewall.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/PaloAltoFirewall.java
new file mode 100644
index 0000000..822bd60
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/PaloAltoFirewall.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.apache.metron.topology;
+
+import org.apache.commons.configuration.ConfigurationException;
+
+import backtype.storm.generated.InvalidTopologyException;
+
+import com.apache.metron.topology.runner.AsaRunner;
+import com.apache.metron.topology.runner.PaloAltoFirewallRunner;
+import com.apache.metron.topology.runner.TopologyRunner;
+
+
+/**
+ * Topology for processing Palo Alto Firewall Syslog messages
+ *
+ */
+public class PaloAltoFirewall {
+	
+	public static void main(String[] args) throws ConfigurationException, Exception, InvalidTopologyException {
+		
+		TopologyRunner runner = new PaloAltoFirewallRunner();
+		runner.initTopology(args, "paloalto");
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/Pcap.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/Pcap.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/Pcap.java
new file mode 100644
index 0000000..e01009c
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/Pcap.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.apache.metron.topology;
+
+import org.apache.commons.configuration.ConfigurationException;
+
+import backtype.storm.generated.InvalidTopologyException;
+
+import com.apache.metron.topology.runner.PcapRunner;
+import com.apache.metron.topology.runner.TopologyRunner;
+
+
+/**
+ * Topology for processing raw packet messages
+ *
+ */
+
+public class Pcap{
+	
+	public static void main(String[] args) throws ConfigurationException, Exception, InvalidTopologyException {
+		
+		TopologyRunner runner = new PcapRunner();
+		runner.initTopology(args, "pcap");
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/Sourcefire.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/Sourcefire.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/Sourcefire.java
new file mode 100644
index 0000000..b8530a6
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/Sourcefire.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.apache.metron.topology;
+
+import org.apache.commons.configuration.ConfigurationException;
+
+import com.apache.metron.topology.runner.SourcefireRunner;
+import com.apache.metron.topology.runner.TopologyRunner;
+
+import backtype.storm.generated.InvalidTopologyException;
+
+
+/**
+ * Topology for processing Sourcefire messages
+ *
+ */
+public class Sourcefire{
+	
+	public static void main(String[] args) throws ConfigurationException, Exception, InvalidTopologyException {
+		
+		TopologyRunner runner = new SourcefireRunner();
+		runner.initTopology(args, "sourcefire");
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/runner/AsaRunner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/runner/AsaRunner.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/runner/AsaRunner.java
new file mode 100644
index 0000000..4c4dcb5
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/runner/AsaRunner.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.apache.metron.topology.runner;
+
+import com.apache.metron.filters.GenericMessageFilter;
+import com.apache.metron.parser.interfaces.MessageParser;
+import com.apache.metron.parsing.AbstractParserBolt;
+import com.apache.metron.parsing.TelemetryParserBolt;
+import com.apache.metron.test.spouts.GenericInternalTestSpout;
+
+public class AsaRunner extends TopologyRunner{
+	
+	 static String test_file_path = "SampleInput/AsaOutput";
+
+	@Override
+	public boolean initializeParsingBolt(String topology_name,
+			String name) {
+		try {
+			
+			String messageUpstreamComponent = messageComponents.get(messageComponents.size()-1);
+			
+			System.out.println("[Metron] ------" +  name + " is initializing from " + messageUpstreamComponent);
+
+			
+			String class_name = config.getString("bolt.parser.adapter");
+			
+			if(class_name == null)
+			{
+				System.out.println("[Metron] Parser adapter not set.  Please set bolt.indexing.adapter in topology.conf");
+				throw new Exception("Parser adapter not set");
+			}
+			
+			Class loaded_class = Class.forName(class_name);
+			MessageParser parser = (MessageParser) loaded_class.newInstance();
+			
+	        
+			AbstractParserBolt parser_bolt = new TelemetryParserBolt()
+					.withMessageParser(parser)
+					.withOutputFieldName(topology_name)
+					.withMessageFilter(new GenericMessageFilter())
+					.withMetricConfig(config);
+
+			builder.setBolt(name, parser_bolt,
+					config.getInt("bolt.parser.parallelism.hint"))
+					.shuffleGrouping(messageUpstreamComponent)
+					.setNumTasks(config.getInt("bolt.parser.num.tasks"));
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			System.exit(0);
+		}
+
+		return true;
+	}
+
+	@Override	
+	public  boolean initializeTestingSpout(String name) {
+		try {
+
+			System.out.println("[Metron] Initializing Test Spout");
+
+			GenericInternalTestSpout testSpout = new GenericInternalTestSpout()
+					.withFilename(test_file_path).withRepeating(
+							config.getBoolean("spout.test.parallelism.repeat"));
+
+			builder.setSpout(name, testSpout,
+					config.getInt("spout.test.parallelism.hint")).setNumTasks(
+					config.getInt("spout.test.num.tasks"));
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			System.exit(0);
+		}
+		return true;
+	}
+	
+	
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/runner/BroRunner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/runner/BroRunner.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/runner/BroRunner.java
new file mode 100644
index 0000000..fee72cd
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/runner/BroRunner.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.apache.metron.topology.runner;
+
+import com.apache.metron.filters.GenericMessageFilter;
+import com.apache.metron.parser.interfaces.MessageParser;
+import com.apache.metron.parsing.AbstractParserBolt;
+import com.apache.metron.parsing.TelemetryParserBolt;
+import com.apache.metron.test.spouts.GenericInternalTestSpout;
+
+public class BroRunner extends TopologyRunner{
+	
+	 static String test_file_path = "SampleInput/BroExampleOutput";
+
+	@Override
+	public boolean initializeParsingBolt(String topology_name,
+			String name) {
+		try {
+
+			String messageUpstreamComponent = messageComponents.get(messageComponents.size()-1);
+			
+			System.out.println("[Metron] ------" +  name + " is initializing from " + messageUpstreamComponent);
+			
+			String class_name = config.getString("bolt.parser.adapter");
+			
+			if(class_name == null)
+			{
+				System.out.println("[Metron] Parser adapter not set.  Please set bolt.indexing.adapter in topology.conf");
+				throw new Exception("Parser adapter not set");
+			}
+			
+			Class loaded_class = Class.forName(class_name);
+			MessageParser parser = (MessageParser) loaded_class.newInstance();
+			
+			AbstractParserBolt parser_bolt = new TelemetryParserBolt()
+					.withMessageParser(parser)
+					.withOutputFieldName(topology_name)
+					.withMessageFilter(new GenericMessageFilter())
+					.withMetricConfig(config);
+
+			builder.setBolt(name, parser_bolt,
+					config.getInt("bolt.parser.parallelism.hint"))
+					.shuffleGrouping(messageUpstreamComponent)
+					.setNumTasks(config.getInt("bolt.parser.num.tasks"));
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			System.exit(0);
+		}
+
+		return true;
+	}
+
+	@Override	
+	public  boolean initializeTestingSpout(String name) {
+		try {
+
+			System.out.println("[Metron] Initializing Test Spout");
+
+			GenericInternalTestSpout testSpout = new GenericInternalTestSpout()
+					.withFilename(test_file_path).withRepeating(
+							config.getBoolean("spout.test.parallelism.repeat"));
+
+			builder.setSpout(name, testSpout,
+					config.getInt("spout.test.parallelism.hint")).setNumTasks(
+					config.getInt("spout.test.num.tasks"));
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			System.exit(0);
+		}
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/runner/FireEyeRunner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/runner/FireEyeRunner.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/runner/FireEyeRunner.java
new file mode 100644
index 0000000..db59415
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/runner/FireEyeRunner.java
@@ -0,0 +1,77 @@
+package com.apache.metron.topology.runner;
+
+import com.apache.metron.filters.GenericMessageFilter;
+import com.apache.metron.parser.interfaces.MessageParser;
+import com.apache.metron.parsing.AbstractParserBolt;
+import com.apache.metron.parsing.TelemetryParserBolt;
+import com.apache.metron.test.spouts.GenericInternalTestSpout;
+
+public class FireEyeRunner extends TopologyRunner{
+	
+	 static String test_file_path = "SampleInput/FireeyeExampleOutput";
+
+	@Override
+	public boolean initializeParsingBolt(String topology_name,
+			String name) {
+		try {
+			
+			String messageUpstreamComponent = messageComponents.get(messageComponents.size()-1);
+			
+			System.out.println("[Metron] ------" +  name + " is initializing from " + messageUpstreamComponent);
+
+			
+			String class_name = config.getString("bolt.parser.adapter");
+			
+			if(class_name == null)
+			{
+				System.out.println("[Metron] Parser adapter not set.  Please set bolt.indexing.adapter in topology.conf");
+				throw new Exception("Parser adapter not set");
+			}
+			
+			Class loaded_class = Class.forName(class_name);
+			MessageParser parser = (MessageParser) loaded_class.newInstance();
+	        
+	        
+			AbstractParserBolt parser_bolt = new TelemetryParserBolt()
+					.withMessageParser(parser)
+					.withOutputFieldName(topology_name)
+					.withMessageFilter(new GenericMessageFilter())
+					.withMetricConfig(config);
+
+			builder.setBolt(name, parser_bolt,
+					config.getInt("bolt.parser.parallelism.hint"))
+					.shuffleGrouping(messageUpstreamComponent)
+					.setNumTasks(config.getInt("bolt.parser.num.tasks"));
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			System.exit(0);
+		}
+
+		return true;
+	}
+
+	@Override	
+	public  boolean initializeTestingSpout(String name) {
+		try {
+
+			System.out.println("[Metron] Initializing Test Spout");
+
+			GenericInternalTestSpout testSpout = new GenericInternalTestSpout()
+					.withFilename(test_file_path).withRepeating(
+							config.getBoolean("spout.test.parallelism.repeat"));
+
+			builder.setSpout(name, testSpout,
+					config.getInt("spout.test.parallelism.hint")).setNumTasks(
+					config.getInt("spout.test.num.tasks"));
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			System.exit(0);
+		}
+		return true;
+	}
+	
+	
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/runner/ISERunner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/runner/ISERunner.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/runner/ISERunner.java
new file mode 100644
index 0000000..3d3d663
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/topology/runner/ISERunner.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.apache.metron.topology.runner;
+
+import com.apache.metron.filters.GenericMessageFilter;
+import com.apache.metron.parser.interfaces.MessageParser;
+import com.apache.metron.parsing.AbstractParserBolt;
+import com.apache.metron.parsing.TelemetryParserBolt;
+import com.apache.metron.test.spouts.GenericInternalTestSpout;
+
+public class ISERunner extends TopologyRunner{
+	
+	 static String test_file_path = "SampleInput/ISESampleOutput";
+
+	@Override
+	public boolean initializeParsingBolt(String topology_name,
+			String name) {
+		try {
+			
+			String messageUpstreamComponent = messageComponents.get(messageComponents.size()-1);
+			
+			System.out.println("[Metron] ------" +  name + " is initializing from " + messageUpstreamComponent);
+
+			
+			String class_name = config.getString("bolt.parser.adapter");
+			
+			if(class_name == null)
+			{
+				System.out.println("[Metron] Parser adapter not set.  Please set bolt.indexing.adapter in topology.conf");
+				throw new Exception("Parser adapter not set");
+			}
+			
+			Class loaded_class = Class.forName(class_name);
+			MessageParser parser = (MessageParser) loaded_class.newInstance();
+			
+			
+			AbstractParserBolt parser_bolt = new TelemetryParserBolt()
+					.withMessageParser(parser)
+					.withOutputFieldName(topology_name)
+					.withMessageFilter(new GenericMessageFilter())
+					.withMetricConfig(config);
+
+			builder.setBolt(name, parser_bolt,
+					config.getInt("bolt.parser.parallelism.hint"))
+					.shuffleGrouping(messageUpstreamComponent)
+					.setNumTasks(config.getInt("bolt.parser.num.tasks"));
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			System.exit(0);
+		}
+
+		return true;
+	}
+
+	@Override	
+	public  boolean initializeTestingSpout(String name) {
+		try {
+
+			System.out.println("[Metron] Initializing Test Spout");
+
+			GenericInternalTestSpout testSpout = new GenericInternalTestSpout()
+					.withFilename(test_file_path).withRepeating(
+							config.getBoolean("spout.test.parallelism.repeat"));
+
+			builder.setSpout(name, testSpout,
+					config.getInt("spout.test.parallelism.hint")).setNumTasks(
+					config.getInt("spout.test.num.tasks"));
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			System.exit(0);
+		}
+		return true;
+	}
+}



Mime
View raw message