kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 1.1 updated: KAFKA-6578: Changed the Connect distributed and standalone main method to log all exceptions (#4609)
Date Fri, 23 Feb 2018 06:33:40 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new d1994f2  KAFKA-6578: Changed the Connect distributed and standalone main method to
log all exceptions (#4609)
d1994f2 is described below

commit d1994f2b476fb1d9b4a9be0ab66221970fe03ebd
Author: Randall Hauch <rhauch@gmail.com>
AuthorDate: Fri Feb 23 00:29:49 2018 -0600

    KAFKA-6578: Changed the Connect distributed and standalone main method to log all exceptions
(#4609)
    
    Any exception thrown by calls within a `main()` method are not logged unless explicitly
done so. This change simply adds a try-catch block around most of the content of the distributed
and standalone `main()` methods.
---
 .../kafka/connect/cli/ConnectDistributed.java      | 81 ++++++++++---------
 .../kafka/connect/cli/ConnectStandalone.java       | 93 ++++++++++++----------
 2 files changed, 95 insertions(+), 79 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 8930602..54854fe4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -27,6 +27,7 @@ import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
+import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
 import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
 import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
@@ -57,51 +58,59 @@ public class ConnectDistributed {
             Exit.exit(1);
         }
 
-        Time time = Time.SYSTEM;
-        log.info("Kafka Connect distributed worker initializing ...");
-        long initStart = time.hiResClockMs();
-        WorkerInfo initInfo = new WorkerInfo();
-        initInfo.logAll();
+        try {
+            Time time = Time.SYSTEM;
+            log.info("Kafka Connect distributed worker initializing ...");
+            long initStart = time.hiResClockMs();
+            WorkerInfo initInfo = new WorkerInfo();
+            initInfo.logAll();
 
-        String workerPropsFile = args[0];
-        Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
-                Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String,
String>emptyMap();
+            String workerPropsFile = args[0];
+            Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
+                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String,
String>emptyMap();
 
-        log.info("Scanning for plugin classes. This might take a moment ...");
-        Plugins plugins = new Plugins(workerProps);
-        plugins.compareAndSwapWithDelegatingLoader();
-        DistributedConfig config = new DistributedConfig(workerProps);
+            log.info("Scanning for plugin classes. This might take a moment ...");
+            Plugins plugins = new Plugins(workerProps);
+            plugins.compareAndSwapWithDelegatingLoader();
+            DistributedConfig config = new DistributedConfig(workerProps);
 
-        String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
-        log.debug("Kafka cluster ID: {}", kafkaClusterId);
+            String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+            log.debug("Kafka cluster ID: {}", kafkaClusterId);
 
-        RestServer rest = new RestServer(config);
-        URI advertisedUrl = rest.advertisedUrl();
-        String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+            RestServer rest = new RestServer(config);
+            URI advertisedUrl = rest.advertisedUrl();
+            String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
-        offsetBackingStore.configure(config);
+            KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+            offsetBackingStore.configure(config);
 
-        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);
+            Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);
 
-        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter());
-        statusBackingStore.configure(config);
+            Converter internalValueConverter = worker.getInternalValueConverter();
+            StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
+            statusBackingStore.configure(config);
 
-        ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(worker.getInternalValueConverter(),
config);
+            ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(internalValueConverter,
config);
 
-        DistributedHerder herder = new DistributedHerder(config, time, worker,
-                kafkaClusterId, statusBackingStore, configBackingStore,
-                advertisedUrl.toString());
-        final Connect connect = new Connect(herder, rest);
-        log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs()
- initStart);
-        try {
-            connect.start();
-        } catch (Exception e) {
-            log.error("Failed to start Connect", e);
-            connect.stop();
-        }
+            DistributedHerder herder = new DistributedHerder(config, time, worker,
+                    kafkaClusterId, statusBackingStore, configBackingStore,
+                    advertisedUrl.toString());
+            final Connect connect = new Connect(herder, rest);
+            log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs()
- initStart);
+            try {
+                connect.start();
+            } catch (Exception e) {
+                log.error("Failed to start Connect", e);
+                connect.stop();
+                Exit.exit(3);
+            }
 
-        // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
-        connect.awaitStop();
+            // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
+            connect.awaitStop();
+
+        } catch (Throwable t) {
+            log.error("Stopping due to error", t);
+            Exit.exit(2);
+        }
     }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 413cb46..aba9d9c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -62,58 +62,65 @@ public class ConnectStandalone {
             Exit.exit(1);
         }
 
-        Time time = Time.SYSTEM;
-        log.info("Kafka Connect standalone worker initializing ...");
-        long initStart = time.hiResClockMs();
-        WorkerInfo initInfo = new WorkerInfo();
-        initInfo.logAll();
+        try {
+            Time time = Time.SYSTEM;
+            log.info("Kafka Connect standalone worker initializing ...");
+            long initStart = time.hiResClockMs();
+            WorkerInfo initInfo = new WorkerInfo();
+            initInfo.logAll();
 
-        String workerPropsFile = args[0];
-        Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
-                Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String,
String>emptyMap();
+            String workerPropsFile = args[0];
+            Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
+                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String,
String>emptyMap();
 
-        log.info("Scanning for plugin classes. This might take a moment ...");
-        Plugins plugins = new Plugins(workerProps);
-        plugins.compareAndSwapWithDelegatingLoader();
-        StandaloneConfig config = new StandaloneConfig(workerProps);
+            log.info("Scanning for plugin classes. This might take a moment ...");
+            Plugins plugins = new Plugins(workerProps);
+            plugins.compareAndSwapWithDelegatingLoader();
+            StandaloneConfig config = new StandaloneConfig(workerProps);
 
-        String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
-        log.debug("Kafka cluster ID: {}", kafkaClusterId);
+            String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+            log.debug("Kafka cluster ID: {}", kafkaClusterId);
 
-        RestServer rest = new RestServer(config);
-        URI advertisedUrl = rest.advertisedUrl();
-        String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+            RestServer rest = new RestServer(config);
+            URI advertisedUrl = rest.advertisedUrl();
+            String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
-        Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore());
+            Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore());
 
-        Herder herder = new StandaloneHerder(worker, kafkaClusterId);
-        final Connect connect = new Connect(herder, rest);
-        log.info("Kafka Connect standalone worker initialization took {}ms", time.hiResClockMs()
- initStart);
+            Herder herder = new StandaloneHerder(worker, kafkaClusterId);
+            final Connect connect = new Connect(herder, rest);
+            log.info("Kafka Connect standalone worker initialization took {}ms", time.hiResClockMs()
- initStart);
 
-        try {
-            connect.start();
-            for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length))
{
-                Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
-                FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new
Callback<Herder.Created<ConnectorInfo>>() {
-                    @Override
-                    public void onCompletion(Throwable error, Herder.Created<ConnectorInfo>
info) {
-                        if (error != null)
-                            log.error("Failed to create job for {}", connectorPropsFile);
-                        else
-                            log.info("Created connector {}", info.result().name());
-                    }
-                });
-                herder.putConnectorConfig(
-                        connectorProps.get(ConnectorConfig.NAME_CONFIG),
-                        connectorProps, false, cb);
-                cb.get();
+            try {
+                connect.start();
+                for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length))
{
+                    Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
+                    FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new
Callback<Herder.Created<ConnectorInfo>>() {
+                        @Override
+                        public void onCompletion(Throwable error, Herder.Created<ConnectorInfo>
info) {
+                            if (error != null)
+                                log.error("Failed to create job for {}", connectorPropsFile);
+                            else
+                                log.info("Created connector {}", info.result().name());
+                        }
+                    });
+                    herder.putConnectorConfig(
+                            connectorProps.get(ConnectorConfig.NAME_CONFIG),
+                            connectorProps, false, cb);
+                    cb.get();
+                }
+            } catch (Throwable t) {
+                log.error("Stopping after connector error", t);
+                connect.stop();
+                Exit.exit(3);
             }
+
+            // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
+            connect.awaitStop();
+
         } catch (Throwable t) {
-            log.error("Stopping after connector error", t);
-            connect.stop();
+            log.error("Stopping due to error", t);
+            Exit.exit(2);
         }
-
-        // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
-        connect.awaitStop();
     }
 }

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.

Mime
View raw message