metron-commits mailing list archives

From o...@apache.org
Subject [51/52] [abbrv] metron git commit: METRON-1445: Update performance tuning guide with more explicit parameter instructions (mmiklavc via mmiklavc) closes apache/metron#988
Date Wed, 18 Apr 2018 15:00:21 GMT
METRON-1445: Update performance tuning guide with more explicit parameter instructions (mmiklavc via mmiklavc) closes apache/metron#988


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/e0949142
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/e0949142
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/e0949142

Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: e0949142dd682a84e59fea09066de8024f106f13
Parents: daf543b
Author: mmiklavc <michael.miklavcic@gmail.com>
Authored: Tue Apr 17 12:31:37 2018 -0600
Committer: Michael Miklavcic <michael.miklavcic@gmail.com>
Committed: Tue Apr 17 12:31:37 2018 -0600

----------------------------------------------------------------------
 metron-platform/Performance-tuning-guide.md     | 244 +++++++++++-
 metron-platform/metron-common/README.md         |  32 ++
 .../src/main/scripts/cluster_info.py            | 389 +++++++++++++++++++
 3 files changed, 659 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/e0949142/metron-platform/Performance-tuning-guide.md
----------------------------------------------------------------------
diff --git a/metron-platform/Performance-tuning-guide.md b/metron-platform/Performance-tuning-guide.md
index 7d79ace..e2d1ae2 100644
--- a/metron-platform/Performance-tuning-guide.md
+++ b/metron-platform/Performance-tuning-guide.md
@@ -17,6 +17,14 @@ limitations under the License.
 -->
 # Metron Performance Tuning Guide
 
+- [Overview](#overview)
+- [General Tuning Suggestions](#general-tuning-suggestions)
+- [Component Tuning Levers](#component-tuning-levers)
+- [Use Case Specific Tuning Suggestions](#use-case-specific-tuning-suggestions)
+- [Debugging](#debugging)
+- [Issues](#issues)
+- [Reference](#reference)
+
 ## Overview
 
 This document provides guidance from our experiences tuning the Apache Metron Storm topologies for maximum performance. You'll find
@@ -31,20 +39,33 @@ pipe, and the majority of these options assist in tweaking the various pipe widt
 
 ## General Tuning Suggestions
 
+### Storm Executors vs. Tasks
+
 Note that there is currently no method for specifying the number of tasks independently of the number of executors in Flux topologies (enrichment,
  indexing). By default, the number of tasks will equal the number of executors. Logically, setting the number of tasks equal to the number
 of executors is sensible. Storm enforces num executors <= num tasks. The reason you might set the number of tasks higher than the number of
 executors is for future performance tuning and rebalancing without the need to bring down your topologies. The number of tasks is fixed
 at topology startup time whereas the number of executors can be increased up to a maximum value equal to the number of tasks.
 
-When configuring Storm Kafka spouts, we found that the default values for poll.timeout.ms, offset.commit.period.ms, and max.uncommitted.offsets worked well in nearly all cases.
-As a general rule, it was optimal to set spout parallelism equal to the number of partitions used in your Kafka topic. Any greater
+### Kafka Spout Configuration
+
+When configuring Storm Kafka spouts, we found that the default values for
+
+- `poll.timeout.ms`
+- `offset.commit.period.ms`
+- `max.uncommitted.offsets`
+
+worked well in nearly all cases. As a general rule, it was optimal to set spout parallelism equal to the number of partitions used in your Kafka topic. Any greater
 parallelism will leave you with idle consumers since Kafka limits the max number of consumers to the number of partitions. This is
 important because Kafka has certain ordering guarantees for message delivery per partition that would not be possible if more than
 one consumer in a given consumer group were able to read from that partition.
 
 ## Component Tuning Levers
 
+### High Level Overview
+
+There are a number of levers that can be set while tuning a Metron cluster. The main services to interact with for performance tuning are: Kafka, Storm, HDFS, and indexing (Elasticsearch or Solr). For each service, here is a high level breakdown of the major knobs and levers that can be modified while tuning your cluster.
+
 - Kafka
     - Number partitions
 - Storm
@@ -70,12 +91,15 @@ for more details.
 
 ### Storm Tuning
 
+#### Overview
+
 There are quite a few options you will be confronted with when tuning your Storm topologies and this is largely trial and error. As a general rule of thumb,
 we recommend starting with the defaults and smaller numbers in terms of parallelism while iteratively working up until the desired performance is achieved.
 You will find the offset lag tool indispensable while verifying your settings.
 
 We won't go into a full discussion about Storm's architecture - see references section for more info - but there are some general rules of thumb that should be
 followed. It's first important to understand the ways you can impact parallelism in a Storm topology.
+
 - num tasks
 - num executors (parallelism hint)
 - num workers
@@ -83,10 +107,10 @@ followed. It's first important to understand the ways you can impact parallelism
 Tasks are instances of a given spout or bolt, executors are threads in a process, and workers are jvm processes. You'll want the number of tasks as a multiple of the number of executors,
 the number of executors as a multiple of the number of workers, and the number of workers as a multiple of the number of machines. The main reason for this approach is
  that it will give a uniform distribution of work to each machine and jvm process. More often than not, your number of tasks will be equal to the number of executors, which
- is the default in Storm. Flux does not actually provide a way to independently set number of tasks, so for enrichments and indexing which use Flux, num tasks will always equal
+ is the default in Storm. Flux does not actually provide a way to independently set number of tasks, so for enrichments and indexing, which use Flux, num tasks will always equal
  num executors.
 
-You can change the number of workers via the property `topology.workers`
+You can change the number of workers via the Storm property `topology.workers`
 
 __Other Storm Settings__
 
@@ -96,12 +120,15 @@ topology.max.spout.pending
 This is the maximum number of tuples that can be in flight (ie, not yet acked) at any given time within your topology. You set this as a form of backpressure to ensure
 you don't flood your topology.
 
+
 ```
 topology.acker.executors
 ```
+
 This specifies how many threads should be dedicated to tuple acking. We found that setting this equal to the number of partitions in your inbound Kafka topic worked well.
 
 __spout-config.json__
+
 ```
 {
     ...
@@ -111,15 +138,146 @@ __spout-config.json__
 }
 ```
 
-These are the spout recommended defaults from Storm and are currently the defaults provided in the Kafka spout itself. In fact, if you find the recommended defaults work fine for you,
+Above is a snippet for configuring parsers. These are the spout recommended defaults from Storm and are currently the defaults provided in the Kafka spout itself. In fact, if you find the recommended defaults work fine for you,
 then you can omit these settings altogether.
 
+#### Where to Find Tuning Properties
+
+**Important:** The parser topologies are deployed via a builder pattern that takes parameters from the CLI as set via Ambari. The enrichment and indexing topologies are configured
+using a Storm Flux file, a configuration properties file, and Ambari. Here is a setting materialization summary for each of the topology types:
+
+- Parsers
+	- Management UI -> parser json config and CLI -> Storm
+- Enrichment
+	- Ambari UI -> properties file -> Flux -> Storm
+- Indexing
+	- Ambari UI -> properties file -> Flux -> Storm
+
+**Parsers**
+
+This is a mapping of the various performance tuning properties for parsers and how they are materialized.
+
+See more detail on starting parsers [here](https://github.com/apache/metron/blob/master/metron-platform/metron-parsers/README.md#starting-the-parser-topology)
+
+| Category                    | Management UI Property Name                | JSON Config File Property Name     | CLI Option                                                                                     | Storm Property Name             |  Notes                                                                        |
+|-----------------------------|--------------------------------------------|------------------------------------|------------------------------------------------------------------------------------------------|---------------------------------|-------------------------------------------------------------------------------|
+| Storm topology config       | Num Workers                                | n/a                                | -nw,--num_workers <NUM_WORKERS>                                                                | topology.workers                |                                                                               |
+|                             | Num Ackers                                 | n/a                                | -na,--num_ackers <NUM_ACKERS>                                                                  | topology.acker.executors        |                                                                               |
+|                             | Storm Config                               | topology.max.spout.pending         | -e,--extra_topology_options <JSON_FILE>, e.g. { "topology.max.spout.pending" : NUM }           | topology.max.spout.pending      | Put property in JSON format in a file named `storm-<MY_PARSER>-config.json`   |
+| Kafka spout                 | Spout Parallelism                          | n/a                                | -sp,--spout_p <SPOUT_PARALLELISM_HINT>                                                         | n/a                             |                                                                               |
+|                             | Spout Num Tasks                            | n/a                                | -snt,--spout_num_tasks <NUM_TASKS>                                                             | n/a                             |                                                                               |
+|                             | Spout Config                               | spout.pollTimeoutMs                | -esc,--extra_kafka_spout_config <JSON_FILE>, e.g. { "spout.pollTimeoutMs" : 200 }              | n/a                             | Put property in JSON format in a file named `spout-<MY_PARSER>-config.json`   |
+|                             | Spout Config                               | spout.maxUncommittedOffsets        | -esc,--extra_kafka_spout_config <JSON_FILE>, e.g. { "spout.maxUncommittedOffsets" : 10000000 } | n/a                             | Put property in JSON format in a file named `spout-<MY_PARSER>-config.json`   |
+|                             | Spout Config                               | spout.offsetCommitPeriodMs         | -esc,--extra_kafka_spout_config <JSON_FILE>, e.g. { "spout.offsetCommitPeriodMs" : 30000 }     | n/a                             | Put property in JSON format in a file named `spout-<MY_PARSER>-config.json`   |
+| Parser bolt                 | Parser Num Tasks                           | n/a                                | -pnt,--parser_num_tasks <NUM_TASKS>                                                            | n/a                             |                                                                               |
+|                             | Parser Parallelism                         | n/a                                | -pp,--parser_p <PARALLELISM_HINT>                                                              | n/a                             |                                                                               |
+|                             | Parser Parallelism                         | n/a                                | -pp,--parser_p <PARALLELISM_HINT>                                                              | n/a                             |                                                                               |
+
+**Enrichment**
+
+This is a mapping of the various performance tuning properties for enrichments and how they are materialized.
+
+Flux file found here - $METRON_HOME/flux/enrichment/remote.yaml
+
+_Note 1:_ Changes to Flux file properties that are managed by Ambari will render Ambari unable to further manage the property.
+
+_Note 2:_ Many of these settings will be irrelevant in the alternate non-split-join topology
+
+| Category                    | Ambari Property Name                       | enrichment.properties property                         | Flux Property                                          | Flux Section Location               | Storm Property Name             | Notes                                  |
+|-----------------------------|--------------------------------------------|--------------------------------------------------------|--------------------------------------------------------|-------------------------------------|---------------------------------|----------------------------------------|
+| Storm topology config       | enrichment_workers                         | enrichment.workers                                     | topology.workers                                       | line 18, config                     | topology.workers                |                                        |
+|                             | enrichment_acker_executors                 | enrichment.acker.executors                             | topology.acker.executors                               | line 18, config                     | topology.acker.executors        |                                        |
+|                             | enrichment_topology_max_spout_pending      | topology.max.spout.pending                             | topology.max.spout.pending                             | line 18, config                     | topology.max.spout.pending      |                                        |
+| Kafka spout                 | enrichment_kafka_spout_parallelism         | kafka.spout.parallelism                                | parallelism                                            | line 245, id: kafkaSpout            | n/a                             |                                        |
+|                             | n/a                                        | session.timeout.ms                                     | session.timeout.ms                                     | line 201, id: kafkaProps            | n/a                             | Kafka consumer client property         |
+|                             | n/a                                        | enable.auto.commit                                     | enable.auto.commit                                     | line 201, id: kafkaProps            | n/a                             | Kafka consumer client property         |
+|                             | n/a                                        | n/a                                                    | setPollTimeoutMs                                       | line 230, id: kafkaConfig           | n/a                             | Kafka consumer client property         |
+|                             | n/a                                        | n/a                                                    | setMaxUncommittedOffsets                               | line 230, id: kafkaConfig           | n/a                             | Kafka consumer client property         |
+|                             | n/a                                        | n/a                                                    | setOffsetCommitPeriodMs                                | line 230, id: kafkaConfig           | n/a                             | Kafka consumer client property         |
+| Enrichment splitter         | enrichment_split_parallelism               | enrichment.split.parallelism                           | parallelism                                            | line 253, id: enrichmentSplitBolt   | n/a                             |                                        |
+| Enrichment joiner           | enrichment_join_parallelism                | enrichment.join.parallelism                            | parallelism                                            | line 316, id: enrichmentJoinBolt    | n/a                             |                                        |
+| Threat intel splitter       | threat_intel_split_parallelism             | threat.intel.split.parallelism                         | parallelism                                            | line 338, id: threatIntelSplitBolt  | n/a                             |                                        |
+| Threat intel joiner         | threat_intel_join_parallelism              | threat.intel.join.parallelism                          | parallelism                                            | line 376, id: threatIntelJoinBolt   | n/a                             |                                        |
+| Output bolt                 | kafka_writer_parallelism                   | kafka.writer.parallelism                               | parallelism                                            | line 397, id: outputBolt            | n/a                             |                                        |
+
+When adding Kafka spout properties, there are 3 ways you'll do this.
+
+1. Ambari: If they are properties managed by Ambari (noted in the table under 'Ambari Property Name'), look for the setting in Ambari.
+
+1. Flux -> kafkaProps: add a new key/value to the kafkaProps section HashMap on line 201. For example, if you want to set the Kafka Spout consumer's session.timeout.ms to 30 seconds, you would add the following:
+
+    ```
+           -   name: "put"
+               args:
+                   - "session.timeout.ms"
+                   - 30000
+    ```
+
+1. Flux -> kafkaConfig: add a new setter to the kafkaConfig section on line 230. For example, if you want to set the Kafka Spout consumer's poll timeout to 200 milliseconds, you would add the following under `configMethods`:
+
+    ```
+             -   name: "setPollTimeoutMs"
+                 args:
+                     - 200
+    ```
+
+**Indexing (Batch)**
+
+This is a mapping of the various performance tuning properties for indexing and how they are materialized.
+
+Flux file can be found here - $METRON_HOME/flux/indexing/batch/remote.yaml.
+
+Note: Changes to Flux file properties that are managed by Ambari will render Ambari unable to further manage the property.
+
+| Category                    | Ambari Property Name                       | hdfs.properties property                               | Flux Property                                          | Flux Section Location               | Storm Property Name             | Notes                                  |
+|-----------------------------|--------------------------------------------|--------------------------------------------------------|--------------------------------------------------------|-------------------------------------|---------------------------------|----------------------------------------|
+| Storm topology config       | enrichment_workers                         | enrichment.workers                                     | topology.workers                                       | line 19, config                     | topology.workers                |                                        |
+|                             | enrichment_acker_executors                 | enrichment.acker.executors                             | topology.acker.executors                               | line 19, config                     | topology.acker.executors        |                                        |
+|                             | enrichment_topology_max_spout_pending      | topology.max.spout.pending                             | topology.max.spout.pending                             | line 19, config                     | topology.max.spout.pending      |                                        |
+| Kafka spout                 | batch_indexing_kafka_spout_parallelism     | kafka.spout.parallelism                                | parallelism                                            | line 123, id: kafkaSpout            | n/a                             |                                        |
+|                             | n/a                                        | session.timeout.ms                                     | session.timeout.ms                                     | line 80, id: kafkaProps             | n/a                             | Kafka consumer client property         |
+|                             | n/a                                        | enable.auto.commit                                     | enable.auto.commit                                     | line 80, id: kafkaProps             | n/a                             | Kafka consumer client property         |
+|                             | n/a                                        | n/a                                                    | setPollTimeoutMs                                       | line 108, id: kafkaConfig           | n/a                             | Kafka consumer client property         |
+|                             | n/a                                        | n/a                                                    | setMaxUncommittedOffsets                               | line 108, id: kafkaConfig           | n/a                             | Kafka consumer client property         |
+|                             | n/a                                        | n/a                                                    | setOffsetCommitPeriodMs                                | line 108, id: kafkaConfig           | n/a                             | Kafka consumer client property         |
+| Output bolt                 | hdfs_writer_parallelism                    | hdfs.writer.parallelism                                | parallelism                                            | line 133, id: hdfsIndexingBolt      | n/a                             |                                        |
+|                             | n/a                                        | n/a                                                    | hdfsSyncPolicy (see notes below)                       | line 47, id: hdfsWriter             | n/a                             | See notes below about adding this prop |
+|                             | bolt_hdfs_rotation_policy_units            | bolt.hdfs.rotation.policy.units                        | constructorArgs                                        | line 41, id: hdfsRotationPolicy     | n/a                             |                                        |
+|                             | bolt_hdfs_rotation_policy_count            | bolt.hdfs.rotation.policy.count                        | constructorArgs                                        | line 41, id: hdfsRotationPolicy     | n/a                             |                                        |
+
+_Note_: HDFS sync policy is not currently managed via Ambari. You will need to modify the Flux file directly to accommodate this setting. e.g.
+
+Add a new setter to the hdfsWriter around line 56. Lines 53-55 provided for context.
+
+```
+ 53             -   name: "withRotationPolicy"
+ 54                 args:
+ 55                     - ref: "hdfsRotationPolicy"
+ 56             -   name: "withSyncPolicy"
+ 57                 args:
+ 58                     - ref: "hdfsSyncPolicy"
+```
+
+Add an hdfsSyncPolicy after the hdfsRotationPolicy that appears on line 41. e.g.
+
+```
+ 41     -   id: "hdfsRotationPolicy"
+...
+ 45           - "${bolt.hdfs.rotation.policy.units}"
+ 46
+ 47     -   id: "hdfsSyncPolicy"
+ 48         className: "org.apache.storm.hdfs.bolt.sync.CountSyncPolicy"
+ 49         constructorArgs:
+ 50           -  100000
+```
+
 ## Use Case Specific Tuning Suggestions
 
 The below discussion outlines a specific tuning exercise we went through for driving 1 Gbps of traffic through a Metron cluster running with 4 Kafka brokers and 4
 Storm Supervisors.
 
 General machine specs
+
 - 10 Gb network cards
 - 256 GB memory
 - 12 disks
@@ -174,6 +332,7 @@ ${KAFKA_HOME}/bin/kafka-consumer-groups.sh \
 
 This will return a table with the following output depicting offsets for all partitions and consumers associated with the specified
 consumer group:
+
 ```
 GROUP                          TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER
 enrichments                    enrichments        9          29746066        29746067        1               consumer-2_/xxx.xxx.xxx.xxx
@@ -212,6 +371,7 @@ We started with a single partition for the inbound Kafka topics and eventually w
 The default is 'null' which would result in no limit.
 
 __storm-bro.config__
+
 ```
 {
     ...
@@ -223,6 +383,7 @@ __storm-bro.config__
 And the following default spout settings. Again, this can be omitted entirely since we are using the defaults.
 
 __spout-bro.config__
+
 ```
 {
     ...
@@ -252,6 +413,7 @@ though you could certainly do so if necessary. Notice that we only needed 1 work
 ```
 
 From the usage docs, here are the options we've used. The full reference can be found [here](../metron-platform/metron-parsers/README.md#Starting_the_Parser_Topology).
+
 ```
 usage: start_parser_topology.sh
  -e,--extra_topology_options <JSON_FILE>               Extra options in the form
@@ -290,6 +452,7 @@ Note that the main Metron-specific option we've changed to accomodate the desire
 More information on Flux can be found here - http://storm.apache.org/releases/1.0.1/flux.html
 
 __General storm settings__
+
 ```
 topology.workers: 8
 topology.acker.executors: 48
@@ -297,6 +460,7 @@ topology.max.spout.pending: 2000
 ```
 
 __Spout and Bolt Settings__
+
 ```
 kafkaSpout
     parallelism=48
@@ -341,6 +505,7 @@ cat ${METRON_HOME}/config/zookeeper/indexing/bro.json
 And here are the settings we used for the indexing topology
 
 __General storm settings__
+
 ```
 topology.workers: 4
 topology.acker.executors: 24
@@ -348,6 +513,7 @@ topology.max.spout.pending: 2000
 ```
 
 __Spout and Bolt Settings__
+
 ```
 hdfsSyncPolicy
     org.apache.storm.hdfs.bolt.sync.CountSyncPolicy
@@ -372,12 +538,14 @@ PCAP is a specialized topology that is a Spout-only topology. Both Kafka topic c
 avoid the additional network hop required if using an additional bolt.
 
 __General Storm topology properties__
+
 ```
 topology.workers=16
 topology.acker.executors: 0
 ```
 
 __Spout and Bolt properties__
+
 ```
 kafkaSpout
     parallelism: 128
@@ -403,6 +571,69 @@ writerConfig
         dfs.blocksize=1073741824
 ```
 
+## Debugging
+
+Set the following environment variables for your cluster. The values below are what we would use for the Metron full dev environment.
+
+```
+export HDP_HOME=/usr/hdp/current
+export KAFKA_HOME=$HDP_HOME/kafka-broker
+export STORM_UI=http://node1:8744
+export ELASTIC=http://node1:9200
+export ZOOKEEPER=node1:2181
+export METRON_VERSION=0.4.3
+export METRON_HOME=/usr/metron/${METRON_VERSION}
+```
+
+Note that the output from Storm will be a flattened blob of JSON. In order to pretty print for readability, you can pipe it through a JSON formatter, e.g.
+
+```
+[some Storm curl command] | python -m json.tool
+```
+
+**Getting Storm Configuration Details**
+
+Storm has a useful REST API you can use to get full details about your running topologies. This is generally more convenient and complete for troubleshooting performance problems than going to the Storm UI alone. See Storm's [REST API docs](http://storm.apache.org/releases/1.1.0/STORM-UI-REST-API.html) for more details.
+
+```
+# get Storm cluster summary info including version
+curl -XGET ${STORM_UI}'/api/v1/cluster/summary'
+```
+
+```
+# get overall Storm cluster configuration
+curl -XGET ${STORM_UI}'/api/v1/cluster/configuration'
+```
+
+```
+# get list of topologies and brief summary detail
+curl -XGET ${STORM_UI}'/api/v1/topology/summary'
+```
+
+```
+# get all topology runtime settings. Plug in the ID for your topology, which you can get from the topology summary command or from the Storm UI. Passing sys=1 will also return system stats.
+curl -XGET ${STORM_UI}'/api/v1/topology/:id?sys=1'
+```
+
+**Getting Kafka Configuration Details**
+
+```
+# Get list of Kafka topics
+${HDP_HOME}/kafka-broker/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --list
+```
+
+```
+# Get Kafka topic details - plug in the desired topic name in place of "enrichments"
+${HDP_HOME}/kafka-broker/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --topic enrichments --describe
+```
+
+**Getting Metron Topology Zookeeper Configuration**
+
+```
+# Provides a full listing of all Metron parser, enrichment, and indexing topology configuration
+$METRON_HOME/bin/zk_load_configs.sh -m DUMP -z $ZOOKEEPER
+```
+
 ## Issues
 
 __Error__
@@ -423,11 +654,12 @@ modifying the options outlined above, increasing the poll timeout, or both.
 ## Reference
 
 * [Enrichment Performance](metron-enrichment/Performance.md)
-* http://storm.apache.org/releases/1.0.1/flux.html
+* http://storm.apache.org/releases/1.1.0/flux.html
 * https://stackoverflow.com/questions/17257448/what-is-the-task-in-storm-parallelism
 * http://storm.apache.org/releases/current/Understanding-the-parallelism-of-a-Storm-topology.html
 * http://www.malinga.me/reading-and-understanding-the-storm-ui-storm-ui-explained/
 * https://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/
 * https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.1/bk_storm-component-guide/content/storm-kafkaspout-perf.html
+* http://storm.apache.org/releases/1.1.0/STORM-UI-REST-API.html
 
 

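The rules of thumb in the guide's Storm Tuning section (tasks a multiple of executors, executors a multiple of workers, workers a multiple of machines, and spout parallelism no greater than the Kafka partition count) can be sketched as a small sanity check. This is only an illustration: `check_parallelism` is a hypothetical helper that is not part of the Metron code base, and the partition count in the usage example is an assumed value.

```python
def check_parallelism(num_tasks, num_executors, num_workers, num_machines,
                      spout_parallelism=None, kafka_partitions=None):
    """Return a list of messages for settings that break the guide's rules of thumb.

    Hypothetical helper for illustration only; it does not talk to Storm or Kafka.
    """
    for name, value in (("num_tasks", num_tasks),
                        ("num_executors", num_executors),
                        ("num_workers", num_workers),
                        ("num_machines", num_machines)):
        if value < 1:
            raise ValueError("%s must be a positive integer" % name)

    problems = []
    # Storm enforces num executors <= num tasks.
    if num_executors > num_tasks:
        problems.append("executors exceed tasks")
    elif num_tasks % num_executors:
        problems.append("tasks are not a multiple of executors")
    # Even distribution of threads across JVM processes.
    if num_executors % num_workers:
        problems.append("executors are not a multiple of workers")
    # Even distribution of JVM processes across machines.
    if num_workers % num_machines:
        problems.append("workers are not a multiple of machines")
    # Kafka allows at most one consumer per partition within a consumer group.
    if (spout_parallelism is not None and kafka_partitions is not None
            and spout_parallelism > kafka_partitions):
        problems.append("spout parallelism exceeds Kafka partitions; "
                        "the extra consumers will sit idle")
    return problems


if __name__ == "__main__":
    # Enrichment settings from the guide: 8 workers, spout parallelism 48,
    # 4 Storm supervisors. kafka_partitions=48 is an assumed value.
    for message in check_parallelism(48, 48, 8, 4,
                                     spout_parallelism=48,
                                     kafka_partitions=48):
        print(message)
```

Because Flux sets the number of tasks equal to the number of executors for enrichment and indexing, the first two arguments will normally be the same for those topologies.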
http://git-wip-us.apache.org/repos/asf/metron/blob/e0949142/metron-platform/metron-common/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/README.md b/metron-platform/metron-common/README.md
index ab90a66..b25fbc8 100644
--- a/metron-platform/metron-common/README.md
+++ b/metron-platform/metron-common/README.md
@@ -23,6 +23,7 @@ limitations under the License.
 * [Management Utility](#management-utility)
 * [Topology Errors](#topology-errors)
 * [Performance Logging](#performance-logging)
+* [Metron Debugging](#metron-debugging)
 
 # Stellar Language
 
@@ -400,3 +401,34 @@ __Side Effects__
 Calling the mark() method multiple times simply resets the start time to the current nano time. Calling log() with a non-existent mark name will log 0 ns elapsed time with a warning indicating that log has been invoked for a mark name that does not exist.
 The class is not thread-safe and makes no attempt at keeping multiple threads from modifying the same markers.
 
+# Metron Debugging
+
+A Python script is provided for gathering information useful in debugging your Metron cluster. Run it from a node that has Metron installed. All options listed below are required.
+
+_Note:_ Be aware that no anonymization/scrubbing is performed on the captured configuration details.
+
+```
+# $METRON_HOME/bin/cluster_info.py -h
+Usage: cluster_info.py [options]
+
+Options:
+  -h, --help            show this help message and exit
+  -a HOST:PORT, --ambari-host=HOST:PORT
+                        Connect to Ambari via the supplied host:port
+  -c NAME, --cluster-name=NAME
+                        Name of cluster in Ambari to retrieve info for
+  -o DIRECTORY, --out-dir=DIRECTORY
+                        Write debugging data to specified root directory
+  -s HOST:PORT, --storm-host=HOST:PORT
+                        Connect to Storm via the supplied host:port
+  -b HOST1:PORT,HOST2:PORT, --broker_list=HOST1:PORT,HOST2:PORT
+                        Connect to Kafka via the supplied comma-delimited
+                        host:port list
+  -z HOST1:PORT,HOST2:PORT, --zookeeper_quorum=HOST1:PORT,HOST2:PORT
+                        Connect to Zookeeper via the supplied comma-delimited
+                        host:port quorum list
+  -m DIRECTORY, --metron_home=DIRECTORY
+                        Metron home directory
+  -p DIRECTORY, --hdp_home=DIRECTORY
+                        HDP home directory
+```
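
The README note above warns that captured configuration is not anonymized. As a hedged illustration only (this is not part of the commit), the sketch below redacts sensitive-looking values from a parsed JSON structure before a bundle is shared. The function name `scrub`, the `SENSITIVE_KEY` pattern, and the sample data are all assumptions made for this example, and it is written for Python 3 rather than the Python 2 used by `cluster_info.py`.

```python
import json
import re

# Hypothetical pattern for key names that look sensitive; adjust for your cluster.
SENSITIVE_KEY = re.compile(r'pass|secret|token|credential', re.IGNORECASE)

def scrub(node):
    """Return a copy of a parsed JSON structure with sensitive values redacted."""
    if isinstance(node, dict):
        return dict(
            (key, '***REDACTED***' if SENSITIVE_KEY.search(key) else scrub(value))
            for key, value in node.items()
        )
    if isinstance(node, list):
        return [scrub(item) for item in node]
    return node

# Made-up sample data standing in for a captured config file.
sample = {
    'properties': {'es.ip': 'node1', 'ambari_password': 'hunter2'},
    'items': [{'token': 'abc', 'name': 'storm-site'}],
}
print(json.dumps(scrub(sample), indent=2, sort_keys=True))
```

A key-name match is a coarse heuristic: it will miss secrets stored under neutral key names, so a manual review of the bundle is still advisable.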

http://git-wip-us.apache.org/repos/asf/metron/blob/e0949142/metron-platform/metron-common/src/main/scripts/cluster_info.py
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/scripts/cluster_info.py b/metron-platform/metron-common/src/main/scripts/cluster_info.py
new file mode 100755
index 0000000..6e853c0
--- /dev/null
+++ b/metron-platform/metron-common/src/main/scripts/cluster_info.py
@@ -0,0 +1,389 @@
+#!/usr/bin/python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from optparse import OptionParser
+from requests.auth import HTTPBasicAuth
+from contextlib import closing
+import datetime
+import getpass
+import json
+import os
+import os.path
+import requests
+import shutil
+import subprocess
+import sys
+import tarfile
+import zlib
+
+INDENT_SIZE = 2
+
+class UserPrompt(object):
+    
+    def __init__(self, prompt):
+        self.prompt = prompt
+
+    def get_hidden(self):
+        return getpass.getpass(self.prompt)
+
+class FileWriter(object):
+
+    def write(self, path, content):
+        print "Writing config to " + path
+        if not os.path.exists(os.path.dirname(path)):
+            try:
+                os.makedirs(os.path.dirname(path))
+            except OSError as exc: # Guard against race condition
+                if not os.path.isdir(os.path.dirname(path)):
+                    raise
+        with open(path, 'w') as outfile:
+            outfile.write(content)
+        print "...done"
+
+class ShellHandler(object):
+
+    def __init__(self):
+        pass
+
+    # returns the exit code of the process call
+    def call(self, command):
+        try:
+            return subprocess.call(command)
+        except OSError as e:
+            print >> sys.stderr, "Execution failed:", e
+    
+    # partly hijacked from Python 2.7+ check_output for use in 2.6
+    def ret_output(self, cmd):
+        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
+        output, unused_err = process.communicate()
+        retcode = process.poll()
+        if retcode:
+            raise subprocess.CalledProcessError(retcode, cmd)  # the 'output' kwarg does not exist in 2.6
+        return output
+
+class InfoGatherer(object):
+
+    def __init__(self, name):
+        self.name = name
+
+class AmbariInfo(InfoGatherer):
+
+    def __init__(self, host_info, cluster_name):
+        super(AmbariInfo, self).__init__('Ambari')
+        self.cluster_name = cluster_name
+        self.ambari_config_url = 'http://{0}/api/v1/clusters/{1}/configurations/service_config_versions'.format(host_info, cluster_name)
+        self.params_payload = { 'is_current' : 'true' }
+
+    def collect(self, out_dir):
+        print "Ambari request URL: " + self.ambari_config_url
+        ambari_user = UserPrompt('Ambari username: ').get_hidden()
+        ambari_pass = UserPrompt('Ambari password: ').get_hidden()
+        self.get_cluster_config(out_dir, ambari_user, ambari_pass)
+
+    def get_cluster_config(self, out_dir, ambari_user, ambari_pass):
+        # set encoding to 'identity' to keep Ambari from passing back gzipped content for large requests
+        headers = {
+                    'X-Requested-By' : 'ambari',
+                    'Authorization' : 'Basic',
+                    'Accept-Encoding': 'identity'
+                  }
+        # Retrieving Ambari config detail
+        response = requests.get(self.ambari_config_url, headers=headers, params=self.params_payload, stream=True, auth=HTTPBasicAuth(ambari_user, ambari_pass))
+        if response.status_code == 200:
+            file_name = 'ambari-cluster-config.json'
+            full_out_path = os.path.join(out_dir, self.name.lower(), file_name)
+            FileWriter().write(full_out_path, response.text)
+        else:
+            print "Request failed with status code: " + str(response.status_code)
+
+class StormInfo(InfoGatherer):
+
+    def __init__(self, host_info):
+        super(StormInfo, self).__init__('Storm')
+        url_base = 'http://{0}/api/v1'.format(host_info)
+        self.url_cluster_summary = url_base + '/cluster/summary'
+        self.url_cluster_configuration = url_base + '/cluster/configuration'
+        self.url_topology_summary = url_base + '/topology/summary'
+        self.url_topology_stats_summary = url_base + '/topology/{0}?sys=1'
+
+    def collect(self, out_dir):
+        self.get_cluster_summary(out_dir)
+        self.get_cluster_configuration(out_dir)
+        self.get_topology_summary(out_dir)
+        self.get_topology_stats_summary(out_dir)
+
+    def get_cluster_summary(self, out_dir):
+        response = requests.get(self.url_cluster_summary)
+        if response.status_code == 200:
+            file_name = 'cluster-summary.json'
+            full_out_path = os.path.join(out_dir, self.name.lower(), file_name)
+            FileWriter().write(full_out_path, json.dumps(response.json(), indent=INDENT_SIZE))
+        else:
+            print "Request failed with status code: " + str(response.status_code)
+
+    def get_cluster_configuration(self, out_dir):
+        response = requests.get(self.url_cluster_configuration)
+        if response.status_code == 200:
+            file_name = 'cluster-configuration.json'
+            full_out_path = os.path.join(out_dir, self.name.lower(), file_name)
+            FileWriter().write(full_out_path, json.dumps(response.json(), indent=INDENT_SIZE))
+        else:
+            print "Request failed with status code: " + str(response.status_code)
+
+    def get_topology_summary(self, out_dir):
+        response = requests.get(self.url_topology_summary)
+        if response.status_code == 200:
+            file_name = 'topology-summary.json'
+            full_out_path = os.path.join(out_dir, self.name.lower(), file_name)
+            FileWriter().write(full_out_path, json.dumps(response.json(), indent=INDENT_SIZE))
+        else:
+            print "Request failed with status code: " + str(response.status_code)
+
+    def get_topology_stats_summary(self, out_dir):
+        summary_response = requests.get(self.url_topology_summary)
+        if summary_response.status_code == 200:
+            for feature, value in summary_response.json().iteritems():
+                if feature == 'topologies':
+                    for topology in value:
+                        for k, v in topology.iteritems():
+                            if k == 'id':
+                                print "Retrieving Storm topology stats summary for topology-id " + v
+                                response = requests.get(self.url_topology_stats_summary.format(v))
+                                if response.status_code == 200:
+                                    file_name = 'topology-{0}-stats-summary.json'.format(v)
+                                    full_out_path = os.path.join(out_dir, self.name.lower(), 'stats-summaries', file_name)
+                                    FileWriter().write(full_out_path, json.dumps(response.json(), indent=INDENT_SIZE))
+                                else:
+                                    print "Request failed with status code: " + str(response.status_code)
+        else:
+            print "Topology listing request failed with status code: " + str(summary_response.status_code)
+
+class KafkaInfo(InfoGatherer):
+
+    def __init__(self, broker_list, zookeeper_quorum, hdp_home):
+        super(KafkaInfo, self).__init__('Kafka')
+        self.broker_list = broker_list
+        self.zookeeper_quorum = zookeeper_quorum
+        self.hdp_home = hdp_home
+        # note: the last single quote of the tr command is escaped so it is not read as part of the closing triple quote
+        self.cmd_broker_id = '''{0}/kafka-broker/bin/zookeeper-shell.sh {1} <<< "ls /brokers/ids" | grep -e '\[.*\]' | tr -d [] | tr , ' \''''.format(self.hdp_home, self.zookeeper_quorum)
+        # broker id is dynamic and replaced later
+        self.cmd_broker_info = '''echo "get /brokers/ids/{0}" | {1}/kafka-broker/bin/zookeeper-shell.sh {2} 2>&1'''.format('{0}', self.hdp_home, self.zookeeper_quorum)
+        self.cmd_kafka_topics = '''{0}/kafka-broker/bin/kafka-topics.sh --zookeeper {1} --list'''.format(self.hdp_home, self.zookeeper_quorum)
+        self.cmd_topic_detail = '''{0}/kafka-broker/bin/kafka-topics.sh --zookeeper {1} --topic {2} --describe'''.format(self.hdp_home, self.zookeeper_quorum, '{0}')
+
+    def collect(self, out_dir):
+        print "Retrieving Kafka detail"
+        self.get_broker_info(out_dir)
+        self.get_kafka_topics(out_dir)
+        self.get_topic_detail(out_dir)
+
+    def get_broker_info(self, out_dir):
+        print "Retrieving Kafka broker info"
+        broker_ids = ShellHandler().ret_output(self.cmd_broker_id)
+        for broker in broker_ids.split():  # cmd_broker_id already replaced the commas with spaces
+            file_name = 'kafka-broker-{0}-info.txt'.format(broker)
+            full_out_path = os.path.join(out_dir, self.name.lower(), 'broker-info', file_name)
+            broker_data = ShellHandler().ret_output(self.cmd_broker_info.format(broker))
+            FileWriter().write(full_out_path, broker_data)
+
+    def get_kafka_topics(self, out_dir):
+        file_name = 'kafka-topics.txt'
+        full_out_path = os.path.join(out_dir, self.name.lower(), file_name)
+        topic_list = ShellHandler().ret_output(self.cmd_kafka_topics)
+        FileWriter().write(full_out_path, topic_list)
+
+    def get_topic_detail(self, out_dir):
+        file_name = 'kafka-enrichments-topic.txt'
+        full_out_path = os.path.join(out_dir, self.name.lower(), file_name)
+        enrichment_topic_detail = ShellHandler().ret_output(self.cmd_topic_detail.format('enrichments'))
+        FileWriter().write(full_out_path, enrichment_topic_detail)
+
+        file_name = 'kafka-indexing-topic.txt'
+        full_out_path = os.path.join(out_dir, self.name.lower(), file_name)
+        indexing_topic_detail = ShellHandler().ret_output(self.cmd_topic_detail.format('indexing'))
+        FileWriter().write(full_out_path, indexing_topic_detail)
+
+class MetronInfo(InfoGatherer):
+
+    def __init__(self, metron_home, zookeeper_quorum):
+        super(MetronInfo, self).__init__('Metron')
+        self.metron_home = metron_home
+        self.zookeeper_quorum = zookeeper_quorum
+        self.cmd_zk_load_configs = '''{0}/bin/zk_load_configs.sh -m DUMP -z {1}'''.format(self.metron_home, self.zookeeper_quorum)
+        self.cmd_metron_lib_list = '''ls -al {0}/lib'''.format(self.metron_home)
+
+    def collect(self, out_dir):
+        self.get_metron_config(out_dir)
+        self.get_metron_flux(out_dir)
+        self.get_metron_zk_config(out_dir)
+        self.get_lib_listing(out_dir)
+        self.get_rpm_listing(out_dir)
+    
+    def get_metron_config(self, out_dir):
+        print 'Copying ' + self.metron_home + '/config'
+        full_out_path = os.path.join(out_dir, self.name.lower(), 'config')
+        shutil.copytree(self.metron_home + '/config', full_out_path)
+
+    def get_metron_flux(self, out_dir):
+        print 'Copying ' + self.metron_home + '/flux'
+        full_out_path = os.path.join(out_dir, self.name.lower(), 'flux')
+        shutil.copytree(self.metron_home + '/flux', full_out_path)
+
+    def get_metron_zk_config(self, out_dir):
+        zk_config_dump = ShellHandler().ret_output(self.cmd_zk_load_configs)
+        full_out_path = os.path.join(out_dir, self.name.lower(), 'zk-configs.txt')
+        FileWriter().write(full_out_path, zk_config_dump)
+
+    def get_lib_listing(self, out_dir):
+        metron_lib_list = ShellHandler().ret_output(self.cmd_metron_lib_list)
+        full_out_path = os.path.join(out_dir, self.name.lower(), 'metron-libs-dir.txt')
+        FileWriter().write(full_out_path, metron_lib_list)
+
+    def get_rpm_listing(self, out_dir):
+        metron_rpm_list = ShellHandler().ret_output('''rpm -qa | grep 'metron\|elasticsearch\|kibana\'''')
+        full_out_path = os.path.join(out_dir, self.name.lower(), 'metron-rpm-list.txt')
+        FileWriter().write(full_out_path, metron_rpm_list)
+
+class HdpInfo(InfoGatherer):
+
+    def __init__(self, hdp_home):
+        super(HdpInfo, self).__init__('HDP')
+        self.hdp_home = hdp_home
+
+    def collect(self, out_dir):
+        hadoop_version_info = ShellHandler().ret_output('hadoop version')
+        full_out_path = os.path.join(out_dir, self.name.lower(), 'version-info.txt')
+        FileWriter().write(full_out_path, hadoop_version_info)
+
+class ClusterInfo(object):
+
+    def __init__(self):
+        pass
+
+    def main(self):
+        (options, args) = self.get_cli_args()
+        self.collect_data(options.out_dir,
+                          options.ambari_host,
+                          options.cluster_name,
+                          options.storm_host,
+                          options.broker_list,
+                          options.zookeeper_quorum,
+                          options.metron_home,
+                          options.hdp_home)
+
+    def get_cli_args(self):
+        parser = OptionParser()
+        parser.add_option("-a", "--ambari-host", 
+                          action="store",
+                          type="string",
+                          dest="ambari_host",
+                          help="Connect to Ambari via the supplied host:port",
+                          default="node1:8080",
+                          metavar="HOST:PORT")
+        parser.add_option("-c", "--cluster-name", 
+                          action="store",
+                          type="string",
+                          dest="cluster_name",
+                          help="Name of cluster in Ambari to retrieve info for",
+                          default="metron_cluster",
+                          metavar="NAME")
+        parser.add_option("-o", "--out-dir", 
+                          action="store",
+                          type="string",
+                          dest="out_dir",
+                          help="Write debugging data to specified root directory",
+                          default=".",
+                          metavar="DIRECTORY")
+        parser.add_option("-s", "--storm-host", 
+                          action="store",
+                          type="string",
+                          dest="storm_host",
+                          help="Connect to Storm via the supplied host:port",
+                          default="node1:8744",
+                          metavar="HOST:PORT")
+        parser.add_option("-b", "--broker_list", 
+                          action="store",
+                          type="string",
+                          dest="broker_list",
+                          help="Connect to Kafka via the supplied comma-delimited host:port list",
+                          default="node1:6667",
+                          metavar="HOST1:PORT,HOST2:PORT")
+        parser.add_option("-z", "--zookeeper_quorum", 
+                          action="store",
+                          type="string",
+                          dest="zookeeper_quorum",
+                          help="Connect to Zookeeper via the supplied comma-delimited host:port quorum list",
+                          default="node1:2181",
+                          metavar="HOST1:PORT,HOST2:PORT")
+        parser.add_option("-m", "--metron_home", 
+                          action="store",
+                          type="string",
+                          dest="metron_home",
+                          help="Metron home directory",
+                          default="/usr/metron/0.4.3",
+                          metavar="DIRECTORY")
+        parser.add_option("-p", "--hdp_home", 
+                          action="store",
+                          type="string",
+                          dest="hdp_home",
+                          help="HDP home directory",
+                          default="/usr/hdp/current",
+                          metavar="DIRECTORY")
+
+        return parser.parse_args()
+    
+    def collect_data(self, 
+                     out_dir_base,
+                     ambari_host,
+                     cluster_name,
+                     storm_host,
+                     broker_list,
+                     zookeeper_quorum,
+                     metron_home,
+                     hdp_home):
+        out_dir = self.get_out_dirname(out_dir_base)
+        info_getters = [
+                AmbariInfo(ambari_host, cluster_name),
+                StormInfo(storm_host),
+                KafkaInfo(broker_list, zookeeper_quorum, hdp_home),
+                MetronInfo(metron_home, zookeeper_quorum),
+                HdpInfo(hdp_home)
+        ]
+        for getter in info_getters:
+            getter.collect(out_dir)
+        self.compress_files(out_dir)
+        print "Finished gathering debug info"
+
+    # creates dir w/timestamp to drop all configs
+    # e.g. metron-debug-2018-03-24_06-50-34
+    def get_out_dirname(self, out_dir_base):
+        return os.path.join(out_dir_base, 'metron-debug-' + datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S'))
+
+    def compress_files(self, out_dir):
+        tarball_name = out_dir + '.tgz'
+        print "Creating tarfile bundle with all configs: '{0}'".format(tarball_name)
+        with closing(tarfile.open(tarball_name, 'w:gz')) as tar:
+            tar.add(out_dir, arcname=os.path.basename(out_dir))
+        print "...done"
+
+if __name__ == "__main__":
+    ClusterInfo().main()
+
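
The `compress_files` method above bundles the timestamped output directory into a `.tgz` file. As a minimal sketch, assuming Python 3 (the script in this commit targets Python 2), the contents of such a bundle can be checked before sending it on. The helper name `list_bundle` and the throwaway demo bundle are assumptions for illustration and are not part of the commit.

```python
import os
import tarfile
import tempfile

def list_bundle(tarball_path):
    """Return the sorted member names of a gzip-compressed tar bundle."""
    with tarfile.open(tarball_path, 'r:gz') as tar:
        return sorted(tar.getnames())

# Build a throwaway bundle the same way compress_files() does, then list it.
work_dir = tempfile.mkdtemp()
out_dir = os.path.join(work_dir, 'metron-debug-2018-03-24_06-50-34')
os.makedirs(os.path.join(out_dir, 'storm'))
with open(os.path.join(out_dir, 'storm', 'cluster-summary.json'), 'w') as f:
    f.write('{}')
with tarfile.open(out_dir + '.tgz', 'w:gz') as tar:
    tar.add(out_dir, arcname=os.path.basename(out_dir))

for name in list_bundle(out_dir + '.tgz'):
    print(name)
```

Because `arcname` is set to the directory's base name, member paths in the archive are relative to the `metron-debug-...` directory rather than to the absolute output path.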

