airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lah...@apache.org
Subject [46/48] airavata git commit: [AIRAVATA-2054][WIP] create docker images for airavata deployment components
Date Fri, 16 Sep 2016 16:12:21 GMT
[AIRAVATA-2054][WIP] create docker images for airavata deployment components

1. Introduce Docker images for each deployment component of airavata.
2. Deployed those in docker hub repository (scigap),
	try: docker search scigap
3. Use exhibitor docker images instead of zookeeper, which is much better
than using vanilla zookeeper.
http://techblog.netflix.com/2012/04/introducing-exhibitor-supervisor-system.html

4. IMHO we should never use docker images directly from a public repository. We should
always create our own docker images from public images, test with those, and then
move to production.

5. Added a simple script(airavata/build.sh) to build airavata docker components.

      ./build.sh [component-name] - This will build a docker image for given component.

This is a temporary script we can use until AIRAVATA-2056, which integrates docker push with
a CI tool such as Jenkins, is completed.


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e27c54ea
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e27c54ea
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e27c54ea

Branch: refs/heads/lahiru/AIRAVATA-2065
Commit: e27c54eafbb443950d213d06afb5e1d43ce72ce7
Parents: 8474763
Author: Lahiru Ginnaliya Gamathige <lahiru@apache.org>
Authored: Sun Aug 21 00:49:58 2016 -0700
Committer: Lahiru Ginnaliya Gamathige <lahiru@apache.org>
Committed: Mon Sep 12 23:27:52 2016 -0700

----------------------------------------------------------------------
 deploy/images/kafka/Dockerfile                  |   8 ++
 deploy/images/kafka/start-kafka.sh              |  17 +++
 deploy/images/logstash/Dockerfile               |   5 +
 deploy/images/logstash/elasticsearchpolicy      |  27 ++++
 deploy/images/logstash/logstash-elastic.conf    |  94 ++++++++++++++
 .../logstash-output-amazon_es-0.3-java.gem      | Bin 0 -> 22016 bytes
 deploy/images/logstash/logstash.conf            |  97 +++++++++++++++
 deploy/systemd/kafka-manager.service            |  15 +++
 deploy/systemd/kafka.service                    |  13 ++
 deploy/systemd/logstash.service                 |  16 +++
 .../common/logging/kafka/KafkaAppender.java     | 122 +++++++++++++++++++
 11 files changed, 414 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/e27c54ea/deploy/images/kafka/Dockerfile
----------------------------------------------------------------------
diff --git a/deploy/images/kafka/Dockerfile b/deploy/images/kafka/Dockerfile
new file mode 100644
index 0000000..59770e1
--- /dev/null
+++ b/deploy/images/kafka/Dockerfile
@@ -0,0 +1,8 @@
+FROM scigap/java:8
+ENV ZOOKEEPER localhost:2181/kafka
+ENV LOG_DIRS /var/lib/kafka
+ENV JMX_PORT 9999
+ADD start-kafka.sh /start-kafka.sh
+RUN mkdir -p /opt/kafka && curl http://mirrors.sonic.net/apache/kafka/0.8.2.2/kafka_2.11-0.8.2.2.tgz > /tmp/kafka.tgz && \
+    tar -zxf /tmp/kafka.tgz  -C /opt/kafka --strip-components=1 && rm -f /tmp/kafka.tgz
+ENTRYPOINT /start-kafka.sh
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/e27c54ea/deploy/images/kafka/start-kafka.sh
----------------------------------------------------------------------
diff --git a/deploy/images/kafka/start-kafka.sh b/deploy/images/kafka/start-kafka.sh
new file mode 100755
index 0000000..6026010
--- /dev/null
+++ b/deploy/images/kafka/start-kafka.sh
@@ -0,0 +1,17 @@
+#!/bin/sh
+
+if test -z "${BROKER_ID}"; then
+  BROKER_ID=$(ip a | grep 'eth0' | awk '/inet /{print substr($2,4)}'| sed 's/\///g' | head -n1 | tr -d .)
+fi
+
+mkdir -p /opt/kafka/etc
+cat <<EOF > /opt/kafka/etc/server.properties
+broker.id=${BROKER_ID}
+zookeeper.connect=${ZOOKEEPER}
+log.dirs=${LOG_DIRS}
+num.partitions=2
+default.replication.factor=1
+advertised.host.name=${ADVERTISED_HOST_NAME}
+EOF
+
+exec /opt/kafka/bin/kafka-server-start.sh /opt/kafka/etc/server.properties

http://git-wip-us.apache.org/repos/asf/airavata/blob/e27c54ea/deploy/images/logstash/Dockerfile
----------------------------------------------------------------------
diff --git a/deploy/images/logstash/Dockerfile b/deploy/images/logstash/Dockerfile
new file mode 100644
index 0000000..6fc9bd3
--- /dev/null
+++ b/deploy/images/logstash/Dockerfile
@@ -0,0 +1,5 @@
+FROM logstash:2.3.4
+
+RUN /opt/logstash/bin/plugin install logstash-output-amazon_es
+RUN /opt/logstash/bin/plugin install logstash-codec-avro logstash-codec-cloudtrail logstash-input-journald
+ENTRYPOINT []

http://git-wip-us.apache.org/repos/asf/airavata/blob/e27c54ea/deploy/images/logstash/elasticsearchpolicy
----------------------------------------------------------------------
diff --git a/deploy/images/logstash/elasticsearchpolicy b/deploy/images/logstash/elasticsearchpolicy
new file mode 100644
index 0000000..8852280
--- /dev/null
+++ b/deploy/images/logstash/elasticsearchpolicy
@@ -0,0 +1,27 @@
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Effect": "Allow",
+      "Principal": {
+        "AWS": "arn:aws:iam::691488976375:root"
+      },
+      "Action": "es:*",
+      "Resource": "arn:aws:es:us-east-1:691488976375:domain/scigap/*"
+    },
+    {
+      "Sid": "",
+      "Effect": "Allow",
+      "Principal": {
+        "AWS": "*"
+      },
+      "Action": "es:*",
+      "Resource": "arn:aws:es:us-east-1:691488976375:domain/scigap/*",
+      "Condition": {
+        "IpAddress": {
+          "aws:SourceIp": "50.200.229.250"
+        }
+      }
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/e27c54ea/deploy/images/logstash/logstash-elastic.conf
----------------------------------------------------------------------
diff --git a/deploy/images/logstash/logstash-elastic.conf b/deploy/images/logstash/logstash-elastic.conf
new file mode 100644
index 0000000..2871657
--- /dev/null
+++ b/deploy/images/logstash/logstash-elastic.conf
@@ -0,0 +1,94 @@
+input {
+  kafka {
+    topic_id => "local_all_logs"
+    zk_connect => "127.0.0.1:2181"
+    auto_offset_reset => "smallest"
+    type => "all_logs"
+  }
+  kafka {
+    topic_id => "local_apiserver_logs"
+    zk_connect => "127.0.0.1:2181"
+    auto_offset_reset => "smallest"
+    type => "apiserver_logs"
+  }
+  kafka {
+    topic_id => "local_gfac_logs"
+    zk_connect => "127.0.0.1:2181"
+    auto_offset_reset => "smallest"
+    type => "gfac_logs"
+  }
+  kafka {
+    topic_id => "local_orchestrator_logs"
+    zk_connect => "127.0.0.1:2181"
+    auto_offset_reset => "smallest"
+    type => "orchestrator_logs"
+  }
+  kafka {
+    topic_id => "local_credentialstore_logs"
+    zk_connect => "127.0.0.1:2181"
+    auto_offset_reset => "smallest"
+    type => "credentialstore_logs"
+  }
+}
+
+filter {
+  mutate { add_field => { "[@metadata][level]" => "%{[level]}" } }
+  mutate { lowercase => ["[@metadata][level]"] }
+  mutate { gsub => ["level", "LOG_", ""] }
+  mutate {
+    add_tag => ["local", "CoreOS-899.13.0"]
+  }
+  ruby {
+    code => "
+    begin
+    t = Time.iso8601(event['timestamp'])
+    rescue ArgumentError => e
+    # drop the event if format is invalid
+    event.cancel
+    return
+    end
+    event['timestamp_usec'] = t.usec % 1000
+    event['timestamp'] = t.utc.strftime('%FT%T.%LZ')
+    "
+  }
+}
+
+output {
+  stdout { codec => rubydebug }
+  if [type] == "apiserver_logs" {
+    elasticsearch {
+      hosts => ["d5b696fac75ae2f1dda3c515ba904ff4.us-east-1.aws.found.io:9200"]
+      user => "admin"
+      password => "15tij9wc26p2qf3fgm"
+      index => "local-apiserver-logs-logstash-%{+YYYY.MM.dd}"
+    }
+  } else if [type] == "gfac_logs" {
+    elasticsearch {
+      hosts => ["d5b696fac75ae2f1dda3c515ba904ff4.us-east-1.aws.found.io:9200"]
+      user => "admin"
+      password => "15tij9wc26p2qf3fgm"
+      index => "local-gfac-logs-logstash-%{+YYYY.MM.dd}"
+    }
+  } else if [type] == "orchestrator_logs" {
+    elasticsearch {
+      hosts => ["d5b696fac75ae2f1dda3c515ba904ff4.us-east-1.aws.found.io:9200"]
+      user => "admin"
+      password => "15tij9wc26p2qf3fgm"
+      index => "local-orchestrator-logs-logstash-%{+YYYY.MM.dd}"
+    }
+  } else if [type] == "credentialstore_logs" {
+    elasticsearch {
+      hosts => ["d5b696fac75ae2f1dda3c515ba904ff4.us-east-1.aws.found.io:9200"]
+      user => "admin"
+      password => "15tij9wc26p2qf3fgm"
+      index => "local-credentialstore-logs-logstash-%{+YYYY.MM.dd}"
+    }
+  } else {
+  elasticsearch {
+    hosts => ["d5b696fac75ae2f1dda3c515ba904ff4.us-east-1.aws.found.io:9200"]
+    user => "admin"
+    password => "15tij9wc26p2qf3fgm"
+    index => "local-airavata-logs-logstash-%{+YYYY.MM.dd}"
+  }
+}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/e27c54ea/deploy/images/logstash/logstash-output-amazon_es-0.3-java.gem
----------------------------------------------------------------------
diff --git a/deploy/images/logstash/logstash-output-amazon_es-0.3-java.gem b/deploy/images/logstash/logstash-output-amazon_es-0.3-java.gem
new file mode 100644
index 0000000..d3c913a
Binary files /dev/null and b/deploy/images/logstash/logstash-output-amazon_es-0.3-java.gem differ

http://git-wip-us.apache.org/repos/asf/airavata/blob/e27c54ea/deploy/images/logstash/logstash.conf
----------------------------------------------------------------------
diff --git a/deploy/images/logstash/logstash.conf b/deploy/images/logstash/logstash.conf
new file mode 100644
index 0000000..6dd3c41
--- /dev/null
+++ b/deploy/images/logstash/logstash.conf
@@ -0,0 +1,97 @@
+input {
+  kafka {
+    topic_id => "local_all_logs"
+    zk_connect => "107.20.88.15:2181/kafka"
+    auto_offset_reset => "smallest"
+    type => "all_logs"
+  }
+  kafka {
+    topic_id => "local_apiserver_logs"
+    zk_connect => "107.20.88.15:2181/kafka"
+    auto_offset_reset => "smallest"
+    type => "apiserver_logs"
+  }
+  kafka {
+    topic_id => "local_gfac_logs"
+    zk_connect => "107.20.88.15:2181/kafka"
+    auto_offset_reset => "smallest"
+    type => "gfac_logs"
+  }
+  kafka {
+    topic_id => "local_orchestrator_logs"
+    zk_connect => "107.20.88.15:2181/kafka"
+    auto_offset_reset => "smallest"
+    type => "orchestrator_logs"
+  }
+  kafka {
+    topic_id => "local_credentialstore_logs"
+    zk_connect => "107.20.88.15:2181/kafka"
+    auto_offset_reset => "smallest"
+    type => "credentialstore_logs"
+  }
+}
+
+filter {
+  mutate { add_field => { "[@metadata][level]" => "%{[level]}" } }
+  mutate { lowercase => ["[@metadata][level]"] }
+  mutate { gsub => ["level", "LOG_", ""] }
+  mutate {
+    add_tag => ["local", "CoreOS-899.13.0"]
+  }
+  ruby {
+    code => "
+    begin
+    t = Time.iso8601(event['timestamp'])
+    rescue ArgumentError => e
+    # drop the event if format is invalid
+    event.cancel
+    return
+    end
+    event['timestamp_usec'] = t.usec % 1000
+    event['timestamp'] = t.utc.strftime('%FT%T.%LZ')
+    "
+  }
+}
+
+output {
+  if [type] == "apiserver_logs" {
+    if [@metadata][level] == "debug" {
+      amazon_es {
+        hosts => ["search-scigap1-je4ln2j5dwlibskeuheh7nr2sa.us-east-1.es.amazonaws.com"]
+        region => "us-east-1"
+        index => "local-apiserver-logs-logstash-%{+YYYY.MM.dd}"
+      }
+    }
+  } else if [type] == "gfac_logs" {
+    if [@metadata][level] == "debug" {
+      amazon_es {
+        hosts => ["search-scigap1-je4ln2j5dwlibskeuheh7nr2sa.us-east-1.es.amazonaws.com"]
+        region => "us-east-1"
+        index => "local-gfac-logs-logstash-%{+YYYY.MM.dd}"
+      }
+    }
+  } else if [type] == "orchestrator_logs" {
+    if [@metadata][level] == "debug" {
+      amazon_es {
+        hosts => ["search-scigap1-je4ln2j5dwlibskeuheh7nr2sa.us-east-1.es.amazonaws.com"]
+        region => "us-east-1"
+        index => "local-orchestrator-logs-logstash-%{+YYYY.MM.dd}"
+            }
+    }
+  } else if [type] == "credentialstore_logs" {
+    if [@metadata][level] == "debug" {
+      amazon_es {
+        hosts => ["search-scigap1-je4ln2j5dwlibskeuheh7nr2sa.us-east-1.es.amazonaws.com"]
+        region => "us-east-1"
+        index => "local-credentialstore-logs-logstash-%{+YYYY.MM.dd}"
+      }
+    }
+  } else {
+  amazon_es {
+    hosts => ["search-scigap1-je4ln2j5dwlibskeuheh7nr2sa.us-east-1.es.amazonaws.com"]
+    region => "us-east-1"
+    index => "local-airavata-logs-logstash-%{+YYYY.MM.dd}"
+  }
+}
+}
+

http://git-wip-us.apache.org/repos/asf/airavata/blob/e27c54ea/deploy/systemd/kafka-manager.service
----------------------------------------------------------------------
diff --git a/deploy/systemd/kafka-manager.service b/deploy/systemd/kafka-manager.service
new file mode 100644
index 0000000..1bed655
--- /dev/null
+++ b/deploy/systemd/kafka-manager.service
@@ -0,0 +1,15 @@
+[Unit]
+Description=kafka-manager
+After=docker.service
+Requires=docker.service
+[Service]
+EnvironmentFile=/etc/environment
+TimeoutStartSec=60
+Restart=on-failure
+ExecStartPre=-/usr/bin/docker rm -f kafka-manager
+ExecStartPre=-/usr/bin/docker pull sheepkiller/kafka-manager
+ExecStart=/usr/bin/docker run --name kafka-manager -p 9000:9000 -e ZK_HOSTS=localhost:2181 -e APPLICATION_SECRET=face2face sheepkiller/kafka-manager
+ExecStop=/usr/bin/docker stop kafka-manager
+[Install]
+WantedBy=multi-user.target
+

http://git-wip-us.apache.org/repos/asf/airavata/blob/e27c54ea/deploy/systemd/kafka.service
----------------------------------------------------------------------
diff --git a/deploy/systemd/kafka.service b/deploy/systemd/kafka.service
new file mode 100644
index 0000000..cad29e5
--- /dev/null
+++ b/deploy/systemd/kafka.service
@@ -0,0 +1,13 @@
+[Unit]
+Description=Kafka
+Requires=docker.service
+[Service]
+EnvironmentFile=/etc/environment
+TimeoutStartSec=60
+Restart=on-failure
+ExecStartPre=-/usr/bin/docker rm -f kafka
+ExecStartPre=-/usr/bin/docker pull scigap/kafka
+ExecStart=/usr/bin/docker run --net=host --name kafka -e ADVERTISED_HOST_NAME=54.163.192.179 -v /var/lib/kafka:/var/lib/kafka scigap/kafka
+ExecStop=/usr/bin/docker stop kafka
+[Install]
+WantedBy=multi-user.target

http://git-wip-us.apache.org/repos/asf/airavata/blob/e27c54ea/deploy/systemd/logstash.service
----------------------------------------------------------------------
diff --git a/deploy/systemd/logstash.service b/deploy/systemd/logstash.service
new file mode 100644
index 0000000..7ed4955
--- /dev/null
+++ b/deploy/systemd/logstash.service
@@ -0,0 +1,16 @@
+[Unit]
+Description=logstash
+Requires=docker.service
+After=docker.service
+[Service]
+EnvironmentFile=/etc/os-release
+LimitNOFILE=infinity
+TimeoutStartSec=90
+Restart=on-failure
+ExecStartPre=-/usr/bin/docker rm -f logstash
+ExecStartPre=-/usr/bin/docker pull scigap/logstash
+ExecStart=/usr/bin/docker run --name logstash -e ENV_NAME=local -e OS_NAME=${NAME} -e OS_VERSION=${VERSION} \
+    -e KAFKA_ZK="localhost:2181/kafka" -e ES_ENDPOINT="search-scigap-62tebdueebw5dfyn7bfyn63rru.us-east-1.es.amazonaws.com" \
+    -v /etc/logstash:/config -v /var/log/journal:/var/log/journal -v /var/lib/logstash:/var/lib/logstash \
+    scigap/logstash logstash -f /config/logstash.conf
+ExecStop=/usr/bin/docker stop logstash

http://git-wip-us.apache.org/repos/asf/airavata/blob/e27c54ea/modules/commons/src/main/java/org/apache/airavata/common/logging/kafka/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/logging/kafka/KafkaAppender.java b/modules/commons/src/main/java/org/apache/airavata/common/logging/kafka/KafkaAppender.java
new file mode 100644
index 0000000..06649c6
--- /dev/null
+++ b/modules/commons/src/main/java/org/apache/airavata/common/logging/kafka/KafkaAppender.java
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.common.logging.kafka;
+
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.classic.spi.IThrowableProxy;
+import ch.qos.logback.classic.spi.StackTraceElementProxy;
+import ch.qos.logback.core.UnsynchronizedAppenderBase;
+import com.google.gson.Gson;
+import org.apache.airavata.common.logging.Exception;
+import org.apache.airavata.common.logging.LogEntry;
+import org.apache.airavata.common.logging.ServerId;
+import org.apache.airavata.common.utils.AwsMetadata;
+import org.apache.airavata.common.utils.BuildConstant;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Properties;
+
+public class KafkaAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
+    private final static Logger logger = LoggerFactory.getLogger(KafkaAppender.class);
+
+    private final Producer<String, String> producer;
+    private final String kafkaTopic;
+
+    private  ServerId serverId = null;
+
+    public KafkaAppender(String kafkaHost, String kafkaTopicPrefix) {
+        Properties props = new Properties();
+        props.put("bootstrap.servers", kafkaHost);
+        props.put("acks", "0");
+        props.put("retries", 0);
+        props.put("batch.size", 16384);
+        props.put("linger.ms", 10000); // Send the batch every 10 seconds
+        props.put("buffer.memory", 33554432);
+        props.put("producer.type", "async");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        this.kafkaTopic = getKafkaTopic(kafkaTopicPrefix);
+        logger.info("Starting kafka producer: bootstrap-server:{}, topic : {}", kafkaHost, this.kafkaTopic);
+        this.producer = new KafkaProducer<>(props);
+        if(ServerSettings.isRunningOnAws()) {
+            final AwsMetadata awsMetadata = new AwsMetadata();
+            serverId = new ServerId(awsMetadata.getId(), awsMetadata.getHostname(),
+                    BuildConstant.VERSION, ServerSettings.getServerRoles());
+        } else {
+            serverId = new ServerId(ServerSettings.getIp(), ServerSettings.getIp(),
+                    BuildConstant.VERSION, ServerSettings.getServerRoles());
+        }
+    }
+
+    @Override
+    protected void append(ILoggingEvent event) {
+        event.prepareForDeferredProcessing();
+        //todo do more elegant streaming approach to publish logs
+        if (!event.getLevel().equals(Level.ALL) &&         // OFF AND ALL are not loggable levels
+                !event.getLevel().equals(Level.OFF)) {
+            final IThrowableProxy throwableProxy = event.getThrowableProxy();
+            final LogEntry entry = throwableProxy != null ?
+                    new LogEntry(serverId, event.getMessage(), Instant.ofEpochMilli(event.getTimeStamp()).toString(),
+                            event.getLevel().toString(), event.getLoggerName(), event.getMDCPropertyMap(),
+                            event.getThreadName() != null ? event.getThreadName() : null,
+                            new Exception(throwableProxy.getMessage(), toStringArray(throwableProxy.getStackTraceElementProxyArray())
+                            , throwableProxy.getClassName()))
+                    : new LogEntry(serverId, event.getMessage(), Instant.ofEpochMilli(event.getTimeStamp()).toString(),
+                    event.getLevel().toString(), event.getLoggerName(), event.getMDCPropertyMap(),
+                    event.getThreadName() != null ? event.getThreadName() : null);
+            producer.send(new ProducerRecord<>(kafkaTopic, new Gson().toJson(entry)));
+        }
+    }
+
+
+    private String[] toStringArray(StackTraceElementProxy[] stackTraceElement) {
+        return Arrays.stream(stackTraceElement).map(StackTraceElementProxy::getSTEAsString).toArray(String[]::new);
+    }
+
+    private String getKafkaTopic(String kafkaTopicPrefix) {
+        final StringBuilder stringBuffer = new StringBuilder("");
+        final String[] serverRoles = ServerSettings.getServerRoles();
+        if (serverRoles.length == 4) {
+            return kafkaTopicPrefix + "_all";
+        }
+        for (String role : ServerSettings.getServerRoles()) {
+            stringBuffer.append("_");
+            stringBuffer.append(role);
+            stringBuffer.append("_logs");
+            // do not support multiple roles yet, topic name will become complex
+            break;
+        }
+        return kafkaTopicPrefix + stringBuffer.toString();
+    }
+
+    public void close() {
+        producer.close();
+    }
+}


Mime
View raw message