metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject [13/15] incubator-metron git commit: METRON 86: Adding Solr indexing support (merrimanr via cestella) closes apache/incubator-metron#67
Date Tue, 05 Apr 2016 19:42:09 GMT
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/TelemetryIndexingBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/TelemetryIndexingBolt.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/TelemetryIndexingBolt.java
deleted file mode 100644
index 21ecb18..0000000
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/TelemetryIndexingBolt.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.indexing;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.configuration.Configuration;
-import org.json.simple.JSONObject;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import org.apache.metron.helpers.topology.ErrorUtils;
-import org.apache.metron.index.interfaces.IndexAdapter;
-import org.apache.metron.json.serialization.JSONEncoderHelper;
-import org.apache.metron.metrics.MetricReporter;
-
-/**
- * 
- * Bolt for indexing telemetry messages into Elastic Search, Solr, Druid, etc...
- * For a list of all adapters provided please see org.apache.metron.indexing.adapters
- * 
- * As of release of this code the following adapters for indexing are provided:
- * <p>
- * <ul>
- * 
- * <li>ESBulkAdapter = adapter that can bulk index messages into ES
- * <li>ESBulkRotatingAdapter = adapter that can bulk index messages into ES,
- * rotate the index, and apply an alias to the rotated index
- * <ul>
- * <p>
- *
- */
-
-@SuppressWarnings("serial")
-public class TelemetryIndexingBolt extends AbstractIndexingBolt {
-
-	private JSONObject metricConfiguration;
-	private String _indexDateFormat;
-	
-	private Set<Tuple> tuple_queue = new HashSet<Tuple>();
-
-	public TelemetryIndexingBolt(String zookeeperUrl) {
-		super(zookeeperUrl);
-	}
-
-	/**
-	 * 
-	 * @param IndexIP
-	 *            ip of ElasticSearch/Solr/etc...
-	 * @return instance of bolt
-	 */
-	public TelemetryIndexingBolt withIndexIP(String IndexIP) {
-		_IndexIP = IndexIP;
-		return this;
-	}
-
-	/**
-	 * 
-	 * @param IndexPort
-	 *            port of ElasticSearch/Solr/etc...
-	 * @return instance of bolt
-	 */
-
-	public TelemetryIndexingBolt withIndexPort(int IndexPort) {
-		_IndexPort = IndexPort;
-		return this;
-	}
-
-	/**
-	 *
-	 * @param IndexName
-	 *            name of the index in ElasticSearch/Solr/etc...
-	 * @return instance of bolt
-	 */
-	public TelemetryIndexingBolt withIndexName(String IndexName) {
-		_IndexName = IndexName;
-		return this;
-	}
-
-	/**
-	 * 
-	 * @param ClusterName
-	 *            name of cluster to index into in ElasticSearch/Solr/etc...
-	 * @return instance of bolt
-	 */
-	public TelemetryIndexingBolt withClusterName(String ClusterName) {
-		_ClusterName = ClusterName;
-		return this;
-	}
-
-	/**
-	 * 
-	 * @param DocumentName
-	 *            name of document to be indexed in ElasticSearch/Solr/etc...
-	 * @return
-	 */
-
-	public TelemetryIndexingBolt withDocumentName(String DocumentName) {
-		_DocumentName = DocumentName;
-		return this;
-	}
-
-	/**
-	 * 
-	 * @param BulkIndexNumber
-	 *            number of documents to bulk index together
-	 * @return instance of bolt
-	 */
-	public TelemetryIndexingBolt withBulk(int BulkIndexNumber) {
-		_BulkIndexNumber = BulkIndexNumber;
-		return this;
-	}
-
-	/**
-	 * 
-	 * @param adapter
-	 *            adapter that handles indexing of JSON strings
-	 * @return instance of bolt
-	 */
-	public TelemetryIndexingBolt withIndexAdapter(IndexAdapter adapter) {
-		_adapter = adapter;
-
-		return this;
-	}
-	
-	/**
-	 * 
-	 * @param indexTimestamp
-	 *           timestamp to append to index names
-	 * @return instance of bolt
-	 */
-	public TelemetryIndexingBolt withIndexTimestamp(String indexTimestamp) {
-		_indexDateFormat = indexTimestamp;
-
-		return this;
-	}
-	/**
-	 * 
-	 * @param config
-	 *            - configuration for pushing metrics into graphite
-	 * @return instance of bolt
-	 */
-	public TelemetryIndexingBolt withMetricConfiguration(Configuration config) {
-		this.metricConfiguration = JSONEncoderHelper.getJSON(config
-				.subset("org.apache.metron.metrics"));
-		return this;
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	void doPrepare(Map conf, TopologyContext topologyContext,
-			OutputCollector collector) throws IOException {
-
-		try {
-			
-			_adapter.initializeConnection(_IndexIP, _IndexPort,
-					_ClusterName, _IndexName, _DocumentName, _BulkIndexNumber, _indexDateFormat);
-			
-			_reporter = new MetricReporter();
-			_reporter.initialize(metricConfiguration,
-					TelemetryIndexingBolt.class);
-			this.registerCounters();
-		} catch (Exception e) {
-			
-			e.printStackTrace();
-					
-			JSONObject error = ErrorUtils.generateErrorMessage(new String("bulk index problem"), e);
-			_collector.emit("error", new Values(error));
-		}
-
-	}
-
-	public void execute(Tuple tuple) {
-
-		JSONObject message = null;
-
-		try {
-			LOG.trace("[Metron] Indexing bolt gets:  " + message);
-
-			message = (JSONObject) tuple.getValueByField("message");
-
-			if (message == null || message.isEmpty())
-				throw new Exception(
-						"Could not parse message from binary stream");
-
-			int result_code = _adapter.bulkIndex(message);
-
-			if (result_code == 0) {
-				tuple_queue.add(tuple);
-			} else if (result_code == 1) {
-				tuple_queue.add(tuple);
-				
-				Iterator<Tuple> iterator = tuple_queue.iterator();
-				while(iterator.hasNext())
-				{
-					Tuple setElement = iterator.next();
-					_collector.ack(setElement);
-					ackCounter.inc();
-				}
-				tuple_queue.clear();
-			} else if (result_code == 2) {
-				throw new Exception("Failed to index elements with client");
-			}
-
-		} catch (Exception e) {
-			e.printStackTrace();
-			
-			
-			Iterator<Tuple> iterator = tuple_queue.iterator();
-			while(iterator.hasNext())
-			{
-				Tuple setElement = iterator.next();
-				_collector.fail(setElement);
-				failCounter.inc();
-				
-				
-				JSONObject error = ErrorUtils.generateErrorMessage(new String("bulk index problem"), e);
-				_collector.emit("error", new Values(error));
-			}
-			tuple_queue.clear();
-
-			
-		}
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declearer) {
-		declearer.declareStream("error", new Fields("Index"));
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/AbstractIndexAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/AbstractIndexAdapter.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/AbstractIndexAdapter.java
deleted file mode 100644
index 58f5bed..0000000
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/AbstractIndexAdapter.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.indexing.adapters;
-
-import java.io.Serializable;
-
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.metron.index.interfaces.IndexAdapter;
-import org.apache.metron.indexing.AbstractIndexingBolt;
-
-@SuppressWarnings("serial")
-public abstract class AbstractIndexAdapter implements IndexAdapter, Serializable{
-	
-	protected static final Logger _LOG = LoggerFactory
-			.getLogger(AbstractIndexingBolt.class);
-
-
-	
-
-	abstract public boolean initializeConnection(String ip, int port,
-			String cluster_name, String index_name, String document_name,
-			int bulk, String date_format) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/ESBaseBulkAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/ESBaseBulkAdapter.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/ESBaseBulkAdapter.java
deleted file mode 100644
index 5e64b86..0000000
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/ESBaseBulkAdapter.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.indexing.adapters;
-
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.commons.collections.Bag;
-import org.apache.commons.collections.bag.HashBag;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.json.simple.JSONObject;
-
-@SuppressWarnings("serial")
-public class ESBaseBulkAdapter extends AbstractIndexAdapter implements
-		Serializable {
-
-	private int _bulk_size;
-	private String _index_name;
-	private String _document_name;
-	private String _cluster_name;
-	private int _port;
-	private String _ip;
-	public transient TransportClient client;
-
-	private Bag bulk_set;
-
-	private Settings settings;
-
-	@Override
-	public boolean initializeConnection(String ip, int port,
-			String cluster_name, String index_name, String document_name,
-			int bulk_size, String date_format) throws Exception {
-
-		bulk_set = new HashBag();
-
-		_LOG.trace("[Metron] Initializing ESBulkAdapter...");
-
-		try {
-			_ip = ip;
-			_port = port;
-			_cluster_name = cluster_name;
-			_index_name = index_name;
-			_document_name = document_name;
-			_bulk_size = bulk_size;
-
-			_LOG.trace("[Metron] Bulk indexing is set to: " + _bulk_size);
-
-			settings = ImmutableSettings.settingsBuilder()
-					.put("cluster.name", _cluster_name).build();
-			client = new TransportClient(settings)
-					.addTransportAddress(new InetSocketTransportAddress(_ip,
-							_port));
-
-			return true;
-		} catch (Exception e) {
-			e.printStackTrace();
-			return false;
-		}
-	}
-
-	/**
-	 * @param raw_message
-	 *            message to bulk index in Elastic Search
-	 * @return integer (0) loaded into a bulk queue, (1) bulk indexing executed,
-	 *         (2) error
-	 */
-	@SuppressWarnings("unchecked")
-	public int bulkIndex(JSONObject raw_message) {
-
-		boolean success = true;
-		int set_size = 0;
-
-		synchronized (bulk_set) {
-			bulk_set.add(raw_message);
-			set_size = bulk_set.size();
-			
-			_LOG.trace("[Metron] Bulk size is now: " + bulk_set.size());
-		}
-
-		try {
-
-			if (set_size >= _bulk_size) {
-				success = doIndex();
-
-				if (success)
-					return 1;
-				else
-					return 2;
-			}
-
-			return 0;
-
-		} catch (Exception e) {
-			e.printStackTrace();
-			return 2;
-		}
-	}
-
-	public boolean doIndex() throws Exception {
-
-		try {
-
-			synchronized (bulk_set) {
-				if (client == null)
-					throw new Exception("client is null");
-
-				BulkRequestBuilder bulkRequest = client.prepareBulk();
-
-				Iterator<JSONObject> iterator = bulk_set.iterator();
-
-				while (iterator.hasNext()) {
-					JSONObject setElement = iterator.next();
-
-					IndexRequestBuilder a = client.prepareIndex(_index_name,
-							_document_name);
-					a.setSource(setElement.toString());
-					bulkRequest.add(a);
-
-				}
-
-				_LOG.trace("[Metron] Performing bulk load of size: "
-						+ bulkRequest.numberOfActions());
-
-				BulkResponse resp = bulkRequest.execute().actionGet();
-				_LOG.trace("[Metron] Received bulk response: "
-						+ resp.toString());
-				bulk_set.clear();
-			}
-
-			return true;
-		}
-
-		catch (Exception e) {
-			e.printStackTrace();
-			return false;
-		}
-	}
-
-	public void setOptionalSettings(Map<String, String> settings) {
-		// TODO Auto-generated method stub
-		
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/ESBulkRotatingAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/ESBulkRotatingAdapter.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/ESBulkRotatingAdapter.java
deleted file mode 100644
index 1f8c50e..0000000
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/ESBulkRotatingAdapter.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.indexing.adapters;
-
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Map;
-
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.util.EntityUtils;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.json.simple.JSONObject;
-
-@SuppressWarnings({ "deprecation", "serial" })
-public class ESBulkRotatingAdapter extends AbstractIndexAdapter {
-
-	private Client client;
-	private BulkRequestBuilder bulkRequest;
-	private int _bulk_size;
-	private String _index_name;
-	private String _document_name;
-	private int element_count;
-	private String index_postfix;
-	private String running_index_postfix;
-
-	private HttpClient httpclient;
-	private HttpPost post;
-
-	private DateFormat dateFormat;
-
-	public boolean initializeConnection(String ip, int port,
-			String cluster_name, String index_name, String document_name,
-			int bulk_size, String date_format) {
-
-		_LOG.info("Initializing ESBulkAdapter...");
-
-		try {
-			httpclient = new DefaultHttpClient();
-			String alias_link = "http://" + ip + ":" + 9200 + "/_aliases";
-			post = new HttpPost(alias_link);
-
-			_index_name = index_name;
-			_document_name = document_name;
-
-			_bulk_size = bulk_size - 1;
-			
-
-			dateFormat = new SimpleDateFormat(date_format);
-			
-			element_count = 0;
-			running_index_postfix = "NONE";
-
-			Settings settings = ImmutableSettings.settingsBuilder()
-					.put("cluster.name", cluster_name).build();
-			client = new TransportClient(settings)
-					.addTransportAddress(new InetSocketTransportAddress(ip,
-							port));
-
-			bulkRequest = client.prepareBulk();
-
-			return true;
-		} catch (Exception e) {
-			e.printStackTrace();
-			return false;
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	public int bulkIndex(JSONObject raw_message) {
-
-		index_postfix = dateFormat.format(new Date());
-
-		bulkRequest.add(client.prepareIndex(_index_name + "_" + index_postfix,
-				_document_name).setSource(raw_message));
-
-		return doIndex();
-	}
-
-	public int bulkIndex(String raw_message) {
-
-		index_postfix = dateFormat.format(new Date());
-
-		bulkRequest.add(client.prepareIndex(_index_name + "_" + index_postfix,
-				_document_name).setSource(raw_message));
-
-		return doIndex();
-	}
-
-	public int doIndex() {
-
-		element_count++;
-		
-		if (element_count != _bulk_size)
-			return 0;
-
-		if (element_count == _bulk_size) {
-			_LOG.debug("Starting bulk load of size: " + _bulk_size);
-			BulkResponse resp = bulkRequest.execute().actionGet();
-			element_count = 0;
-			_LOG.debug("Received bulk response: " + resp.toString());
-
-			if (resp.hasFailures()) {
-				_LOG.error("Bulk update failed");
-				return 2;
-			}
-
-			if (!running_index_postfix.equals(index_postfix)) {
-
-				_LOG.debug("Attempting to apply a new alias");
-
-				try {
-
-					String alias = "{\"actions\" : [{ \"add\" : { \"index\" : \""
-							+ _index_name
-							+ "-"
-							+ index_postfix
-							+ "\", \"alias\" : \"" + _index_name + "\" } } ]}";
-
-					post.setEntity(new StringEntity(alias));
-
-					HttpResponse response = httpclient.execute(post);
-					String res = EntityUtils.toString(response.getEntity());
-
-					_LOG.debug("Alias request received the following response: "
-							+ res);
-
-					running_index_postfix = index_postfix;
-				}
-
-				catch (Exception e) {
-					e.printStackTrace();
-					_LOG.error("Alias request failed...");
-					return 2;
-				}
-			}
-
-			index_postfix = dateFormat.format(new Date());
-		}
-
-		_LOG.debug("Adding to bulk load: element " + element_count
-				+ " of bulk size " + _bulk_size);
-
-		return 1;
-	}
-
-	public void setOptionalSettings(Map<String, String> settings) {
-		// TODO Auto-generated method stub
-		
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/ESTimedRotatingAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/ESTimedRotatingAdapter.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/ESTimedRotatingAdapter.java
deleted file mode 100644
index fd4c067..0000000
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/ESTimedRotatingAdapter.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.indexing.adapters;
-
-import java.io.Serializable;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.commons.collections.Bag;
-import org.apache.commons.collections.HashBag;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.json.simple.JSONObject;
-
-@SuppressWarnings("serial")
-public class ESTimedRotatingAdapter extends AbstractIndexAdapter implements
-		Serializable {
-
-	private int _bulk_size;
-	private String _index_name;
-	private String _document_name;
-	private String _cluster_name;
-	private int _port;
-	private String _ip;
-	public transient TransportClient client;
-	private DateFormat dateFormat;
-	
-	private Map<String, String> tuning_settings;
-
-	private Bag bulk_set;
-
-	private Settings settings;
-	
-	public void setOptionalSettings(Map<String, String> settings)
-	{
-		tuning_settings = settings;
-	}
-
-	@Override
-	public boolean initializeConnection(String ip, int port,
-			String cluster_name, String index_name, String document_name,
-			int bulk_size, String date_format) throws Exception {
-
-		bulk_set = new HashBag();
-
-		_LOG.trace("[Metron] Initializing ESBulkAdapter...");
-
-		try {
-			_ip = ip;
-			_port = port;
-			_cluster_name = cluster_name;
-			_index_name = index_name;
-			_document_name = document_name;
-			_bulk_size = bulk_size;
-			
-
-			dateFormat = new SimpleDateFormat(date_format);
-
-			System.out.println("Bulk indexing is set to: " + _bulk_size);
-
-			ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder() ;	
-			
-			if(tuning_settings != null && tuning_settings.size() > 0)
-			{
-					builder.put(tuning_settings);
-			}
-			
-			builder.put("cluster.name", _cluster_name);
-			builder.put("client.transport.ping_timeout","500s");
-			
-			
-			settings = builder.build();
-					
-			client = new TransportClient(settings)
-					.addTransportAddress(new InetSocketTransportAddress(_ip,
-							_port));
-
-			return true;
-		} catch (Exception e) {
-			e.printStackTrace();
-			return false;
-		}
-	}
-
-	/**
-	 * @param raw_message
-	 *            message to bulk index in Elastic Search
-	 * @return integer (0) loaded into a bulk queue, (1) bulk indexing executed,
-	 *         (2) error
-	 */
-	@SuppressWarnings("unchecked")
-	public int bulkIndex(JSONObject raw_message) {
-
-		boolean success = true;
-		int set_size = 0;
-
-		synchronized (bulk_set) {
-			bulk_set.add(raw_message);
-			set_size = bulk_set.size();
-			
-			_LOG.trace("[Metron] Incremented bulk size to: " + bulk_set.size());
-		}
-
-		try {
-
-			if (set_size >= _bulk_size) {
-				success = doIndex();
-
-				if (success)
-					return 1;
-				else
-					return 2;
-			}
-
-			return 0;
-
-		} catch (Exception e) {
-			e.printStackTrace();
-			return 2;
-		}
-	}
-
-	public boolean doIndex() throws Exception {
-
-		try {
-
-			synchronized (bulk_set) {
-				if (client == null)
-					throw new Exception("client is null");
-
-				BulkRequestBuilder bulkRequest = client.prepareBulk();
-
-				Iterator<JSONObject> iterator = bulk_set.iterator();
-				
-				String index_postfix = dateFormat.format(new Date());
-
-				while (iterator.hasNext()) {
-					JSONObject setElement = iterator.next();
-					
-					_LOG.trace("[Metron] Flushing to index: " + _index_name+ "_" + index_postfix);
-
-					IndexRequestBuilder a = client.prepareIndex(_index_name+ "_" + index_postfix,
-							_document_name);
-					a.setSource(setElement.toString());
-					bulkRequest.add(a);
-
-				}
-
-				_LOG.trace("[Metron] Performing bulk load of size: "
-						+ bulkRequest.numberOfActions());
-
-				BulkResponse resp = bulkRequest.execute().actionGet();
-				
-				for(BulkItemResponse r: resp.getItems())
-				{
-					r.getResponse();
-					_LOG.trace("[Metron] ES SUCCESS MESSAGE: " + r.getFailureMessage());
-				}
-
-
-				bulk_set.clear();
-				
-				if (resp.hasFailures()) {
-					_LOG.error("[Metron] Received bulk response error: "
-							+ resp.buildFailureMessage());
-					
-					for(BulkItemResponse r: resp.getItems())
-					{
-						r.getResponse();
-						_LOG.error("[Metron] ES FAILURE MESSAGE: " + r.getFailureMessage());
-					}
-				}
-				
-			}
-
-			return true;
-		}
-
-		catch (Exception e) {
-			e.printStackTrace();
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/SolrAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/SolrAdapter.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/SolrAdapter.java
deleted file mode 100644
index 13f02f4..0000000
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/indexing/adapters/SolrAdapter.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.indexing.adapters;
-
-public class SolrAdapter {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
deleted file mode 100644
index e8d654d..0000000
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.writer;
-
-import backtype.storm.tuple.Tuple;
-import org.apache.metron.domain.SourceConfig;
-import org.apache.metron.writer.interfaces.BulkMessageWriter;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
-public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Serializable {
-
-  private String clusterName;
-  private Map<String, String> optionalSettings;
-  private transient TransportClient client;
-  private String host;
-  private int port;
-  private SimpleDateFormat dateFormat;
-  private static final Logger LOG = LoggerFactory
-          .getLogger(ElasticsearchWriter.class);
-
-  public ElasticsearchWriter(String clusterName, String host, int port, String dateFormat) {
-    this.clusterName = clusterName;
-    this.host = host;
-    this.port = port;
-    this.dateFormat = new SimpleDateFormat(dateFormat);
-  }
-
-  public ElasticsearchWriter withOptionalSettings(Map<String, String> optionalSettings) {
-    this.optionalSettings = optionalSettings;
-    return this;
-  }
-
-  @Override
-  public void init(Map stormConf) {
-    ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder();
-    builder.put("cluster.name", clusterName);
-    builder.put("client.transport.ping_timeout","500s");
-    if (optionalSettings != null) {
-      builder.put(optionalSettings);
-    }
-    client = new TransportClient(builder.build())
-            .addTransportAddress(new InetSocketTransportAddress(host, port))
-            ;
-
-  }
-
-  @Override
-  public void write(String sourceType, SourceConfig configuration, List<Tuple> tuples, List<JSONObject> messages) throws Exception {
-    String indexPostfix = dateFormat.format(new Date());
-    BulkRequestBuilder bulkRequest = client.prepareBulk();
-    for(JSONObject message: messages) {
-      String indexName = sourceType;
-      if (configuration != null) {
-        indexName = configuration.getIndex();
-      }
-      IndexRequestBuilder indexRequestBuilder = client.prepareIndex(indexName + "_index_" + indexPostfix,
-              sourceType);
-
-      indexRequestBuilder.setSource(message.toJSONString());
-      bulkRequest.add(indexRequestBuilder);
-    }
-    BulkResponse resp = bulkRequest.execute().actionGet();
-    if (resp.hasFailures()) {
-      throw new Exception(resp.buildFailureMessage());
-    }
-  }
-
-  @Override
-  public void close() throws Exception {
-    client.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
index d2cc827..591f9e3 100644
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
+++ b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
@@ -18,7 +18,7 @@
 package org.apache.metron.writer.hdfs;
 
 import backtype.storm.tuple.Tuple;
-import org.apache.metron.domain.SourceConfig;
+import org.apache.metron.domain.Configurations;
 import org.apache.metron.writer.interfaces.BulkMessageWriter;
 import org.apache.storm.hdfs.bolt.format.FileNameFormat;
 import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
@@ -62,13 +62,13 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable {
   }
 
   @Override
-  public void init(Map stormConfig) {
+  public void init(Map stormConfig, Configurations configurations) {
     this.stormConfig = stormConfig;
   }
 
   @Override
   public void write( String sourceType
-                   , SourceConfig configuration
+                   , Configurations configurations
                    , List<Tuple> tuples
                    , List<JSONObject> messages
                    ) throws Exception

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/ParserBolt.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/ParserBolt.java
index 27294ef..abdb207 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/ParserBolt.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/bolt/ParserBolt.java
@@ -38,12 +38,12 @@ public class ParserBolt extends ConfiguredBolt {
   private MessageParser<JSONObject> parser;
   private MessageFilter<JSONObject> filter = new GenericMessageFilter();
   private MessageWriter<JSONObject> writer;
-  private String sourceType;
+  private String sensorType;
 
-  public ParserBolt(String zookeeperUrl, String sourceType, MessageParser<JSONObject> parser, MessageWriter<JSONObject> writer) {
+  public ParserBolt(String zookeeperUrl, String sensorType, MessageParser<JSONObject> parser, MessageWriter<JSONObject> writer) {
     super(zookeeperUrl);
     this.parser = parser;
-    this.sourceType = sourceType;
+    this.sensorType = sensorType;
     this.writer = writer;
   }
 
@@ -70,8 +70,8 @@ public class ParserBolt extends ConfiguredBolt {
       for(JSONObject message: messages) {
         if (parser.validate(message)) {
           if (filter != null && filter.emitTuple(message)) {
-            message.put(Constants.SOURCE_TYPE, sourceType);
-            writer.write(sourceType, configurations.get(sourceType), tuple, message);
+            message.put(Constants.SENSOR_TYPE, sensorType);
+            writer.write(sensorType, configurations, tuple, message);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/writer/KafkaWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/writer/KafkaWriter.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/writer/KafkaWriter.java
index 8372e14..ec323d6 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/writer/KafkaWriter.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/writer/KafkaWriter.java
@@ -21,7 +21,7 @@ import backtype.storm.tuple.Tuple;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.metron.Constants;
-import org.apache.metron.domain.SourceConfig;
+import org.apache.metron.domain.Configurations;
 import org.apache.metron.writer.interfaces.MessageWriter;
 import org.json.simple.JSONObject;
 
@@ -68,7 +68,7 @@ public class KafkaWriter implements MessageWriter<JSONObject>, Serializable {
 
   @SuppressWarnings("unchecked")
   @Override
-  public void write(String sourceType, SourceConfig configuration, Tuple tuple, JSONObject message) throws Exception {
+  public void write(String sourceType, Configurations configurations, Tuple tuple, JSONObject message) throws Exception {
     kafkaProducer.send(new ProducerRecord<String, String>(Constants.ENRICHMENT_TOPIC, message.toJSONString()));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Solr/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/pom.xml b/metron-streaming/Metron-Solr/pom.xml
new file mode 100644
index 0000000..cbb7395
--- /dev/null
+++ b/metron-streaming/Metron-Solr/pom.xml
@@ -0,0 +1,204 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software
+	Foundation (ASF) under one or more contributor license agreements. See the
+	NOTICE file distributed with this work for additional information regarding
+	copyright ownership. The ASF licenses this file to You under the Apache License,
+	Version 2.0 (the "License"); you may not use this file except in compliance
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+	Unless required by applicable law or agreed to in writing, software distributed
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+  the specific language governing permissions and limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.metron</groupId>
+        <artifactId>Metron-Streaming</artifactId>
+        <version>0.1BETA</version>
+    </parent>
+    <artifactId>Metron-Solr</artifactId>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>Metron-Common</artifactId>
+            <version>${project.parent.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.solr</groupId>
+            <artifactId>solr-solrj</artifactId>
+            <version>${global_solr_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.solr</groupId>
+            <artifactId>solr-test-framework</artifactId>
+            <version>${global_solr_version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${global_storm_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>Metron-Testing</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpclient</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>Metron-Topologies</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>${global_mockito_version}</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <!-- Separates the unit tests from the integration tests. -->
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.12.4</version>
+                <configuration>
+                    <!-- Skip the default running of this plug-in (or everything is run twice...see below) -->
+                    <argLine>-Xmx2048m -XX:MaxPermSize=256m</argLine>
+                    <skip>true</skip>
+                    <!-- Show 100% of the lines from the stack trace (doesn't work) -->
+                    <trimStackTrace>false</trimStackTrace>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>unit-tests</id>
+                        <phase>test</phase>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                        <configuration>
+                            <!-- Never skip running the tests when the test phase is invoked -->
+                            <skip>false</skip>
+                            <includes>
+                                <!-- Include unit tests within (unit) test phase. -->
+                                <include>**/*Test.java</include>
+                            </includes>
+                            <excludes>
+                                <!-- Exclude integration tests within (unit) test phase. -->
+                                <exclude>**/*IntegrationTest.java</exclude>
+                            </excludes>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>integration-tests</id>
+                        <phase>integration-test</phase>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                        <configuration>
+                            <!-- Never skip running the tests when the integration-test phase is invoked -->
+                            <skip>false</skip>
+                            <includes>
+                                <!-- Include integration tests within integration-test phase. -->
+                                <include>**/*IntegrationTest.java</include>
+                            </includes>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>2.3</version>
+                <configuration>
+                    <createDependencyReducedPom>true</createDependencyReducedPom>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>storm:storm-core:*</exclude>
+                                    <exclude>storm:storm-lib:*</exclude>
+                                    <exclude>org.slf4j.impl*</exclude>
+                                    <exclude>org.slf4j:slf4j-log4j*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <transformers>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+                                    <resource>.yaml</resource>
+                                </transformer>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass></mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptor>src/main/assembly/assembly.xml</descriptor>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
+                        <phase>package</phase> <!-- bind to the packaging phase -->
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Solr/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/src/main/assembly/assembly.xml b/metron-streaming/Metron-Solr/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..35cbcc3
--- /dev/null
+++ b/metron-streaming/Metron-Solr/src/main/assembly/assembly.xml
@@ -0,0 +1,41 @@
+<!--
+  Licensed to the Apache Software
+	Foundation (ASF) under one or more contributor license agreements. See the
+	NOTICE file distributed with this work for additional information regarding
+	copyright ownership. The ASF licenses this file to You under the Apache License,
+	Version 2.0 (the "License"); you may not use this file except in compliance
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+	Unless required by applicable law or agreed to in writing, software distributed
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+  the specific language governing permissions and limitations under the License.
+  -->
+
+<assembly>
+  <id>archive</id>
+  <formats>
+    <format>tar.gz</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${project.basedir}/src/main/resources/Metron_Configs/etc</directory>
+      <outputDirectory>/config/etc</outputDirectory>
+      <useDefaultExcludes>true</useDefaultExcludes>
+      <excludes>
+        <exclude>**/*.formatted</exclude>
+        <exclude>**/*.filtered</exclude>
+      </excludes>
+      <fileMode>0644</fileMode>
+      <lineEnding>unix</lineEnding>
+    </fileSet>
+    <fileSet>
+      <directory>${project.basedir}/target</directory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+      <outputDirectory>/lib</outputDirectory>
+      <useDefaultExcludes>true</useDefaultExcludes>
+    </fileSet>
+  </fileSets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Solr/src/main/java/org/apache/metron/solr/SolrConstants.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/src/main/java/org/apache/metron/solr/SolrConstants.java b/metron-streaming/Metron-Solr/src/main/java/org/apache/metron/solr/SolrConstants.java
new file mode 100644
index 0000000..d5dc7a0
--- /dev/null
+++ b/metron-streaming/Metron-Solr/src/main/java/org/apache/metron/solr/SolrConstants.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.solr;
+
+public class SolrConstants {
+
+  public static final String REQUEST_ACTION = "action";
+  public static final String REQUEST_NAME = "name";
+  public static final String REQUEST_NUM_SHARDS = "numShards";
+  public static final String REQUEST_REPLICATION_FACTOR = "replicationFactor";
+  public static final String REQUEST_COLLECTION_CONFIG_NAME = "collection.configName";
+  public static final String REQUEST_COLLECTIONS_PATH = "/admin/collections";
+  public static final String RESPONSE_COLLECTIONS = "collections";
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Solr/src/main/java/org/apache/metron/writer/solr/MetronSolrClient.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/src/main/java/org/apache/metron/writer/solr/MetronSolrClient.java b/metron-streaming/Metron-Solr/src/main/java/org/apache/metron/writer/solr/MetronSolrClient.java
new file mode 100644
index 0000000..e0485ab
--- /dev/null
+++ b/metron-streaming/Metron-Solr/src/main/java/org/apache/metron/writer/solr/MetronSolrClient.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer.solr;
+
+import org.apache.log4j.Logger;
+import org.apache.metron.solr.SolrConstants;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+
+import java.io.IOException;
+import java.util.List;
+
+public class MetronSolrClient extends CloudSolrClient {
+
+  private static final Logger LOG = Logger.getLogger(MetronSolrClient.class);
+
+
+  public MetronSolrClient(String zkHost) {
+    super(zkHost);
+  }
+
+  public void createCollection(String name, int numShards, int replicationFactor) throws IOException, SolrServerException {
+    if (!listCollections().contains(name)) {
+      request(getCreateCollectionsRequest(name, numShards, replicationFactor));
+    }
+  }
+
+  public QueryRequest getCreateCollectionsRequest(String name, int numShards, int replicationFactor) {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(SolrConstants.REQUEST_ACTION, CollectionParams.CollectionAction.CREATE.name());
+    params.set(SolrConstants.REQUEST_NAME, name);
+    params.set(SolrConstants.REQUEST_NUM_SHARDS, numShards);
+    params.set(SolrConstants.REQUEST_REPLICATION_FACTOR, replicationFactor);
+    params.set(SolrConstants.REQUEST_COLLECTION_CONFIG_NAME, name);
+    QueryRequest request = new QueryRequest(params);
+    request.setPath(SolrConstants.REQUEST_COLLECTIONS_PATH);
+    return request;
+  }
+
+  @SuppressWarnings("unchecked")
+  public List<String> listCollections() throws IOException, SolrServerException {
+    NamedList<Object> response = request(getListCollectionsRequest(), null);
+    return (List<String>) response.get(SolrConstants.RESPONSE_COLLECTIONS);
+  }
+
+  public QueryRequest getListCollectionsRequest() {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(SolrConstants.REQUEST_ACTION, CollectionParams.CollectionAction.LIST.name());
+    QueryRequest request = new QueryRequest(params);
+    request.setPath(SolrConstants.REQUEST_COLLECTIONS_PATH);
+    return request;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Solr/src/main/java/org/apache/metron/writer/solr/SolrWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/src/main/java/org/apache/metron/writer/solr/SolrWriter.java b/metron-streaming/Metron-Solr/src/main/java/org/apache/metron/writer/solr/SolrWriter.java
new file mode 100644
index 0000000..68303ea
--- /dev/null
+++ b/metron-streaming/Metron-Solr/src/main/java/org/apache/metron/writer/solr/SolrWriter.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer.solr;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.log4j.Logger;
+import org.apache.metron.domain.Configurations;
+import org.apache.metron.writer.interfaces.BulkMessageWriter;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.common.SolrInputDocument;
+import org.json.simple.JSONObject;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+public class SolrWriter implements BulkMessageWriter<JSONObject>, Serializable {
+
+  public static final String DEFAULT_COLLECTION = "metron";
+
+  private static final Logger LOG = Logger.getLogger(SolrWriter.class);
+
+  private boolean shouldCommit = false;
+  private MetronSolrClient solr;
+
+  public SolrWriter withShouldCommit(boolean shouldCommit) {
+    this.shouldCommit = shouldCommit;
+    return this;
+  }
+
+  public SolrWriter withMetronSolrClient(MetronSolrClient solr) {
+    this.solr = solr;
+    return this;
+  }
+
+  @Override
+  public void init(Map stormConf, Configurations configurations) throws IOException, SolrServerException {
+    Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
+    if(solr == null) solr = new MetronSolrClient((String) globalConfiguration.get("solr.zookeeper"));
+    String collection = getCollection(configurations);
+    solr.createCollection(collection, (Integer) globalConfiguration.get("solr.numShards"), (Integer) globalConfiguration.get("solr.replicationFactor"));
+    solr.setDefaultCollection(collection);
+  }
+
+  @Override
+  public void write(String sourceType, Configurations configurations, List<Tuple> tuples, List<JSONObject> messages) throws Exception {
+    for(JSONObject message: messages) {
+      SolrInputDocument document = new SolrInputDocument();
+      document.addField("id", getIdValue(message));
+      document.addField("sensorType", sourceType);
+      for(Object key: message.keySet()) {
+        Object value = message.get(key);
+        document.addField(getFieldName(key, value), value);
+      }
+      UpdateResponse response = solr.add(document);
+    }
+    if (shouldCommit) {
+      solr.commit(getCollection(configurations));
+    }
+  }
+
+  protected String getCollection(Configurations configurations) {
+    String collection = (String) configurations.getGlobalConfig().get("solr.collection");
+    return collection != null ? collection : DEFAULT_COLLECTION;
+  }
+
+  private int getIdValue(JSONObject message) {
+    return message.toJSONString().hashCode();
+  }
+
+  protected String getFieldName(Object key, Object value) {
+    String field;
+    if (value instanceof Integer) {
+      field = key + "_i";
+    } else if (value instanceof Long) {
+      field = key + "_l";
+    } else if (value instanceof Float) {
+      field = key + "_f";
+    } else if (value instanceof Double) {
+      field = key + "_d";
+    } else {
+      field = key + "_s";
+    }
+    return field;
+  }
+
+  @Override
+  public void close() throws Exception {
+    solr.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Solr/src/main/resources/Metron_Configs/etc/env/solr.properties
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/src/main/resources/Metron_Configs/etc/env/solr.properties b/metron-streaming/Metron-Solr/src/main/resources/Metron_Configs/etc/env/solr.properties
new file mode 100644
index 0000000..df25506
--- /dev/null
+++ b/metron-streaming/Metron-Solr/src/main/resources/Metron_Configs/etc/env/solr.properties
@@ -0,0 +1,109 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+
+##### Kafka #####
+
+kafka.zk=node1:2181
+kafka.broker=node1:6667
+spout.kafka.topic.asa=asa
+spout.kafka.topic.bro=bro
+spout.kafka.topic.fireeye=fireeye
+spout.kafka.topic.ise=ise
+spout.kafka.topic.lancope=lancope
+spout.kafka.topic.paloalto=paloalto
+spout.kafka.topic.pcap=pcap
+spout.kafka.topic.snort=snort
+spout.kafka.topic.yaf=yaf
+
+##### Indexing #####
+writer.class.name=org.apache.metron.writer.solr.SolrWriter
+
+##### ElasticSearch #####
+
+es.ip=10.22.0.214
+es.port=9300
+es.clustername=elasticsearch
+
+##### MySQL #####
+
+mysql.ip=10.22.0.214
+mysql.port=3306
+mysql.username=root
+mysql.password=hadoop123
+
+##### Metrics #####
+
+#reporters
+org.apache.metron.metrics.reporter.graphite=true
+org.apache.metron.metrics.reporter.console=false
+org.apache.metron.metrics.reporter.jmx=false
+
+#Graphite Addresses
+
+org.apache.metron.metrics.graphite.address=localhost
+org.apache.metron.metrics.graphite.port=2023
+
+#TelemetryParserBolt
+org.apache.metron.metrics.TelemetryParserBolt.acks=true
+org.apache.metron.metrics.TelemetryParserBolt.emits=true
+org.apache.metron.metrics.TelemetryParserBolt.fails=true
+
+
+#GenericEnrichmentBolt
+org.apache.metron.metrics.GenericEnrichmentBolt.acks=true
+org.apache.metron.metrics.GenericEnrichmentBolt.emits=true
+org.apache.metron.metrics.GenericEnrichmentBolt.fails=true
+
+
+#TelemetryIndexingBolt
+org.apache.metron.metrics.TelemetryIndexingBolt.acks=true
+org.apache.metron.metrics.TelemetryIndexingBolt.emits=true
+org.apache.metron.metrics.TelemetryIndexingBolt.fails=true
+
+##### Host Enrichment #####
+
+org.apache.metron.enrichment.host.known_hosts=[{"ip":"10.1.128.236", "local":"YES", "type":"webserver", "asset_value" : "important"},\
+{"ip":"10.1.128.237", "local":"UNKNOWN", "type":"unknown", "asset_value" : "important"},\
+{"ip":"10.60.10.254", "local":"YES", "type":"printer", "asset_value" : "important"}]
+
+##### HDFS #####
+
+bolt.hdfs.batch.size=5000
+bolt.hdfs.field.delimiter=|
+bolt.hdfs.file.rotation.size.in.mb=5
+bolt.hdfs.file.system.url=hdfs://iot01.cloud.hortonworks.com:8020
+bolt.hdfs.wip.file.path=/paloalto/wip
+bolt.hdfs.finished.file.path=/paloalto/rotated
+bolt.hdfs.compression.codec.class=org.apache.hadoop.io.compress.SnappyCodec
+index.hdfs.output=/tmp/metron/enriched
+
+##### HBase #####
+bolt.hbase.table.name=pcap
+bolt.hbase.table.fields=t:value
+bolt.hbase.table.key.tuple.field.name=key
+bolt.hbase.table.timestamp.tuple.field.name=timestamp
+bolt.hbase.enable.batching=false
+bolt.hbase.write.buffer.size.in.bytes=2000000
+bolt.hbase.durability=SKIP_WAL
+bolt.hbase.partitioner.region.info.refresh.interval.mins=60
+
+##### Threat Intel #####
+
+threat.intel.tracker.table=
+threat.intel.tracker.cf=
+threat.intel.ip.table=
+threat.intel.ip.cf=

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/SolrEnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/SolrEnrichmentIntegrationTest.java b/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/SolrEnrichmentIntegrationTest.java
new file mode 100644
index 0000000..afeb56b
--- /dev/null
+++ b/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/SolrEnrichmentIntegrationTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration;
+
+import com.google.common.base.Function;
+import org.apache.metron.domain.Configurations;
+import org.apache.metron.integration.util.integration.ComponentRunner;
+import org.apache.metron.integration.util.integration.InMemoryComponent;
+import org.apache.metron.integration.util.integration.Processor;
+import org.apache.metron.integration.util.integration.ReadinessState;
+import org.apache.metron.integration.util.integration.components.SolrComponent;
+import org.apache.metron.util.SampleUtil;
+import org.apache.metron.utils.ConfigurationsUtils;
+import org.apache.metron.utils.JSONUtils;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Enrichment integration test that indexes into an in-memory Solr cluster.
+ *
+ * Supplies a {@link SolrComponent} as the "search" component, selects
+ * {@code org.apache.metron.writer.solr.SolrWriter} as the topology writer and
+ * reports readiness once the documents found in Solr line up with both the input
+ * messages and the documents read back from disk.
+ */
+public class SolrEnrichmentIntegrationTest extends EnrichmentIntegrationTest {
+
+  private final String collection = "metron";
+
+  /**
+   * Builds the Solr search component for this test.
+   *
+   * Once the component has started, its Zookeeper address is stored in the topology
+   * properties under {@code solr.zk} and in the sample global config under
+   * {@code solr.zookeeper}; the global config is then written to the Zookeeper
+   * referenced by the {@code kafka.zk} topology property.
+   *
+   * @param topologyProperties properties shared with the topology under test
+   * @return the configured (not yet started) Solr component
+   * @throws Exception if the component cannot be built
+   */
+  @Override
+  public InMemoryComponent getSearchComponent(final Properties topologyProperties) throws Exception {
+    return new SolrComponent.Builder()
+            .addCollection(collection, "../Metron-Solr/src/test/resources/solr/conf")
+            .withPostStartCallback(new Function<SolrComponent, Void>() {
+              @Nullable
+              @Override
+              public Void apply(@Nullable SolrComponent solrComponent) {
+                String solrZookeeperUrl = solrComponent.getZookeeperUrl();
+                topologyProperties.setProperty("solr.zk", solrZookeeperUrl);
+                try {
+                  String testZookeeperUrl = topologyProperties.getProperty("kafka.zk");
+                  Configurations configurations = SampleUtil.getSampleConfigs();
+                  Map<String, Object> globalConfig = configurations.getGlobalConfig();
+                  globalConfig.put("solr.zookeeper", solrZookeeperUrl);
+                  ConfigurationsUtils.writerGlobalConfigToZookeeper(JSONUtils.INSTANCE.toJSON(globalConfig), testZookeeperUrl);
+                } catch (Exception e) {
+                  // Fail the component start instead of continuing with a global config
+                  // that never received the Solr Zookeeper address.
+                  throw new IllegalStateException("Unable to write the global config containing solr.zookeeper to Zookeeper.", e);
+                }
+                return null;
+              }
+            })
+            .build();
+  }
+
+  /**
+   * Creates the processor that polls Solr until indexing has caught up.
+   *
+   * The processor is READY only when the collection exists, Solr holds at least as
+   * many documents as there are input messages, and the Solr document count equals
+   * the number of documents read from disk.
+   *
+   * @param inputMessages the raw messages fed into the topology
+   * @return a processor whose result is the list of documents last read from Solr
+   */
+  @Override
+  Processor<List<Map<String, Object>>> getProcessor(final List<byte[]> inputMessages) {
+    return new Processor<List<Map<String, Object>>>() {
+      List<Map<String, Object>> docs = null;
+
+      public ReadinessState process(ComponentRunner runner) {
+        SolrComponent solrComponent = runner.getComponent("search", SolrComponent.class);
+        if (!solrComponent.hasCollection(collection)) {
+          return ReadinessState.NOT_READY;
+        }
+        List<Map<String, Object>> docsFromDisk;
+        try {
+          docs = solrComponent.getAllIndexedDocs(collection);
+          docsFromDisk = EnrichmentIntegrationTest.readDocsFromDisk(hdfsDir);
+          System.out.println(docs.size() + " vs " + inputMessages.size() + " vs " + docsFromDisk.size());
+        } catch (IOException e) {
+          throw new IllegalStateException("Unable to retrieve indexed documents.", e);
+        }
+        boolean caughtUp = docs.size() >= inputMessages.size() && docs.size() == docsFromDisk.size();
+        return caughtUp ? ReadinessState.READY : ReadinessState.NOT_READY;
+      }
+
+      public List<Map<String, Object>> getResult() {
+        return docs;
+      }
+    };
+  }
+
+  /**
+   * Selects the Solr writer implementation for the topology under test.
+   *
+   * @param topologyProperties properties shared with the topology under test
+   */
+  @Override
+  void setAdditionalProperties(Properties topologyProperties) {
+    topologyProperties.setProperty("writer.class.name", "org.apache.metron.writer.solr.SolrWriter");
+  }
+
+  /**
+   * Strips one trailing {@code _d}, {@code _f}, {@code _i}, {@code _l} or {@code _s}
+   * suffix from a field name (presumably the Solr dynamic-field type suffix; confirm
+   * against the test schema).
+   *
+   * @param field the field name as stored in Solr
+   * @return the field name without the suffix, or the input unchanged if it has none
+   */
+  @Override
+  public String cleanField(String field) {
+    return field.replaceFirst("_[dfils]$", "");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/util/integration/components/SolrComponent.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/util/integration/components/SolrComponent.java b/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/util/integration/components/SolrComponent.java
new file mode 100644
index 0000000..f2b9748
--- /dev/null
+++ b/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/integration/util/integration/components/SolrComponent.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util.integration.components;
+
+import com.google.common.base.Function;
+import org.apache.metron.integration.util.integration.InMemoryComponent;
+import org.apache.metron.integration.util.integration.UnableToStartException;
+import org.apache.metron.writer.solr.MetronSolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettyConfig;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.common.SolrDocument;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * In-memory test component that runs a single-node {@link MiniSolrCloudCluster}.
+ *
+ * Instances are created through {@link Builder}; {@link #start()} boots the cluster,
+ * uploads the registered config directories, creates the registered collections and
+ * finally invokes the optional post-start callback.
+ */
+public class SolrComponent implements InMemoryComponent {
+
+  /**
+   * Fluent builder for {@link SolrComponent}. At least one collection must be added
+   * before {@link #build()} is called.
+   */
+  public static class Builder {
+    private int port = 8983;
+    private String solrXmlPath = "../Metron-Solr/src/test/resources/solr/solr.xml";
+    private Map<String, String> collections = new HashMap<>();
+    private Function<SolrComponent, Void> postStartCallback;
+
+    /** Sets the Jetty port the cluster node listens on. */
+    public Builder withPort(int port) {
+      this.port = port;
+      return this;
+    }
+
+    /** Sets the path of the solr.xml file handed to the cluster. */
+    public Builder withSolrXmlPath(String solrXmlPath) {
+      this.solrXmlPath = solrXmlPath;
+      return this;
+    }
+
+    /**
+     * Registers a collection to create on start.
+     *
+     * @param name       collection name; also used as the name of its uploaded config
+     * @param configPath directory containing the collection's Solr configuration
+     */
+    public Builder addCollection(String name, String configPath) {
+      collections.put(name, configPath);
+      return this;
+    }
+
+    /** Sets a callback that is applied to the component at the end of {@link SolrComponent#start()}. */
+    public Builder withPostStartCallback(Function<SolrComponent, Void> f) {
+      postStartCallback = f;
+      return this;
+    }
+
+    /**
+     * Creates the component.
+     *
+     * @return a component that has not been started yet
+     * @throws Exception if no collection has been added
+     */
+    public SolrComponent build() throws Exception {
+      if (collections.isEmpty()) throw new Exception("Must add at least 1 collection");
+      return new SolrComponent(port, solrXmlPath, collections, postStartCallback);
+    }
+  }
+
+  private int port;
+  private String solrXmlPath;
+  private Map<String, String> collections;
+  private MiniSolrCloudCluster miniSolrCloudCluster;
+  private Function<SolrComponent, Void> postStartCallback;
+
+  private SolrComponent(int port, String solrXmlPath, Map<String, String> collections, Function<SolrComponent, Void> postStartCallback) throws Exception {
+    this.port = port;
+    this.solrXmlPath = solrXmlPath;
+    this.collections = collections;
+    this.postStartCallback = postStartCallback;
+  }
+
+  /**
+   * Starts a one-node cluster in a fresh temporary directory, then uploads the config
+   * and creates the collection (1 shard, replication factor 1) for every registered
+   * entry, and finally runs the post-start callback if one was supplied.
+   *
+   * @throws UnableToStartException wrapping any failure, including one thrown by the callback
+   */
+  @Override
+  public void start() throws UnableToStartException {
+    try {
+      File baseDir = Files.createTempDirectory("solrcomponent").toFile();
+      baseDir.deleteOnExit();
+      miniSolrCloudCluster = new MiniSolrCloudCluster(1, baseDir, new File(solrXmlPath), JettyConfig.builder().setPort(port).build());
+      for (Map.Entry<String, String> entry : collections.entrySet()) {
+        String name = entry.getKey();
+        // The config is uploaded under the collection's own name, so the collection
+        // is created against that same config name rather than a fixed "metron".
+        miniSolrCloudCluster.uploadConfigDir(new File(entry.getValue()), name);
+        miniSolrCloudCluster.createCollection(name, 1, 1, name, new HashMap<String, String>());
+      }
+      if (postStartCallback != null) postStartCallback.apply(this);
+    } catch(Exception e) {
+      throw new UnableToStartException(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Shuts the cluster down. Does nothing if the cluster was never created; shutdown
+   * failures are printed and otherwise ignored.
+   */
+  @Override
+  public void stop() {
+    if (miniSolrCloudCluster == null) {
+      // start() failed before the cluster existed (or was never called).
+      return;
+    }
+    try {
+      miniSolrCloudCluster.shutdown();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Creates a new client connected to the cluster's Zookeeper. The caller owns the
+   * returned client.
+   */
+  public MetronSolrClient getSolrClient() {
+    return new MetronSolrClient(getZookeeperUrl());
+  }
+
+  /** Returns the underlying cluster, or {@code null} before {@link #start()} has created it. */
+  public MiniSolrCloudCluster getMiniSolrCloudCluster() {
+    return this.miniSolrCloudCluster;
+  }
+
+  /** Returns the address of the Zookeeper server embedded in the cluster. */
+  public String getZookeeperUrl() {
+    return miniSolrCloudCluster.getZkServer().getZkAddress();
+  }
+
+  /**
+   * Checks whether the cluster currently lists the given collection.
+   *
+   * @param collection collection name to look for
+   * @return {@code true} if the collection is listed; {@code false} if it is not or
+   *         if the lookup failed (the failure is printed)
+   */
+  public boolean hasCollection(String collection) {
+    boolean collectionFound = false;
+    // A fresh client is created per call; close it so repeated polling does not
+    // accumulate open clients.
+    try (MetronSolrClient solr = getSolrClient()) {
+      collectionFound = solr.listCollections().contains(collection);
+    } catch(Exception e) {
+      e.printStackTrace();
+    }
+    return collectionFound;
+  }
+
+  /**
+   * Commits pending updates and queries the collection with {@code *:*}.
+   *
+   * NOTE(review): no explicit row limit is set on the query, so the number of
+   * returned documents is bounded by whatever default page size applies -- confirm
+   * this is sufficient for the test data.
+   *
+   * @param collection collection to query
+   * @return the documents returned by the query; empty if the commit or query failed
+   *         (the failure is printed)
+   */
+  public List<Map<String, Object>> getAllIndexedDocs(String collection) {
+    List<Map<String, Object>> docs = new ArrayList<>();
+    CloudSolrClient solr = miniSolrCloudCluster.getSolrClient();
+    solr.setDefaultCollection(collection);
+    SolrQuery parameters = new SolrQuery();
+    parameters.set("q", "*:*");
+    try {
+      solr.commit();
+      QueryResponse response = solr.query(parameters);
+      for (SolrDocument solrDocument : response.getResults()) {
+        docs.add(solrDocument);
+      }
+    } catch (SolrServerException | IOException e) {
+      e.printStackTrace();
+    }
+    return docs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/writer/solr/MetronSolrClientTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/writer/solr/MetronSolrClientTest.java b/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/writer/solr/MetronSolrClientTest.java
new file mode 100644
index 0000000..7bd3ac6
--- /dev/null
+++ b/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/writer/solr/MetronSolrClientTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer.solr;
+
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.util.NamedList;
+import org.hamcrest.Description;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Verifies that {@code MetronSolrClient.createCollection} issues a CREATE request
+ * only when the LIST response does not already contain the collection.
+ */
+public class MetronSolrClientTest {
+
+  /**
+   * Mockito matcher accepting a {@link QueryRequest} whose "action" parameter equals
+   * the configured name.
+   */
+  class CollectionRequestMatcher extends ArgumentMatcher<QueryRequest> {
+
+    private String name;
+
+    public CollectionRequestMatcher(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public boolean matches(Object o) {
+      String action = ((QueryRequest) o).getParams().get("action");
+      return name.equals(action);
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description.appendText(name);
+    }
+  }
+
+  @Test
+  public void testClient() throws Exception {
+
+    final String collection = "metron";
+    final String zookeeperUrl = "zookeeperUrl";
+    final String listAction = CollectionParams.CollectionAction.LIST.name();
+    final String createAction = CollectionParams.CollectionAction.CREATE.name();
+
+    // Case 1: LIST already reports the collection, so no CREATE request is expected.
+    ArrayList<String> existingCollections = new ArrayList<String>();
+    existingCollections.add(collection);
+    NamedList<Object> populatedListResponse = new NamedList<Object>();
+    populatedListResponse.add("collections", existingCollections);
+
+    MetronSolrClient metronSolrClient = Mockito.spy(new MetronSolrClient(zookeeperUrl));
+    Mockito.doReturn(populatedListResponse)
+        .when(metronSolrClient).request(argThat(new CollectionRequestMatcher(listAction)), (String) isNull());
+    metronSolrClient.createCollection(collection, 1, 1);
+    verify(metronSolrClient, times(1)).request(argThat(new CollectionRequestMatcher(listAction)), (String) isNull());
+    verify(metronSolrClient, times(0)).request(argThat(new CollectionRequestMatcher(createAction)), (String) isNull());
+
+    // Case 2: LIST reports no collections, so exactly one CREATE request is expected.
+    NamedList<Object> emptyListResponse = new NamedList<Object>();
+    emptyListResponse.add("collections", new ArrayList<String>());
+
+    metronSolrClient = Mockito.spy(new MetronSolrClient(zookeeperUrl));
+    Mockito.doReturn(emptyListResponse)
+        .when(metronSolrClient).request(argThat(new CollectionRequestMatcher(listAction)), (String) isNull());
+    Mockito.doReturn(new NamedList<Object>())
+        .when(metronSolrClient).request(argThat(new CollectionRequestMatcher(createAction)), (String) isNull());
+    metronSolrClient.createCollection(collection, 1, 1);
+    verify(metronSolrClient, times(1)).request(argThat(new CollectionRequestMatcher(listAction)), (String) isNull());
+    verify(metronSolrClient, times(1)).request(argThat(new CollectionRequestMatcher(createAction)), (String) isNull());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/writer/solr/SolrWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/writer/solr/SolrWriterTest.java b/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/writer/solr/SolrWriterTest.java
new file mode 100644
index 0000000..7c720ea
--- /dev/null
+++ b/metron-streaming/Metron-Solr/src/test/java/org/apache/metron/writer/solr/SolrWriterTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer.solr;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.domain.Configurations;
+import org.apache.metron.util.SampleUtil;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.hamcrest.Description;
+import org.json.simple.JSONObject;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+
+/**
+ * Unit test for {@code SolrWriter} against a mocked {@code MetronSolrClient}: checks
+ * collection setup during {@code init} and the documents and commits produced by
+ * {@code write}.
+ */
+public class SolrWriterTest {
+
+  /**
+   * Mockito matcher accepting a {@link QueryRequest} whose "action" parameter equals
+   * the configured name. Not referenced by the test in this class.
+   */
+  class CollectionRequestMatcher extends ArgumentMatcher<QueryRequest> {
+
+    private String name;
+
+    public CollectionRequestMatcher(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public boolean matches(Object o) {
+      QueryRequest queryRequest = (QueryRequest) o;
+      return name.equals(queryRequest.getParams().get("action"));
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description.appendText(name);
+    }
+  }
+
+  /**
+   * Mockito matcher accepting a {@link SolrInputDocument} whose {@code id},
+   * {@code sensorType}, {@code intField_i} and {@code doubleField_d} fields hold the
+   * expected values. A document that lacks a field, or holds a value of another type,
+   * simply does not match.
+   */
+  class SolrInputDocumentMatcher extends ArgumentMatcher<SolrInputDocument> {
+
+    private int expectedId;
+    private String expectedSourceType;
+    private int expectedInt;
+    private double expectedDouble;
+
+    public SolrInputDocumentMatcher(int expectedId, String expectedSourceType, int expectedInt, double expectedDouble) {
+      this.expectedId = expectedId;
+      this.expectedSourceType = expectedSourceType;
+      this.expectedInt = expectedInt;
+      this.expectedDouble = expectedDouble;
+    }
+
+    @Override
+    public boolean matches(Object o) {
+      // Mockito runs the matcher against every recorded argument, so anything that is
+      // not a fully populated document must yield "no match" rather than an exception.
+      if (!(o instanceof SolrInputDocument)) {
+        return false;
+      }
+      SolrInputDocument solrInputDocument = (SolrInputDocument) o;
+      Object actualId = fieldValue(solrInputDocument, "id");
+      Object actualName = fieldValue(solrInputDocument, "sensorType");
+      Object actualInt = fieldValue(solrInputDocument, "intField_i");
+      Object actualDouble = fieldValue(solrInputDocument, "doubleField_d");
+      return actualId instanceof Integer && ((Integer) actualId).intValue() == expectedId
+          && expectedSourceType.equals(actualName)
+          && actualInt instanceof Integer && ((Integer) actualInt).intValue() == expectedInt
+          && actualDouble instanceof Double && ((Double) actualDouble).doubleValue() == expectedDouble;
+    }
+
+    /** Returns the value stored under the field, or {@code null} when the document has no such field. */
+    private Object fieldValue(SolrInputDocument document, String field) {
+      return document.containsKey(field) ? document.get(field).getValue() : null;
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description.appendText(String.format("fields: [id=%d, doubleField_d=%f, sensorType=%s, intField_i=%d]", expectedId, expectedDouble, expectedSourceType, expectedInt));
+    }
+
+  }
+
+  @Test
+  public void testWriter() throws Exception {
+    Configurations configurations = SampleUtil.getSampleConfigs();
+    JSONObject message1 = new JSONObject();
+    message1.put("intField", 100);
+    message1.put("doubleField", 100.0);
+    JSONObject message2 = new JSONObject();
+    message2.put("intField", 200);
+    message2.put("doubleField", 200.0);
+    List<JSONObject> messages = new ArrayList<>();
+    messages.add(message1);
+    messages.add(message2);
+
+    // With the sample configs, init is expected to set up "metron" with 1 shard and replication factor 1.
+    String collection = "metron";
+    MetronSolrClient solr = Mockito.mock(MetronSolrClient.class);
+    SolrWriter writer = new SolrWriter().withMetronSolrClient(solr);
+    writer.init(null, configurations);
+    verify(solr, times(1)).createCollection(collection, 1, 1);
+    verify(solr, times(1)).setDefaultCollection(collection);
+
+    // Overriding the solr.* global config entries must be reflected by the next init.
+    collection = "metron2";
+    int numShards = 4;
+    int replicationFactor = 2;
+    Map<String, Object> globalConfig = configurations.getGlobalConfig();
+    globalConfig.put("solr.collection", collection);
+    globalConfig.put("solr.numShards", numShards);
+    globalConfig.put("solr.replicationFactor", replicationFactor);
+    configurations.updateGlobalConfig(globalConfig);
+    writer = new SolrWriter().withMetronSolrClient(solr);
+    writer.init(null, configurations);
+    verify(solr, times(1)).createCollection(collection, numShards, replicationFactor);
+    verify(solr, times(1)).setDefaultCollection(collection);
+
+    // Without shouldCommit, each message is added once and nothing is committed.
+    writer.write("test", configurations, new ArrayList<Tuple>(), messages);
+    verify(solr, times(1)).add(argThat(new SolrInputDocumentMatcher(message1.toJSONString().hashCode(), "test", 100, 100.0)));
+    verify(solr, times(1)).add(argThat(new SolrInputDocumentMatcher(message2.toJSONString().hashCode(), "test", 200, 200.0)));
+    verify(solr, times(0)).commit(collection);
+
+    // The mock is shared, so the add counts are cumulative; with shouldCommit one commit is expected.
+    writer = new SolrWriter().withMetronSolrClient(solr).withShouldCommit(true);
+    writer.init(null, configurations);
+    writer.write("test", configurations, new ArrayList<Tuple>(), messages);
+    verify(solr, times(2)).add(argThat(new SolrInputDocumentMatcher(message1.toJSONString().hashCode(), "test", 100, 100.0)));
+    verify(solr, times(2)).add(argThat(new SolrInputDocumentMatcher(message2.toJSONString().hashCode(), "test", 200, 200.0)));
+    verify(solr, times(1)).commit(collection);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e59b1a31/metron-streaming/Metron-Solr/src/test/resources/solr/conf/_rest_managed.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Solr/src/test/resources/solr/conf/_rest_managed.json b/metron-streaming/Metron-Solr/src/test/resources/solr/conf/_rest_managed.json
new file mode 100644
index 0000000..6a4aec3
--- /dev/null
+++ b/metron-streaming/Metron-Solr/src/test/resources/solr/conf/_rest_managed.json
@@ -0,0 +1 @@
+{"initArgs":{},"managedList":[]}


Mime
View raw message