From: jonzeolla@apache.org
To: "commits@metron.apache.org"
Date: Thu, 14 Feb 2019 14:26:14 +0000
Subject: [metron-bro-plugin-kafka] branch master updated: METRON-1992 Support sending a log to multiple topics (JonZeolla) closes apache/metron-bro-plugin-kafka#23
Message-ID: <155015437407.8349.2409181301150345299@gitbox.apache.org>
X-Git-Host: gitbox.apache.org
X-Git-Repo: metron-bro-plugin-kafka
X-Git-Refname: refs/heads/master
X-Git-Oldrev: b360b85e00a8d6b8db9c790c44a767a54c81eb2b
X-Git-Newrev: d96568e6df21b3f091ffc3ad3fd7f93d7cb16788

This is an automated email from the ASF dual-hosted git repository.

jonzeolla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron-bro-plugin-kafka.git

The following commit(s) were added to refs/heads/master by this push:
     new d96568e  METRON-1992 Support sending a log to multiple topics (JonZeolla) closes apache/metron-bro-plugin-kafka#23
d96568e is described below

commit d96568e6df21b3f091ffc3ad3fd7f93d7cb16788
Author: JonZeolla
AuthorDate: Thu Feb 14 09:25:46 2019 -0500

    METRON-1992 Support sending a log to multiple topics (JonZeolla) closes apache/metron-bro-plugin-kafka#23
---
 README.md          | 41 +++++++++++++++++++++++++++++++++++++++--
 src/KafkaWriter.cc | 18 +++++++++++++++---
 src/KafkaWriter.h  |  2 ++
 3 files changed, 56 insertions(+), 5 deletions(-)

diff --git a/README.md b/README.md
index 7cc2c46..72436e9 100644
--- a/README.md
+++ b/README.md
@@ -114,7 +114,7 @@ redef Kafka::kafka_conf = table(
 
 ### Example 2 - Send all active logs
 
-This plugin has the ability send all active logs to kafka with the following configuration.
+This plugin has the ability to send all active logs to the "bro" kafka topic with the following configuration.
 
 ```
 @load packages/metron-bro-plugin-kafka/Apache/Kafka
@@ -138,7 +138,7 @@ redef Kafka::kafka_conf = table(
 );
 ```
 
-### Example 4 - Send logs to unique topics
+### Example 4 - Send each bro log to a unique topic
 
 It is also possible to send each log stream to a uniquely named topic. The goal in this example is to send all HTTP records to a Kafka topic named `http` and all DNS records to a separate Kafka topic named `dns`.
 * The `topic_name` value must be set to an empty string.
@@ -228,6 +228,43 @@ event bro_init() &priority=-10
 * You can also filter IPv6 logs from within your Metron cluster [using Stellar](https://github.com/apache/metron/tree/master/metron-stellar/stellar-common#is_ip). In that case, you wouldn't apply a predicate in your bro configuration, and instead Stellar would filter the logs out before they were processed by the enrichment layer of Metron.
 * It is also possible to use the `is_v6_subnet()` bro function in your predicate, as of their [2.5 release](https://www.bro.org/sphinx-git/install/release-notes.html#bro-2-5), however the above example should work on [bro 2.4](https://www.bro.org/sphinx-git/install/release-notes.html#bro-2-4) and newer, which has been the focus of the kafka plugin.
 
+### Example 6 - Sending a log to multiple topics
+
+You can send a single bro log to multiple kafka topics in the same kafka cluster by creating a custom bro `Log::Filter` that overrides the default topic (configured with `Kafka::topic_name`). In this example, the DHCP, RADIUS, and DNS logs are sent to the "bro" topic; the RADIUS log is duplicated to the "shew_bro_radius" topic; and the DHCP log is duplicated to the "shew_bro_dhcp" topic.
+
+```
+@load packages/metron-bro-plugin-kafka/Apache/Kafka
+redef Kafka::logs_to_send = set(DHCP::LOG, RADIUS::LOG, DNS::LOG);
+redef Kafka::topic_name = "bro";
+redef Kafka::kafka_conf = table(
+    ["metadata.broker.list"] = "server1.example.com:9092,server2.example.com:9092"
+);
+redef Kafka::tag_json = T;
+
+event bro_init() &priority=-10
+{
+    # Send RADIUS to the shew_bro_radius topic
+    local shew_radius_filter: Log::Filter = [
+        $name = "kafka-radius-shew",
+        $writer = Log::WRITER_KAFKAWRITER,
+        $path = "shew_bro_radius",
+        $config = table(["topic_name"] = "shew_bro_radius")
+    ];
+    Log::add_filter(RADIUS::LOG, shew_radius_filter);
+
+    # Send DHCP to the shew_bro_dhcp topic
+    local shew_dhcp_filter: Log::Filter = [
+        $name = "kafka-dhcp-shew",
+        $writer = Log::WRITER_KAFKAWRITER,
+        $path = "shew_bro_dhcp",
+        $config = table(["topic_name"] = "shew_bro_dhcp")
+    ];
+    Log::add_filter(DHCP::LOG, shew_dhcp_filter);
+}
+```
+
+_Note_: Because `Kafka::tag_json` is set to True in this example, the value of `$path` is used as the tag for each `Log::Filter`. If you add a log filter with the same `$path` as an existing filter, Bro will append "-N", where N is an integer starting at 2, to the end of the log path so that each filter has its own unique log path. For instance, the second instance of `conn` would become `conn-2`.
+
 ## Settings
 
 ### `logs_to_send`
diff --git a/src/KafkaWriter.cc b/src/KafkaWriter.cc
index 1d4a28a..563ef74 100644
--- a/src/KafkaWriter.cc
+++ b/src/KafkaWriter.cc
@@ -73,6 +73,15 @@ KafkaWriter::~KafkaWriter()
     // Cleanup must happen in DoFinish, not in the destructor
 }
 
+string KafkaWriter::GetConfigValue(const WriterInfo& info, const string name) const
+{
+    map::const_iterator it = info.config.find(name.c_str());
+    if (it == info.config.end())
+        return string();
+    else
+        return it->second;
+}
+
 /**
  * DoInit is called once for each call to the constructor, but in a separate
  * thread
@@ -82,9 +91,12 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading
     // Timeformat object, default to TS_EPOCH
     threading::formatter::JSON::TimeFormat tf = threading::formatter::JSON::TS_EPOCH;
 
-    // if no global 'topic_name' is defined, use the log stream's 'path'
-    if(topic_name.empty()) {
-        topic_name = info.path;
+    // Allow overriding of the kafka topic via the Bro script constant "topic_name"
+    // which can be applied when adding a new Bro log filter.
+    topic_name_override = GetConfigValue(info, "topic_name");
+
+    if(!topic_name_override.empty()) {
+        topic_name = topic_name_override;
     }
 
     /**
diff --git a/src/KafkaWriter.h b/src/KafkaWriter.h
index 14e0f7e..c67c664 100644
--- a/src/KafkaWriter.h
+++ b/src/KafkaWriter.h
@@ -65,12 +65,14 @@ protected:
     virtual bool DoHeartbeat(double network_time, double current_time);
 
 private:
+    string GetConfigValue(const WriterInfo& info, const string name) const;
     static const string default_topic_key;
     string stream_id;
     bool tag_json;
     string json_timestamps;
     map kafka_conf;
     string topic_name;
+    string topic_name_override;
     threading::formatter::Formatter *formatter;
     RdKafka::Producer* producer;
     RdKafka::Topic* topic;
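
For readers who want to try the topic-selection rule from the KafkaWriter.cc hunk outside of Bro, here is a minimal standalone sketch. It is not the plugin's code: `FilterConfig`, `get_config_value`, and `resolve_topic` are hypothetical names, and a plain `std::map<std::string, std::string>` stands in for the writer's `info.config`. It models only what the diff shows, namely that a non-empty `topic_name` entry in a filter's `$config` replaces the globally configured topic, and that a missing or empty entry leaves the global topic in place.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical stand-in for the per-filter $config table handed to a writer.
using FilterConfig = std::map<std::string, std::string>;

// Return the value stored under `name`, or an empty string when the key is
// absent. This mirrors the shape of GetConfigValue in the diff.
std::string get_config_value(const FilterConfig& config, const std::string& name)
{
    assert(!name.empty());  // an empty key is a caller error in this sketch
    auto it = config.find(name);
    return it == config.end() ? std::string() : it->second;
}

// Pick the topic for one filter: a non-empty per-filter "topic_name" wins,
// otherwise the globally configured topic is kept unchanged.
std::string resolve_topic(const std::string& global_topic, const FilterConfig& config)
{
    const std::string override_topic = get_config_value(config, "topic_name");
    return override_topic.empty() ? global_topic : override_topic;
}
```

With the README's Example 6 settings, calling `resolve_topic("bro", {{"topic_name", "shew_bro_radius"}})` would select the per-filter topic, while a filter that carries no `topic_name` entry would keep using "bro".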