pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] jerrypeng commented on a change in pull request #3551: Simplified the workflow of functionruntime manager
Date Fri, 08 Feb 2019 19:55:16 GMT
jerrypeng commented on a change in pull request #3551: Simplified the workflow of functionruntime
manager
URL: https://github.com/apache/pulsar/pull/3551#discussion_r255216131
 
 

 ##########
 File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
 ##########
 @@ -76,120 +73,74 @@
 @EqualsAndHashCode
 @ToString
 @Slf4j
-public class FunctionActioner implements AutoCloseable {
+public class FunctionActioner {
 
     private final WorkerConfig workerConfig;
     private final RuntimeFactory runtimeFactory;
     private final Namespace dlogNamespace;
-    private LinkedBlockingQueue<FunctionAction> actionQueue;
-    private volatile boolean running;
-    private Thread actioner;
     private final ConnectorsManager connectorsManager;
     private final PulsarAdmin pulsarAdmin;
 
     public FunctionActioner(WorkerConfig workerConfig,
                             RuntimeFactory runtimeFactory,
                             Namespace dlogNamespace,
-                            LinkedBlockingQueue<FunctionAction> actionQueue,
                             ConnectorsManager connectorsManager, PulsarAdmin pulsarAdmin)
{
         this.workerConfig = workerConfig;
         this.runtimeFactory = runtimeFactory;
         this.dlogNamespace = dlogNamespace;
-        this.actionQueue = actionQueue;
         this.connectorsManager = connectorsManager;
         this.pulsarAdmin = pulsarAdmin;
-        actioner = new Thread(() -> {
-            log.info("Starting Actioner Thread...");
-            while(running) {
-                try {
-                    FunctionAction action = actionQueue.poll(1, TimeUnit.SECONDS);
-                    processAction(action);
-                } catch (InterruptedException ex) {
-                }
-            }
-        });
-        actioner.setName("FunctionActionerThread");
-    }
-
-
-    void processAction(FunctionAction action) {
-        if (action == null) return;
-
-        switch (action.getAction()) {
-            case START:
-                try {
-                    startFunction(action.getFunctionRuntimeInfo());
-                } catch (Exception ex) {
-                    FunctionDetails details = action.getFunctionRuntimeInfo().getFunctionInstance()
-                            .getFunctionMetaData().getFunctionDetails();
-                    log.info("{}/{}/{} Error starting function", details.getTenant(), details.getNamespace(),
-                            details.getName(), ex);
-                    action.getFunctionRuntimeInfo().setStartupException(ex);
-                }
-                break;
-            case STOP:
-                stopFunction(action.getFunctionRuntimeInfo());
-                break;
-            case TERMINATE:
-                terminateFunction(action.getFunctionRuntimeInfo());
-                break;
-        }
     }
 
-    public void start() {
-        this.running = true;
-        actioner.start();
-    }
-
-    @Override
-    public void close() {
-        running = false;
-    }
-
-    public void join() throws InterruptedException {
-        actioner.join();
-    }
-
-    @VisibleForTesting
-    public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Exception {
-        FunctionMetaData functionMetaData = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData();
-        FunctionDetails functionDetails = functionMetaData.getFunctionDetails();
-        int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
+    public boolean startFunction(FunctionRuntimeInfo functionRuntimeInfo) {
 
 Review comment:
   Why do we need to change startFunction to return a boolean? Other functions, e.g. stopFunction
and terminateFunction, aren't returning booleans.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message