Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id EB9E9200CA3 for ; Thu, 1 Jun 2017 23:41:25 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id EA437160BC4; Thu, 1 Jun 2017 21:41:25 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 12802160BC1 for ; Thu, 1 Jun 2017 23:41:23 +0200 (CEST) Received: (qmail 83253 invoked by uid 500); 1 Jun 2017 21:41:23 -0000 Mailing-List: contact commits-help@metron.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@metron.apache.org Delivered-To: mailing list commits@metron.apache.org Received: (qmail 82555 invoked by uid 99); 1 Jun 2017 21:41:22 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 01 Jun 2017 21:41:22 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 20CBBF1755; Thu, 1 Jun 2017 21:41:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mattf@apache.org To: commits@metron.apache.org Date: Thu, 01 Jun 2017 21:42:03 -0000 Message-Id: <0765e4aefa3440bab591c4eefce504bb@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [44/44] metron git commit: METRON-858 bro-plugin-kafka is throwing segfaults (JonZeolla) closes apache/metron#547 archived-at: Thu, 01 Jun 2017 21:41:26 -0000 METRON-858 bro-plugin-kafka is throwing segfaults (JonZeolla) closes apache/metron#547 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/85872bd6 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/85872bd6 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/85872bd6 Branch: refs/heads/Metron_0.4.0 Commit: 85872bd68698149692c97a48dfe41a78435dcc99 Parents: 8779eb3 Author: JonZeolla Authored: Thu Jun 1 11:28:42 2017 -0400 Committer: jonzeolla Committed: Thu Jun 1 11:28:42 2017 -0400 ---------------------------------------------------------------------- metron-sensors/bro-plugin-kafka/README.md | 56 ++++++++++++++++++-- .../bro-plugin-kafka/configure.plugin | 2 +- .../bro-plugin-kafka/src/KafkaWriter.cc | 21 ++++---- 3 files changed, 63 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/85872bd6/metron-sensors/bro-plugin-kafka/README.md ---------------------------------------------------------------------- diff --git a/metron-sensors/bro-plugin-kafka/README.md b/metron-sensors/bro-plugin-kafka/README.md index 31b1f54..e219360 100644 --- a/metron-sensors/bro-plugin-kafka/README.md +++ b/metron-sensors/bro-plugin-kafka/README.md @@ -36,13 +36,14 @@ Installation Activation ---------- -The following examples highlight different ways that the plugin can be used. Simply add Bro script to your `local.bro` file (for example, `/usr/share/bro/site/local.bro`) as shown to activate the plugin. +The following examples highlight different ways that the plugin can be used. Simply add the Bro script language to your `local.bro` file (for example, `/usr/share/bro/site/local.bro`) as shown to demonstrate the example. ### Example 1 The goal in this example is to send all HTTP and DNS records to a Kafka topic named `bro`. * Any configuration value accepted by librdkafka can be added to the `kafka_conf` configuration table. * By defining `topic_name` all records will be sent to the same Kafka topic. + * Defining `logs_to_send` will ensure that only HTTP and DNS records are sent. ``` @load Bro/Kafka/logs-to-kafka.bro @@ -73,7 +74,6 @@ event bro_init() $name = "kafka-http", $writer = Log::WRITER_KAFKAWRITER, $config = table( - ["stream_id"] = "HTTP::LOG", ["metadata.broker.list"] = "localhost:9092" ), $path = "http" @@ -85,7 +85,6 @@ event bro_init() $name = "kafka-dns", $writer = Log::WRITER_KAFKAWRITER, $config = table( - ["stream_id"] = "DNS::LOG", ["metadata.broker.list"] = "localhost:9092" ), $path = "dns" @@ -94,6 +93,57 @@ event bro_init() } ``` +### Example 3 + +You may want to configure bro to filter log messages with certain characteristics from being sent to your kafka topics. For instance, Metron currently doesn't support IPv6 source or destination IPs in the default enrichments, so it may be helpful to filter those log messages from being sent to kafka (although there are [multiple ways](#notes) to approach this). In this example we will do that that, and are assuming a somewhat standard bro kafka plugin configuration, such that: + * All bro logs are sent to the `bro` topic, by configuring `Kafka::topic_name`. + * Each JSON message is tagged with the appropriate log type (such as `http`, `dns`, or `conn`), by setting `tag_json` to true. + * If the log message contains a 128 byte long source or destination IP address, the log is not sent to kafka. + +``` +@load Bro/Kafka/logs-to-kafka.bro +redef Kafka::topic_name = "bro"; +redef Kafka::tag_json = T; + +event bro_init() &priority=-5 +{ + # handles HTTP + Log::add_filter(HTTP::LOG, [ + $name = "kafka-http", + $writer = Log::WRITER_KAFKAWRITER, + $pred(rec: HTTP::Info) = { return ! (( |rec$id$orig_h| == 128 || |rec$id$resp_h| == 128 )); }, + $config = table( + ["metadata.broker.list"] = "localhost:9092" + ) + ]); + + # handles DNS + Log::add_filter(DNS::LOG, [ + $name = "kafka-dns", + $writer = Log::WRITER_KAFKAWRITER, + $pred(rec: DNS::Info) = { return ! (( |rec$id$orig_h| == 128 || |rec$id$resp_h| == 128 )); }, + $config = table( + ["metadata.broker.list"] = "localhost:9092" + ) + ]); + + # handles Conn + Log::add_filter(Conn::LOG, [ + $name = "kafka-conn", + $writer = Log::WRITER_KAFKAWRITER, + $pred(rec: Conn::Info) = { return ! (( |rec$id$orig_h| == 128 || |rec$id$resp_h| == 128 )); }, + $config = table( + ["metadata.broker.list"] = "localhost:9092" + ) + ]); +} +``` + +#### Notes + * `logs_to_send` is mutually exclusive with `$pred`, thus for each log you want to set `$pred` on, you must individually setup a `Log::add_filter` and refrain from including that log in `logs_to_send`. + * You can also filter IPv6 logs from within your Metron cluster [using Stellar](../../metron-platform/metron-common#IS_IP). In that case, you wouldn't apply a predicate in your bro configuration, and instead Stellar would filter the logs out before they were processed by the enrichment layer of Metron. + * It is also possible to use the `is_v6_subnet()` bro function in your predicate, as of their [2.5 release](https://www.bro.org/sphinx-git/install/release-notes.html#bro-2-5), however the above example should work on [bro 2.4](https://www.bro.org/sphinx-git/install/release-notes.html#bro-2-4) and newer, which has been the focus of the kafka plugin. + Settings -------- http://git-wip-us.apache.org/repos/asf/metron/blob/85872bd6/metron-sensors/bro-plugin-kafka/configure.plugin ---------------------------------------------------------------------- diff --git a/metron-sensors/bro-plugin-kafka/configure.plugin b/metron-sensors/bro-plugin-kafka/configure.plugin index 1cb2086..c7e6662 100644 --- a/metron-sensors/bro-plugin-kafka/configure.plugin +++ b/metron-sensors/bro-plugin-kafka/configure.plugin @@ -31,7 +31,7 @@ plugin_option() { case "$1" in --with-librdkafka=*) - append_cache_entry LibRdKafka_ROOT_DIR PATH $optarg + append_cache_entry LibRDKafka_ROOT_DIR PATH $optarg ;; --with-openssl=*) append_cache_entry OpenSSL_ROOT_DIR PATH $optarg http://git-wip-us.apache.org/repos/asf/metron/blob/85872bd6/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc ---------------------------------------------------------------------- diff --git a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc index 951a60c..c9ad44f 100644 --- a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc +++ b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc @@ -75,13 +75,10 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading debug.assign((const char*)BifConst::Kafka::debug->Bytes(), BifConst::Kafka::debug->Len()); bool is_debug(!debug.empty()); if(is_debug) { - reporter->Info( "Debug is turned on and set to: %s. Available debug context: %s." - , debug.c_str() - , RdKafka::get_debug_contexts().c_str() - ); + MsgThread::Info(Fmt("Debug is turned on and set to: %s. Available debug context: %s.", debug.c_str(), RdKafka::get_debug_contexts().c_str())); } else { - reporter->Info( "Debug is turned off."); + MsgThread::Info(Fmt("Debug is turned off.")); } // kafka global configuration @@ -96,7 +93,7 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading // apply setting to kafka if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) { - reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str()); + Error(Fmt("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str())); return false; } } @@ -105,7 +102,7 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading string key("debug"); string val(debug); if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) { - reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str()); + Error(Fmt("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str())); return false; } } @@ -113,7 +110,7 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading // create kafka producer producer = RdKafka::Producer::create(conf, err); if (!producer) { - reporter->Error("Failed to create producer: %s", err.c_str()); + Error(Fmt("Failed to create producer: %s", err.c_str())); return false; } @@ -121,12 +118,12 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); topic = RdKafka::Topic::create(producer, topic_name, topic_conf, err); if (!topic) { - reporter->Error("Failed to create topic handle: %s", err.c_str()); + Error(Fmt("Failed to create topic handle: %s", err.c_str())); return false; } if(is_debug) { - reporter->Info("Successfully created producer."); + MsgThread::Info(Fmt("Successfully created producer.")); } return true; @@ -154,7 +151,7 @@ bool KafkaWriter::DoFinish(double network_time) if (producer->outq_len() == 0) { success = true; } else { - reporter->Error("Unable to deliver %0d message(s)", producer->outq_len()); + Error(Fmt("Unable to deliver %0d message(s)", producer->outq_len())); } delete topic; @@ -187,7 +184,7 @@ bool KafkaWriter::DoWrite(int num_fields, const threading::Field* const* fields, } else { string err = RdKafka::err2str(resp); - reporter->Error("Kafka send failed: %s", err.c_str()); + Error(Fmt("Kafka send failed: %s", err.c_str())); } return true;