pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] srkukarni commented on a change in pull request #3551: Simplified the workflow of functionruntime manager
Date Tue, 12 Feb 2019 18:27:46 GMT
srkukarni commented on a change in pull request #3551: Simplified the workflow of functionruntime
manager
URL: https://github.com/apache/pulsar/pull/3551#discussion_r256086636
 
 

 ##########
 File path: pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
 ##########
 @@ -76,120 +73,74 @@
 @EqualsAndHashCode
 @ToString
 @Slf4j
-public class FunctionActioner implements AutoCloseable {
+public class FunctionActioner {
 
     private final WorkerConfig workerConfig;
     private final RuntimeFactory runtimeFactory;
     private final Namespace dlogNamespace;
-    private LinkedBlockingQueue<FunctionAction> actionQueue;
-    private volatile boolean running;
-    private Thread actioner;
     private final ConnectorsManager connectorsManager;
     private final PulsarAdmin pulsarAdmin;
 
     public FunctionActioner(WorkerConfig workerConfig,
                             RuntimeFactory runtimeFactory,
                             Namespace dlogNamespace,
-                            LinkedBlockingQueue<FunctionAction> actionQueue,
                             ConnectorsManager connectorsManager, PulsarAdmin pulsarAdmin)
{
         this.workerConfig = workerConfig;
         this.runtimeFactory = runtimeFactory;
         this.dlogNamespace = dlogNamespace;
-        this.actionQueue = actionQueue;
         this.connectorsManager = connectorsManager;
         this.pulsarAdmin = pulsarAdmin;
-        actioner = new Thread(() -> {
-            log.info("Starting Actioner Thread...");
-            while(running) {
-                try {
-                    FunctionAction action = actionQueue.poll(1, TimeUnit.SECONDS);
-                    processAction(action);
-                } catch (InterruptedException ex) {
-                }
-            }
-        });
-        actioner.setName("FunctionActionerThread");
-    }
-
-
-    void processAction(FunctionAction action) {
-        if (action == null) return;
-
-        switch (action.getAction()) {
-            case START:
-                try {
-                    startFunction(action.getFunctionRuntimeInfo());
-                } catch (Exception ex) {
-                    FunctionDetails details = action.getFunctionRuntimeInfo().getFunctionInstance()
-                            .getFunctionMetaData().getFunctionDetails();
-                    log.info("{}/{}/{} Error starting function", details.getTenant(), details.getNamespace(),
-                            details.getName(), ex);
-                    action.getFunctionRuntimeInfo().setStartupException(ex);
-                }
-                break;
-            case STOP:
-                stopFunction(action.getFunctionRuntimeInfo());
-                break;
-            case TERMINATE:
-                terminateFunction(action.getFunctionRuntimeInfo());
-                break;
-        }
     }
 
-    public void start() {
-        this.running = true;
-        actioner.start();
-    }
-
-    @Override
-    public void close() {
-        running = false;
-    }
-
-    public void join() throws InterruptedException {
-        actioner.join();
-    }
-
-    @VisibleForTesting
-    public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Exception {
-        FunctionMetaData functionMetaData = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData();
-        FunctionDetails functionDetails = functionMetaData.getFunctionDetails();
-        int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
+    public boolean startFunction(FunctionRuntimeInfo functionRuntimeInfo) {
 
 Review comment:
   Changed back to void like the rest of others

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message