metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jonzeo...@apache.org
Subject [metron-bro-plugin-kafka] branch master updated: METRON-1992 Support sending a log to multiple topics (JonZeolla) closes apache/metron-bro-plugin-kafka#23
Date Thu, 14 Feb 2019 14:26:14 GMT
This is an automated email from the ASF dual-hosted git repository.

jonzeolla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron-bro-plugin-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new d96568e  METRON-1992 Support sending a log to multiple topics (JonZeolla) closes
apache/metron-bro-plugin-kafka#23
d96568e is described below

commit d96568e6df21b3f091ffc3ad3fd7f93d7cb16788
Author: JonZeolla <zeolla@gmail.com>
AuthorDate: Thu Feb 14 09:25:46 2019 -0500

    METRON-1992 Support sending a log to multiple topics (JonZeolla) closes apache/metron-bro-plugin-kafka#23
---
 README.md          | 41 +++++++++++++++++++++++++++++++++++++++--
 src/KafkaWriter.cc | 18 +++++++++++++++---
 src/KafkaWriter.h  |  2 ++
 3 files changed, 56 insertions(+), 5 deletions(-)

diff --git a/README.md b/README.md
index 7cc2c46..72436e9 100644
--- a/README.md
+++ b/README.md
@@ -114,7 +114,7 @@ redef Kafka::kafka_conf = table(
 
 ### Example 2 - Send all active logs
 
-This plugin has the ability send all active logs to kafka with the following configuration.
+This plugin has the ability to send all active logs to the "bro" kafka topic with the following
configuration.
 
 ```
 @load packages/metron-bro-plugin-kafka/Apache/Kafka
@@ -138,7 +138,7 @@ redef Kafka::kafka_conf = table(
 );
 ```
 
-### Example 4 - Send logs to unique topics
+### Example 4 - Send each bro log to a unique topic
 
 It is also possible to send each log stream to a uniquely named topic.  The goal in this
example is to send all HTTP records to a Kafka topic named `http` and all DNS records to a
separate Kafka topic named `dns`.
  * The `topic_name` value must be set to an empty string.
@@ -228,6 +228,43 @@ event bro_init() &priority=-10
  * You can also filter IPv6 logs from within your Metron cluster [using Stellar](https://github.com/apache/metron/tree/master/metron-stellar/stellar-common#is_ip).
 In that case, you wouldn't apply a predicate in your bro configuration, and instead Stellar
would filter the logs out before they were processed by the enrichment layer of Metron.
  * It is also possible to use the `is_v6_subnet()` bro function in your predicate, as of
their [2.5 release](https://www.bro.org/sphinx-git/install/release-notes.html#bro-2-5), however
the above example should work on [bro 2.4](https://www.bro.org/sphinx-git/install/release-notes.html#bro-2-4)
and newer, which has been the focus of the kafka plugin.
 
+### Example 6 - Sending a log to multiple topics
+
+You are able to send a single bro log to multiple different kafka topics in the same kafka
cluster by overriding the default topic (configured with `Kafka::topic_name`) by creating
a custom bro `Log::Filter`.  In this example, the DHCP, RADIUS, and DNS logs are sent to the
"bro" topic; the RADIUS log is duplicated to the "shew_bro_radius" topic; and the DHCP log
is duplicated to the "shew_bro_dhcp" topic.
+
+```
+@load packages/metron-bro-plugin-kafka/Apache/Kafka
+redef Kafka::logs_to_send = set(DHCP::LOG, RADIUS::LOG, DNS::LOG);
+redef Kafka::topic_name = "bro";
+redef Kafka::kafka_conf = table(
+    ["metadata.broker.list"] = "server1.example.com:9092,server2.example.com:9092"
+);
+redef Kafka::tag_json = T;
+
+event bro_init() &priority=-10
+{
+    # Send RADIUS to the shew_bro_radius topic
+    local shew_radius_filter: Log::Filter = [
+        $name = "kafka-radius-shew",
+        $writer = Log::WRITER_KAFKAWRITER,
+        $path = "shew_bro_radius",
+        $config = table(["topic_name"] = "shew_bro_radius")
+    ];
+    Log::add_filter(RADIUS::LOG, shew_radius_filter);
+
+    # Send DHCP to the shew_bro_dhcp topic
+    local shew_dhcp_filter: Log::Filter = [
+        $name = "kafka-dhcp-shew",
+        $writer = Log::WRITER_KAFKAWRITER,
+        $path = "shew_bro_dhcp",
+        $config = table(["topic_name"] = "shew_bro_dhcp")
+    ];
+    Log::add_filter(DHCP::LOG, shew_dhcp_filter);
+}
+```
+
+_Note_:  Because `Kafka::tag_json` is set to True in this example, the value of `$path` is
used as the tag for each `Log::Filter`. If you were to add a log filter with the same `$path`
as an existing filter, Bro will append "-N", where N is an integer starting at 2, to the end
of the log path so that each filter has its own unique log path. For instance, the second
instance of `conn` would become `conn-2`.
+
 ## Settings
 
 ### `logs_to_send`
diff --git a/src/KafkaWriter.cc b/src/KafkaWriter.cc
index 1d4a28a..563ef74 100644
--- a/src/KafkaWriter.cc
+++ b/src/KafkaWriter.cc
@@ -73,6 +73,15 @@ KafkaWriter::~KafkaWriter()
   // Cleanup must happen in DoFinish, not in the destructor
 }
 
+string KafkaWriter::GetConfigValue(const WriterInfo& info, const string name) const // Returns the value stored under `name` in info.config, or an empty string when the key is absent.
+{
+    map<const char*, const char*>::const_iterator it = info.config.find(name.c_str()); // NOTE(review): key is a const char*; assumes info.config's comparator compares string contents, not pointers -- confirm against the WriterInfo definition.
+    if (it == info.config.end())
+        return string(); // Key not present: an empty string; DoInit only applies the override when the result is non-empty.
+    else
+        return it->second; // The mapped const char* is converted to the string return type.
+}
+
 /**
  * DoInit is called once for each call to the constructor, but in a separate
  * thread
@@ -82,9 +91,12 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const
threading
     // Timeformat object, default to TS_EPOCH
     threading::formatter::JSON::TimeFormat tf = threading::formatter::JSON::TS_EPOCH;
 
-    // if no global 'topic_name' is defined, use the log stream's 'path'
-    if(topic_name.empty()) {
-        topic_name = info.path;
+    // Allow overriding of the kafka topic via the Bro script constant "topic_name"
+    // which can be applied when adding a new Bro log filter.
+    topic_name_override = GetConfigValue(info, "topic_name");
+
+    if(!topic_name_override.empty()) {
+        topic_name = topic_name_override;
     }
 
     /**
diff --git a/src/KafkaWriter.h b/src/KafkaWriter.h
index 14e0f7e..c67c664 100644
--- a/src/KafkaWriter.h
+++ b/src/KafkaWriter.h
@@ -65,12 +65,14 @@ protected:
     virtual bool DoHeartbeat(double network_time, double current_time);
 
 private:
+    string GetConfigValue(const WriterInfo& info, const string name) const;
     static const string default_topic_key;
     string stream_id;
     bool tag_json;
     string json_timestamps;
     map<string, string> kafka_conf;
     string topic_name;
+    string topic_name_override;
     threading::formatter::Formatter *formatter;
     RdKafka::Producer* producer;
     RdKafka::Topic* topic;


Mime
View raw message