kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-3606: Traverse CLASSPATH during herder start
Date Fri, 22 Apr 2016 00:59:28 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 327400449 -> c7f9bd2a6


KAFKA-3606: Traverse CLASSPATH during herder start

ewencp Can you take a quick look?

Author: Liquan Pei <liquanpei@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1252 from Ishiihara/pre-list-connectors


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c7f9bd2a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c7f9bd2a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c7f9bd2a

Branch: refs/heads/trunk
Commit: c7f9bd2a68ea7bb604c4dcf2a2f0b030fc019ca7
Parents: 3274004
Author: Liquan Pei <liquanpei@gmail.com>
Authored: Thu Apr 21 17:59:23 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Thu Apr 21 17:59:23 2016 -0700

----------------------------------------------------------------------
 .../kafka/connect/runtime/AbstractHerder.java   | 49 ++++++++++++++------
 1 file changed, 36 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c7f9bd2a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index a22f15c..bd73589 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -84,6 +84,9 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
     private Map<String, Connector> tempConnectors = new ConcurrentHashMap<>();
     private static final List<Class<? extends Connector>> SKIPPED_CONNECTORS = Arrays.<Class<? extends Connector>>asList(VerifiableSourceConnector.class, VerifiableSinkConnector.class);
     private static List<ConnectorPluginInfo> validConnectorPlugins;
+    private static final Object LOCK = new Object();
+    private Thread classPathTraverser;
+
 
     public AbstractHerder(Worker worker,
                           String workerId,
@@ -101,12 +104,20 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
         this.worker.start();
         this.statusBackingStore.start();
         this.configBackingStore.start();
+        traverseClassPath();
     }
 
     protected void stopServices() {
         this.statusBackingStore.stop();
         this.configBackingStore.stop();
         this.worker.stop();
+        if (this.classPathTraverser != null) {
+            try {
+                this.classPathTraverser.join();
+            } catch (InterruptedException e) {
+                // ignore as it can only happen during shutdown
+            }
+        }
     }
 
     @Override
@@ -248,22 +259,24 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
     }
 
     public static List<ConnectorPluginInfo> connectorPlugins() {
-        if (validConnectorPlugins != null) {
-            return validConnectorPlugins;
-        }
+        synchronized (LOCK) {
+            if (validConnectorPlugins != null) {
+                return validConnectorPlugins;
+            }
 
-        Reflections reflections = new Reflections(new ConfigurationBuilder().setUrls(ClasspathHelper.forJavaClassPath()));
-        Set<Class<? extends Connector>> connectorClasses = reflections.getSubTypesOf(Connector.class);
-        connectorClasses.removeAll(SKIPPED_CONNECTORS);
-        List<ConnectorPluginInfo> connectorPlugins = new LinkedList<>();
-        for (Class<? extends Connector> connectorClass: connectorClasses) {
-            int mod = connectorClass.getModifiers();
-            if (!Modifier.isAbstract(mod) && !Modifier.isInterface(mod)) {
-                connectorPlugins.add(new ConnectorPluginInfo(connectorClass.getCanonicalName()));
+            Reflections reflections = new Reflections(new ConfigurationBuilder().setUrls(ClasspathHelper.forJavaClassPath()));
+            Set<Class<? extends Connector>> connectorClasses = reflections.getSubTypesOf(Connector.class);
+            connectorClasses.removeAll(SKIPPED_CONNECTORS);
+            List<ConnectorPluginInfo> connectorPlugins = new LinkedList<>();
+            for (Class<? extends Connector> connectorClass : connectorClasses) {
+                int mod = connectorClass.getModifiers();
+                if (!Modifier.isAbstract(mod) && !Modifier.isInterface(mod)) {
+                    connectorPlugins.add(new ConnectorPluginInfo(connectorClass.getCanonicalName()));
+                }
             }
+            validConnectorPlugins = connectorPlugins;
+            return connectorPlugins;
         }
-        validConnectorPlugins = connectorPlugins;
-        return connectorPlugins;
     }
 
     // public for testing
@@ -354,4 +367,14 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
             return null;
         }
     }
+
+    private void traverseClassPath() {
+        classPathTraverser = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                connectorPlugins();
+            }
+        }, "CLASSPATH traversal thread.");
+        classPathTraverser.start();
+    }
 }


Mime
View raw message