From commits-return-14629-archive-asf-public=cust-asf.ponee.io@pulsar.incubator.apache.org Wed Sep 19 00:22:21 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 2B394180672 for ; Wed, 19 Sep 2018 00:22:19 +0200 (CEST) Received: (qmail 61310 invoked by uid 500); 18 Sep 2018 22:22:18 -0000 Mailing-List: contact commits-help@pulsar.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pulsar.incubator.apache.org Delivered-To: mailing list commits@pulsar.incubator.apache.org Received: (qmail 61300 invoked by uid 99); 18 Sep 2018 22:22:18 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 Sep 2018 22:22:18 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 7260B82A62; Tue, 18 Sep 2018 22:22:17 +0000 (UTC) Date: Tue, 18 Sep 2018 22:22:17 +0000 To: "commits@pulsar.apache.org" Subject: [incubator-pulsar] branch master updated: [Function] avoid creating assignment snapshot and publish individual assigment msg (#2549) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <153730933727.23824.7590862519471162940@gitbox.apache.org> From: rdhabalia@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-pulsar X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 7530d64a679a0783122b18058c1148c89c0fee0a X-Git-Newrev: 49fc5e508a996cfe59949effbcaf0abfa46028ce X-Git-Rev: 49fc5e508a996cfe59949effbcaf0abfa46028ce X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 49fc5e5 [Function] avoid creating assignment snapshot and publish individual assigment msg (#2549) 49fc5e5 is described below commit 49fc5e508a996cfe59949effbcaf0abfa46028ce Author: Rajan Dhabalia AuthorDate: Tue Sep 18 15:22:12 2018 -0700 [Function] avoid creating assignment snapshot and publish individual assigment msg (#2549) Fix: Compaction with last deleted keys not completing compaction Delete assignment with empty payload --- conf/functions_worker.yml | 2 + .../pulsar/compaction/TwoPhaseCompactor.java | 2 +- .../apache/pulsar/compaction/CompactionTest.java | 2 +- .../worker/PulsarWorkerAssignmentTest.java | 370 +++++++++++++++++++++ .../proto/src/main/proto/Request.proto | 5 - .../functions/worker/FunctionAssignmentTailer.java | 47 ++- .../functions/worker/FunctionRuntimeManager.java | 283 +++++++--------- .../pulsar/functions/worker/SchedulerManager.java | 126 ++++--- .../pulsar/functions/worker/WorkerConfig.java | 5 +- .../pulsar/functions/worker/WorkerService.java | 15 +- .../worker/scheduler/RoundRobinScheduler.java | 10 +- .../worker/FunctionRuntimeManagerTest.java | 40 +-- .../functions/worker/MembershipManagerTest.java | 3 + .../functions/worker/SchedulerManagerTest.java | 298 +++++++++-------- 14 files changed, 791 insertions(+), 417 deletions(-) diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index 444b7fb..0c7b8af 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -44,6 +44,8 @@ rescheduleTimeoutMs: 60000 initialBrokerReconnectMaxRetries: 60 assignmentWriteMaxRetries: 60 instanceLivenessCheckFreqMs: 30000 +# Frequency how often worker performs compaction on function-topics +topicCompactionFrequencySec: 1800 metricsSamplingPeriodSec: 60 # Enforce authentication authenticationEnabled: false diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index 425e049..612b336 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -359,4 +359,4 @@ public class TwoPhaseCompactor extends Compactor { this.latestForKey = latestForKey; } } -} +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 32c93b4..8e7b292 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -1268,4 +1268,4 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { } } -} +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java new file mode 100644 index 0000000..fe28a51 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java @@ -0,0 +1,370 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker; + +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; + +import java.io.File; +import java.lang.reflect.Method; +import java.net.InetAddress; +import java.net.MalformedURLException; +import java.net.URI; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import org.apache.bookkeeper.test.PortManager; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.ServiceConfigurationUtils; +import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; +import org.apache.pulsar.client.admin.BrokerStats; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.functions.api.utils.IdentityFunction; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.proto.Function.Assignment; +import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.proto.Function.SinkSpec; +import org.apache.pulsar.functions.proto.Function.SourceSpec; +import org.apache.pulsar.functions.sink.PulsarSink; +import org.apache.pulsar.functions.utils.Reflections; +import org.apache.pulsar.functions.utils.Utils; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.gson.Gson; + +import jersey.repackaged.com.google.common.collect.Lists; + +/** + * Test Pulsar sink on function + * + */ +public class PulsarWorkerAssignmentTest { + LocalBookkeeperEnsemble bkEnsemble; + + ServiceConfiguration config; + WorkerConfig workerConfig; + PulsarService pulsar; + PulsarAdmin admin; + PulsarClient pulsarClient; + BrokerStats brokerStatsClient; + WorkerService functionsWorkerService; + final String tenant = "external-repl-prop"; + String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin"; + String primaryHost; + String workerId; + + private final int ZOOKEEPER_PORT = PortManager.nextFreePort(); + private final int brokerWebServicePort = PortManager.nextFreePort(); + private final int brokerServicePort = PortManager.nextFreePort(); + private final int workerServicePort = PortManager.nextFreePort(); + + private static final Logger log = LoggerFactory.getLogger(PulsarWorkerAssignmentTest.class); + + @BeforeMethod + void setup(Method method) throws Exception { + + log.info("--- Setting up method {} ---", method.getName()); + + // Start local bookkeeper ensemble + bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort()); + bkEnsemble.start(); + + String brokerServiceUrl = "http://127.0.0.1:" + brokerServicePort; + String brokerWeServiceUrl = "http://127.0.0.1:" + brokerWebServicePort; + + config = spy(new ServiceConfiguration()); + config.setClusterName("use"); + Set superUsers = Sets.newHashSet("superUser"); + config.setSuperUserRoles(superUsers); + config.setWebServicePort(brokerWebServicePort); + config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); + config.setBrokerServicePort(brokerServicePort); + config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); + + functionsWorkerService = createPulsarFunctionWorker(config); + Optional functionWorkerService = Optional.of(functionsWorkerService); + pulsar = new PulsarService(config, functionWorkerService); + pulsar.start(); + + admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerWeServiceUrl).build()); + + brokerStatsClient = admin.brokerStats(); + primaryHost = String.format("http://%s:%d", InetAddress.getLocalHost().getHostName(), brokerWebServicePort); + + // update cluster metadata + ClusterData clusterData = new ClusterData(brokerServiceUrl); + admin.clusters().updateCluster(config.getClusterName(), clusterData); + + ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl()); + pulsarClient = clientBuilder.build(); + + TenantInfo propAdmin = new TenantInfo(); + propAdmin.getAdminRoles().add("superUser"); + propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use"))); + admin.tenants().updateTenant(tenant, propAdmin); + + Thread.sleep(100); + } + + @AfterMethod + void shutdown() throws Exception { + log.info("--- Shutting down ---"); + pulsarClient.close(); + admin.close(); + functionsWorkerService.stop(); + pulsar.close(); + bkEnsemble.stop(); + } + + private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) { + workerConfig = new WorkerConfig(); + workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace); + workerConfig.setSchedulerClassName( + org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName()); + workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use")); + // worker talks to local broker + workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort()); + workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + config.getWebServicePort()); + workerConfig.setFailureCheckFreqMs(100); + workerConfig.setNumFunctionPackageReplicas(1); + workerConfig.setClusterCoordinationTopicName("coordinate"); + workerConfig.setFunctionAssignmentTopicName("assignment"); + workerConfig.setFunctionMetadataTopicName("metadata"); + workerConfig.setInstanceLivenessCheckFreqMs(100); + workerConfig.setWorkerPort(workerServicePort); + workerConfig.setPulsarFunctionsCluster(config.getClusterName()); + String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress()); + this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort(); + workerConfig.setWorkerHostname(hostname); + workerConfig.setWorkerId(workerId); + workerConfig.setTopicCompactionFrequencySec(1); + + return new WorkerService(workerConfig); + } + + @Test + public void testFunctionAssignments() throws Exception { + + final String namespacePortion = "assignment-test"; + final String replNamespace = tenant + "/" + namespacePortion; + final String sinkTopic = "persistent://" + replNamespace + "/my-topic1"; + final String functionName = "assign"; + final String subscriptionName = "test-sub"; + admin.namespaces().createNamespace(replNamespace); + Set clusters = Sets.newHashSet(Lists.newArrayList("use")); + admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); + + String jarFilePathUrl = Utils.FILE + ":" + + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + FunctionDetails.Builder functionDetailsBuilder = createFunctionDetails(jarFilePathUrl, tenant, namespacePortion, + functionName, "my.*", sinkTopic, subscriptionName); + functionDetailsBuilder.setParallelism(2); + FunctionDetails functionDetails = functionDetailsBuilder.build(); + + // (1) Create function with 2 instance + admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); + retryStrategically((test) -> { + try { + return admin.topics().getStats(sinkTopic).subscriptions.size() == 1 + && admin.topics().getStats(sinkTopic).subscriptions.values().iterator().next().consumers + .size() == 2; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + // validate 2 instances have been started + assertEquals(admin.topics().getStats(sinkTopic).subscriptions.size(), 1); + assertEquals(admin.topics().getStats(sinkTopic).subscriptions.values().iterator().next().consumers.size(), 2); + + // (2) Update function with 1 instance + functionDetailsBuilder.setParallelism(1); + functionDetails = functionDetailsBuilder.build(); + // try to update function to test: update-function functionality + admin.functions().updateFunctionWithUrl(functionDetails, jarFilePathUrl); + retryStrategically((test) -> { + try { + return admin.topics().getStats(sinkTopic).subscriptions.size() == 1 + && admin.topics().getStats(sinkTopic).subscriptions.values().iterator().next().consumers + .size() == 1; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + // validate pulsar sink consumer has started on the topic + assertEquals(admin.topics().getStats(sinkTopic).subscriptions.values().iterator().next().consumers.size(), 1); + } + + @Test(timeOut=20000) + public void testFunctionAssignmentsWithRestart() throws Exception { + + final String namespacePortion = "assignment-test"; + final String replNamespace = tenant + "/" + namespacePortion; + final String sinkTopic = "persistent://" + replNamespace + "/my-topic1"; + final String baseFunctionName = "assign-restart"; + final String subscriptionName = "test-sub"; + final int totalFunctions = 5; + final int parallelism = 2; + admin.namespaces().createNamespace(replNamespace); + Set clusters = Sets.newHashSet(Lists.newArrayList("use")); + admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); + final FunctionRuntimeManager runtimeManager = functionsWorkerService.getFunctionRuntimeManager(); + + String jarFilePathUrl = Utils.FILE + ":" + + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + FunctionDetails.Builder functionDetailsBuilder = null; + // (1) Register functions with 2 instances + for (int i = 0; i < totalFunctions; i++) { + String functionName = baseFunctionName + i; + functionDetailsBuilder = createFunctionDetails(jarFilePathUrl, tenant, namespacePortion, functionName, + "my.*", sinkTopic, subscriptionName); + functionDetailsBuilder.setParallelism(parallelism); + // set-auto-ack prop =true + functionDetailsBuilder.setAutoAck(true); + FunctionDetails functionDetails = functionDetailsBuilder.build(); + admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); + } + retryStrategically((test) -> { + try { + Map assgn = runtimeManager.getCurrentAssignments().values().iterator().next(); + return assgn.size() == (totalFunctions * parallelism); + } catch (Exception e) { + return false; + } + }, 5, 150); + + // Validate registered assignments + Map assignments = runtimeManager.getCurrentAssignments().values().iterator().next(); + assertEquals(assignments.size(), (totalFunctions * parallelism)); + + // (2) Update function with prop=auto-ack and Delete 2 functions + for (int i = 0; i < totalFunctions; i++) { + String functionName = baseFunctionName + i; + functionDetailsBuilder = createFunctionDetails(jarFilePathUrl, tenant, namespacePortion, functionName, + "my.*", sinkTopic, subscriptionName); + functionDetailsBuilder.setParallelism(parallelism); + // set-auto-ack prop =false + functionDetailsBuilder.setAutoAck(false); + FunctionDetails functionDetails = functionDetailsBuilder.build(); + admin.functions().updateFunctionWithUrl(functionDetails, jarFilePathUrl); + } + + int totalDeletedFunction = 2; + for (int i = (totalFunctions - 1); i >= (totalFunctions - totalDeletedFunction); i--) { + String functionName = baseFunctionName + i; + admin.functions().deleteFunction(tenant, namespacePortion, functionName); + } + retryStrategically((test) -> { + try { + Map assgn = runtimeManager.getCurrentAssignments().values().iterator().next(); + return assgn.size() == ((totalFunctions - totalDeletedFunction) * parallelism); + } catch (Exception e) { + return false; + } + }, 5, 150); + + // Validate registered assignments + assignments = runtimeManager.getCurrentAssignments().values().iterator().next(); + assertEquals(assignments.size(), ((totalFunctions - totalDeletedFunction) * parallelism)); + + // (3) Restart worker service and check registered functions + URI dlUri = functionsWorkerService.getDlogUri(); + functionsWorkerService.stop(); + functionsWorkerService = new WorkerService(workerConfig); + functionsWorkerService.start(dlUri); + FunctionRuntimeManager runtimeManager2 = functionsWorkerService.getFunctionRuntimeManager(); + retryStrategically((test) -> { + try { + Map assgn = runtimeManager2.getCurrentAssignments().values().iterator().next(); + return assgn.size() == ((totalFunctions - totalDeletedFunction) * parallelism); + } catch (Exception e) { + return false; + } + }, 5, 150); + + // Validate registered assignments + assignments = runtimeManager2.getCurrentAssignments().values().iterator().next(); + assertEquals(assignments.size(), ((totalFunctions - totalDeletedFunction) * parallelism)); + + // validate updated function prop = auto-ack=false and instnaceid + for (int i = 0; i < (totalFunctions - totalDeletedFunction); i++) { + String functionName = baseFunctionName + i; + assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).getAutoAck()); + } + } + + protected static FunctionDetails.Builder createFunctionDetails(String jarFile, String tenant, String namespace, + String functionName, String sourceTopic, String sinkTopic, String subscriptionName) { + + File file = new File(jarFile); + try { + Reflections.loadJar(file); + } catch (MalformedURLException e) { + throw new RuntimeException("Failed to load user jar " + file, e); + } + String sourceTopicPattern = String.format("persistent://%s/%s/%s", tenant, namespace, sourceTopic); + Class typeArg = byte[].class; + + FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); + functionDetailsBuilder.setTenant(tenant); + functionDetailsBuilder.setNamespace(namespace); + functionDetailsBuilder.setName(functionName); + functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA); + functionDetailsBuilder.setParallelism(1); + functionDetailsBuilder.setClassName(IdentityFunction.class.getName()); + + // set source spec + // source spec classname should be empty so that the default pulsar source will be used + SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder(); + sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED); + sourceSpecBuilder.setTypeClassName(typeArg.getName()); + sourceSpecBuilder.setTopicsPattern(sourceTopicPattern); + sourceSpecBuilder.setSubscriptionName(subscriptionName); + sourceSpecBuilder.putTopicsToSerDeClassName(sourceTopicPattern, ""); + functionDetailsBuilder.setAutoAck(true); + functionDetailsBuilder.setSource(sourceSpecBuilder); + + // set up sink spec + SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder(); + // sinkSpecBuilder.setClassName(PulsarSink.class.getName()); + sinkSpecBuilder.setTopic(sinkTopic); + Map sinkConfigMap = Maps.newHashMap(); + sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfigMap)); + sinkSpecBuilder.setTypeClassName(typeArg.getName()); + functionDetailsBuilder.setSink(sinkSpecBuilder); + + return functionDetailsBuilder; + } + +} \ No newline at end of file diff --git a/pulsar-functions/proto/src/main/proto/Request.proto b/pulsar-functions/proto/src/main/proto/Request.proto index c8e31d7..5743469 100644 --- a/pulsar-functions/proto/src/main/proto/Request.proto +++ b/pulsar-functions/proto/src/main/proto/Request.proto @@ -37,8 +37,3 @@ message ServiceRequest { FunctionMetaData functionMetaData = 3; string workerId = 4; } - -message AssignmentsUpdate { - repeated Assignment assignments = 1; - uint64 version = 2; -} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java index 366eaba..3ad6c7c 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java @@ -18,14 +18,16 @@ */ package org.apache.pulsar.functions.worker; -import lombok.extern.slf4j.Slf4j; +import java.io.IOException; +import java.util.function.Function; + import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; -import org.apache.pulsar.functions.proto.Request; +import org.apache.pulsar.functions.proto.Function.Assignment; -import java.io.IOException; -import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; @Slf4j public class FunctionAssignmentTailer @@ -34,15 +36,16 @@ public class FunctionAssignmentTailer private final FunctionRuntimeManager functionRuntimeManager; private final Reader reader; - public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, - Reader reader) + public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager) throws PulsarClientException { - this.functionRuntimeManager = functionRuntimeManager; - this.reader = reader; - } + this.functionRuntimeManager = functionRuntimeManager; - public void start() { + this.reader = functionRuntimeManager.getWorkerService().getClient().newReader() + .topic(functionRuntimeManager.getWorkerConfig().getFunctionAssignmentTopic()).readCompacted(true) + .startMessageId(MessageId.earliest).create(); + } + public void start() { receiveOne(); } @@ -65,29 +68,21 @@ public class FunctionAssignmentTailer @Override public void accept(Message msg) { - - // check if latest - boolean hasMessageAvailable; - try { - hasMessageAvailable = this.reader.hasMessageAvailable(); - } catch (PulsarClientException e) { - throw new RuntimeException(e); - } - if (!hasMessageAvailable) { - Request.AssignmentsUpdate assignmentsUpdate; + if(msg.getData()==null || (msg.getData().length==0)) { + log.info("Received assignment delete: {}", msg.getKey()); + this.functionRuntimeManager.deleteAssignment(msg.getKey()); + } else { + Assignment assignment; try { - assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(msg.getData()); + assignment = Assignment.parseFrom(msg.getData()); } catch (IOException e) { log.error("[{}] Received bad assignment update at message {}", reader.getTopic(), msg.getMessageId(), e); // TODO: find a better way to handle bad request throw new RuntimeException(e); } - if (log.isDebugEnabled()) { - log.debug("Received assignment update: {}", assignmentsUpdate); - } - - this.functionRuntimeManager.processAssignmentUpdate(msg.getMessageId(), assignmentsUpdate); + log.info("Received assignment update: {}", assignment); + this.functionRuntimeManager.processAssignment(assignment); } // receive next request receiveOne(); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 43cd27b..e634130 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -18,48 +18,45 @@ */ package org.apache.pulsar.functions.worker; -import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.net.URI; -import java.net.URISyntaxException; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.UriBuilder; -import lombok.extern.slf4j.Slf4j; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.common.policies.data.ErrorData; -import org.apache.pulsar.functions.proto.Function.Assignment; import org.apache.pulsar.functions.instance.AuthenticationConfig; +import org.apache.pulsar.functions.proto.Function.Assignment; import org.apache.pulsar.functions.proto.InstanceCommunication; -import org.apache.pulsar.functions.proto.Request.AssignmentsUpdate; -import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory; import org.apache.pulsar.functions.runtime.Runtime; -import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory; +import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.runtime.RuntimeSpawner; +import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.client.Client; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.client.Entity; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriBuilder; -import javax.ws.rs.core.Response.Status; +import com.google.common.annotations.VisibleForTesting; -import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.stream.Collectors; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; /** * This class managers all aspects of functions assignments and running of function assignments for this worker @@ -78,13 +75,12 @@ public class FunctionRuntimeManager implements AutoCloseable{ Map functionRuntimeInfoMap = new ConcurrentHashMap<>(); @VisibleForTesting + @Getter final WorkerConfig workerConfig; @VisibleForTesting LinkedBlockingQueue actionQueue; - private long currentAssignmentVersion = 0; - private final FunctionAssignmentTailer functionAssignmentTailer; private FunctionActioner functionActioner; @@ -92,22 +88,19 @@ public class FunctionRuntimeManager implements AutoCloseable{ private RuntimeFactory runtimeFactory; private MembershipManager membershipManager; - private final ConnectorsManager connectorsManager; private final PulsarAdmin functionAdmin; + + @Getter + private WorkerService workerService; public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService, Namespace dlogNamespace, MembershipManager membershipManager, ConnectorsManager connectorsManager) throws Exception { this.workerConfig = workerConfig; - this.connectorsManager = connectorsManager; + this.workerService = workerService; this.functionAdmin = workerService.getFunctionAdmin(); - Reader reader = workerService.getClient().newReader() - .topic(this.workerConfig.getFunctionAssignmentTopic()) - .startMessageId(MessageId.earliest) - .create(); - - this.functionAssignmentTailer = new FunctionAssignmentTailer(this, reader); + this.functionAssignmentTailer = new FunctionAssignmentTailer(this); AuthenticationConfig authConfig = AuthenticationConfig.builder() .clientAuthenticationPlugin(workerConfig.getClientAuthenticationPlugin()) @@ -222,13 +215,6 @@ public class FunctionRuntimeManager implements AutoCloseable{ return assignments; } - /** - * get the current version number of assignments - * @return assignments version number - */ - public synchronized long getCurrentAssignmentVersion() { - return new Long(this.currentAssignmentVersion); - } /** * Removes a collection of assignments @@ -461,105 +447,108 @@ public class FunctionRuntimeManager implements AutoCloseable{ /** * Process an assignment update from the assignment topic * @param messageId the message id of the update assignment - * @param assignmentsUpdate the assignment update + * @param newAssignment the assignment */ - public synchronized void processAssignmentUpdate(MessageId messageId, AssignmentsUpdate assignmentsUpdate) { + public synchronized void processAssignment(Assignment newAssignment) { - if (assignmentsUpdate.getVersion() > this.currentAssignmentVersion) { + Map existingAssignmentMap = new HashMap<>(); + for (Map entry : this.workerIdToAssignments.values()) { + existingAssignmentMap.putAll(entry); + } - Map assignmentMap = new HashMap<>(); - for (Assignment assignment : assignmentsUpdate.getAssignmentsList()) { - assignmentMap.put( - Utils.getFullyQualifiedInstanceId(assignment.getInstance()), - assignment); + if (existingAssignmentMap.containsKey(Utils.getFullyQualifiedInstanceId(newAssignment.getInstance()))) { + updateAssignment(newAssignment); + } else { + addAssignment(newAssignment); + } + } + + private void updateAssignment(Assignment assignment) { + String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance()); + Assignment existingAssignment = this.findAssignment(assignment); + // potential updates need to happen + if (!existingAssignment.equals(assignment)) { + FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId); + //stop function + if (functionRuntimeInfo != null) { + this.insertStopAction(functionRuntimeInfo); } - Map existingAssignmentMap = new HashMap<>(); - for (Map entry : this.workerIdToAssignments.values()) { - existingAssignmentMap.putAll(entry); + // still assigned to me, need to restart + if (assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) { + //start again + FunctionRuntimeInfo newFunctionRuntimeInfo = new FunctionRuntimeInfo(); + newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance()); + this.insertStartAction(newFunctionRuntimeInfo); + this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, newFunctionRuntimeInfo); } - Map assignmentsToAdd = diff(assignmentMap, existingAssignmentMap); - - Map assignmentsToDelete = diff(existingAssignmentMap, assignmentMap); - - Map existingAssignments = inCommon(assignmentMap, existingAssignmentMap); - - // functions to add - for (Map.Entry assignmentEntry : assignmentsToAdd.entrySet()) { - String fullyQualifiedInstanceId = assignmentEntry.getKey(); - Assignment assignment = assignmentEntry.getValue(); - - //add new function - this.setAssignment(assignment); - - //Assigned to me - if (assignment.getWorkerId().equals(workerConfig.getWorkerId())) { - if (!this.functionRuntimeInfoMap.containsKey(fullyQualifiedInstanceId)) { - this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, new FunctionRuntimeInfo() - .setFunctionInstance(assignment.getInstance())); - - } else { - //Somehow this function is already started - log.warn("Function {} already running. Going to restart function.", - this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId)); - this.insertStopAction(this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId)); - } - FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId); - this.insertStartAction(functionRuntimeInfo); - } + // find existing assignment + Assignment existing_assignment = this.findAssignment(assignment); + if (existing_assignment != null) { + // delete old assignment that could have old data + this.deleteAssignment(existing_assignment); } + // set to newest assignment + this.setAssignment(assignment); + } + } + + public synchronized void deleteAssignment(String fullyQualifiedInstanceId) { + FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId); + if (functionRuntimeInfo != null) { + this.insertStopAction(functionRuntimeInfo); + this.deleteFunctionRuntimeInfo(fullyQualifiedInstanceId); + } + + String workerId = null; + for(Entry> workerAssignments : workerIdToAssignments.entrySet()) { + if(workerAssignments.getValue().remove(fullyQualifiedInstanceId)!=null) { + workerId = workerAssignments.getKey(); + break; + } + } + Map worker; + if (workerId != null && ((worker = workerIdToAssignments.get(workerId)) != null && worker.isEmpty())) { + this.workerIdToAssignments.remove(workerId); + } + } - // functions to delete - for (Map.Entry assignmentEntry : assignmentsToDelete.entrySet()) { - String fullyQualifiedInstanceId = assignmentEntry.getKey(); - Assignment assignment = assignmentEntry.getValue(); - - FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId); - if (functionRuntimeInfo != null) { - this.insertStopAction(functionRuntimeInfo); - this.deleteFunctionRuntimeInfo(fullyQualifiedInstanceId); - } - this.deleteAssignment(assignment); + @VisibleForTesting + void deleteAssignment(Assignment assignment) { + String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance()); + Map assignmentMap = this.workerIdToAssignments.get(assignment.getWorkerId()); + if (assignmentMap != null) { + if (assignmentMap.containsKey(fullyQualifiedInstanceId)) { + assignmentMap.remove(fullyQualifiedInstanceId); } + if (assignmentMap.isEmpty()) { + this.workerIdToAssignments.remove(assignment.getWorkerId()); + } + } + } - // functions to update - for (Map.Entry assignmentEntry : existingAssignments.entrySet()) { - String fullyQualifiedInstanceId = assignmentEntry.getKey(); - Assignment assignment = assignmentEntry.getValue(); - Assignment existingAssignment = this.findAssignment(assignment); - // potential updates need to happen - if (!existingAssignment.equals(assignment)) { - FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId); - //stop function - if (functionRuntimeInfo != null) { - this.insertStopAction(functionRuntimeInfo); - } - // still assigned to me, need to restart - if (assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) { - //start again - FunctionRuntimeInfo newFunctionRuntimeInfo = new FunctionRuntimeInfo(); - newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance()); - this.insertStartAction(newFunctionRuntimeInfo); - this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, newFunctionRuntimeInfo); - } + private void addAssignment(Assignment assignment) { + String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance()); - // find existing assignment - Assignment existing_assignment = this.findAssignment(assignment); - if (existing_assignment != null) { - // delete old assignment that could have old data - this.deleteAssignment(existing_assignment); - } - // set to newest assignment - this.setAssignment(assignment); - } - } + //add new function + this.setAssignment(assignment); - // set as current assignment - this.currentAssignmentVersion = assignmentsUpdate.getVersion(); + //Assigned to me + if (assignment.getWorkerId().equals(workerConfig.getWorkerId())) { + if (!this.functionRuntimeInfoMap.containsKey(fullyQualifiedInstanceId)) { + this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, new FunctionRuntimeInfo() + .setFunctionInstance(assignment.getInstance())); - } else { - log.debug("Received out of date assignment update: {}", assignmentsUpdate); + } else { + //Somehow this function is already started + log.warn("Function {} already running. Going to restart function.", + this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId)); + this.insertStopAction(this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId)); + } + FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId); + this.insertStartAction(functionRuntimeInfo); } + } public Map getFunctionRuntimeInfos() { @@ -642,20 +631,6 @@ public class FunctionRuntimeManager implements AutoCloseable{ assignment); } - @VisibleForTesting - void deleteAssignment(Assignment assignment) { - Map assignmentMap = this.workerIdToAssignments.get(assignment.getWorkerId()); - if (assignmentMap != null) { - String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance()); - if (assignmentMap.containsKey(fullyQualifiedInstanceId)) { - assignmentMap.remove(fullyQualifiedInstanceId); - } - if (assignmentMap.isEmpty()) { - this.workerIdToAssignments.remove(assignment.getWorkerId()); - } - } - } - private void deleteFunctionRuntimeInfo(String fullyQualifiedInstanceId) { this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId); } @@ -674,28 +649,8 @@ public class FunctionRuntimeManager implements AutoCloseable{ } } - private Map diff(Map assignmentMap1, Map assignmentMap2) { - Map result = new HashMap<>(); - for (Map.Entry entry : assignmentMap1.entrySet()) { - if (!assignmentMap2.containsKey(entry.getKey())) { - result.put(entry.getKey(), entry.getValue()); - } - } - return result; - } - - private Map inCommon(Map assignmentMap1, Map assignmentMap2) { - - Map result = new HashMap<>(); - for (Map.Entry entry : assignmentMap1.entrySet()) { - if (assignmentMap2.containsKey(entry.getKey())) { - result.put(entry.getKey(), entry.getValue()); - } - } - return result; - } - private FunctionRuntimeInfo getFunctionRuntimeInfo(String fullyQualifiedInstanceId) { return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId); } + } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java index ed00958..db7785a 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java @@ -23,30 +23,29 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; - +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.CompressionType; -import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.Assignment; import org.apache.pulsar.functions.proto.Function.FunctionMetaData; -import org.apache.pulsar.functions.proto.Request; import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.worker.scheduler.IScheduler; +import com.google.common.annotations.VisibleForTesting; + +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + @Slf4j public class SchedulerManager implements AutoCloseable { @@ -65,16 +64,22 @@ public class SchedulerManager implements AutoCloseable { private final Producer producer; - private final ExecutorService executorService; + private final ScheduledExecutorService executorService; + + private final PulsarAdmin admin; + + AtomicBoolean isCompactionNeeded = new AtomicBoolean(false); + private static final long DEFAULT_ADMIN_API_BACKOFF_SEC = 60; - public SchedulerManager(WorkerConfig workerConfig, PulsarClient pulsarClient) { + public SchedulerManager(WorkerConfig workerConfig, PulsarClient pulsarClient, PulsarAdmin admin, ScheduledExecutorService executor) { this.workerConfig = workerConfig; + this.admin = admin; this.scheduler = Reflections.createInstance(workerConfig.getSchedulerClassName(), IScheduler.class, Thread.currentThread().getContextClassLoader()); try { this.producer = pulsarClient.newProducer().topic(this.workerConfig.getFunctionAssignmentTopic()) - .enableBatching(true).blockIfQueueFull(true).compressionType(CompressionType.LZ4). + .enableBatching(false).blockIfQueueFull(true).compressionType(CompressionType.LZ4). sendTimeout(0, TimeUnit.MILLISECONDS).create(); } catch (PulsarClientException e) { log.error("Failed to create producer to function assignment topic " @@ -82,9 +87,9 @@ public class SchedulerManager implements AutoCloseable { throw new RuntimeException(e); } - this.executorService = - new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>()); + this.executorService = executor; + + scheduleCompaction(executor, workerConfig.getTopicCompactionFrequencySec()); } public Future schedule() { @@ -92,13 +97,31 @@ public class SchedulerManager implements AutoCloseable { synchronized (SchedulerManager.this) { boolean isLeader = membershipManager.isLeader(); if (isLeader) { - invokeScheduler(); + try { + invokeScheduler(); + } catch (Exception e) { + log.warn("Failed to invoke scheduler", e); + schedule(); + } } } }); } - private void invokeScheduler() { + private void scheduleCompaction(ScheduledExecutorService executor, long scheduleFrequencySec) { + if (executor != null) { + executor.scheduleWithFixedDelay(() -> { + if (membershipManager.isLeader() && isCompactionNeeded.get()) { + compactAssignmentTopic(); + isCompactionNeeded.set(false); + } + }, scheduleFrequencySec, scheduleFrequencySec, TimeUnit.SECONDS); + } + } + + @VisibleForTesting + public void invokeScheduler() { + List currentMembership = this.membershipManager.getCurrentMembership() .stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toList()); @@ -111,12 +134,16 @@ public class SchedulerManager implements AutoCloseable { while (it.hasNext()) { Map.Entry> workerIdToAssignmentEntry = it.next(); Map functionMap = workerIdToAssignmentEntry.getValue(); + // remove instances that don't exist anymore - functionMap.entrySet().removeIf( - entry -> { - String fullyQualifiedInstanceId = entry.getKey(); - return !allInstances.containsKey(fullyQualifiedInstanceId); - }); + functionMap.entrySet().removeIf(entry -> { + String fullyQualifiedInstanceId = entry.getKey(); + boolean deleted = !allInstances.containsKey(fullyQualifiedInstanceId); + if (deleted) { + publishNewAssignment(entry.getValue().toBuilder().build(), true); + } + return deleted; + }); // update assignment instances in case attributes of a function gets updated for (Map.Entry entry : functionMap.entrySet()) { @@ -143,36 +170,40 @@ public class SchedulerManager implements AutoCloseable { List assignments = this.scheduler.schedule( needsAssignment, currentAssignments, currentMembership); - log.debug("New assignments computed: {}", assignments); + if (log.isDebugEnabled()) { + log.debug("New assignments computed: {}", assignments); + } - long assignmentVersion = this.functionRuntimeManager.getCurrentAssignmentVersion() + 1; - Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.newBuilder() - .setVersion(assignmentVersion) - .addAllAssignments(assignments) - .build(); + isCompactionNeeded.set(!assignments.isEmpty()); - CompletableFuture messageIdCompletableFuture = producer.sendAsync(assignmentsUpdate.toByteArray()); - try { - messageIdCompletableFuture.get(); - } catch (InterruptedException | ExecutionException e) { - log.error("Failed to send assignment update", e); - throw new RuntimeException(e); + for(Assignment assignment : assignments) { + publishNewAssignment(assignment, false); } + + } - // wait for assignment update to go throw the pipeline - int retries = 0; - while (this.functionRuntimeManager.getCurrentAssignmentVersion() < assignmentVersion) { - if (retries >= this.workerConfig.getAssignmentWriteMaxRetries()) { - log.warn("Max number of retries reached for waiting for assignment to propagate. Will continue now."); - break; - } - log.info("Waiting for assignments to propagate..."); + public void compactAssignmentTopic() { + if (this.admin != null) { try { - Thread.sleep(500); - } catch (InterruptedException e) { - throw new RuntimeException(e); + this.admin.topics().triggerCompaction(workerConfig.getFunctionAssignmentTopic()); + } catch (PulsarAdminException e) { + log.error("Failed to trigger compaction {}", e); + executorService.schedule(() -> compactAssignmentTopic(), DEFAULT_ADMIN_API_BACKOFF_SEC, + TimeUnit.SECONDS); } - retries++; + } + } + + private void publishNewAssignment(Assignment assignment, boolean deleted) { + try { + String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance()); + // publish empty message with instance-id key so, compactor can delete and skip delivery of this instance-id + // message + producer.newMessage().key(fullyQualifiedInstanceId) + .value(deleted ? "".getBytes() : assignment.toByteArray()).sendAsync().get(); + } catch (Exception e) { + log.error("Failed to {} assignment update {}", assignment, deleted ? "send" : "deleted", e); + throw new RuntimeException(e); } } @@ -224,6 +255,5 @@ public class SchedulerManager implements AutoCloseable { } catch (PulsarClientException e) { log.warn("Failed to shutdown scheduler manager assignment producer", e); } - this.executorService.shutdown(); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 0f695a9..a2524c6 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -74,6 +74,9 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { private long instanceLivenessCheckFreqMs; private String clientAuthenticationPlugin; private String clientAuthenticationParameters; + // Frequency how often worker performs compaction on function-topics + private long topicCompactionFrequencySec = 30 * 60; // 30 minutes + private int metricsSamplingPeriodSec = 60; /***** --- TLS --- ****/ // Enable TLS private boolean tlsEnabled = false; @@ -88,8 +91,6 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { private boolean tlsRequireTrustedClientCertOnConnect = false; private boolean useTls = false; private boolean tlsHostnameVerificationEnable = false; - - private int metricsSamplingPeriodSec = 60; // Enforce authentication private boolean authenticationEnabled = false; // Autentication provider name list, which is a list of class names diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index 7fc0cc9..488bcd7 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -20,6 +20,7 @@ package org.apache.pulsar.functions.worker; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import io.netty.util.concurrent.DefaultThreadFactory; @@ -68,11 +69,15 @@ public class WorkerService { private PulsarAdmin brokerAdmin; private PulsarAdmin functionAdmin; private final MetricsGenerator metricsGenerator; + private final ScheduledExecutorService executor; + @VisibleForTesting + private URI dlogUri; public WorkerService(WorkerConfig workerConfig) { this.workerConfig = workerConfig; this.statsUpdater = Executors .newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-stats-updater")); + this.executor = Executors.newScheduledThreadPool(10, new DefaultThreadFactory("pulsar-worker")); this.metricsGenerator = new MetricsGenerator(this.statsUpdater); } @@ -98,12 +103,13 @@ public class WorkerService { } // create the dlog namespace for storing function packages + this.dlogUri = dlogUri; DistributedLogConfiguration dlogConf = Utils.getDlogConf(workerConfig); try { this.dlogNamespace = NamespaceBuilder.newBuilder() .conf(dlogConf) .clientId("function-worker-" + workerConfig.getWorkerId()) - .uri(dlogUri) + .uri(this.dlogUri) .build(); } catch (Exception e) { log.error("Failed to initialize dlog namespace {} for storing function packages", @@ -128,7 +134,8 @@ public class WorkerService { log.info("Created Pulsar client"); //create scheduler manager - this.schedulerManager = new SchedulerManager(this.workerConfig, this.client); + this.schedulerManager = new SchedulerManager(this.workerConfig, this.client, this.brokerAdmin, + this.executor); //create function meta data manager this.functionMetaDataManager = new FunctionMetaDataManager( @@ -232,6 +239,10 @@ public class WorkerService { if (null != this.functionAdmin) { this.functionAdmin.close(); } + + if(this.executor != null) { + this.executor.shutdown(); + } } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java index 58c1a9a..817962d 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java @@ -22,6 +22,8 @@ import org.apache.pulsar.functions.proto.Function.Assignment; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.Instance; +import com.google.common.collect.Lists; + import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -47,19 +49,17 @@ public class RoundRobinScheduler implements IScheduler { workerIdToAssignment.get(existingAssignment.getWorkerId()).add(existingAssignment); } + List newAssignments = Lists.newArrayList(); for (Instance unassignedFunctionInstance : unassignedFunctionInstances) { String heartBeatWorkerId = checkHeartBeatFunction(unassignedFunctionInstance); String workerId = heartBeatWorkerId != null ? heartBeatWorkerId : findNextWorker(workerIdToAssignment); Assignment newAssignment = Assignment.newBuilder().setInstance(unassignedFunctionInstance) .setWorkerId(workerId).build(); workerIdToAssignment.get(workerId).add(newAssignment); + newAssignments.add(newAssignment); } - List assignments - = workerIdToAssignment.entrySet().stream() - .flatMap(entry -> entry.getValue().stream()).collect(Collectors.toList()); - - return assignments; + return newAssignments; } private static String checkHeartBeatFunction(Instance funInstance) { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index 85a2122..5e1ed02 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -37,6 +37,7 @@ import java.util.List; import java.util.Map; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.doReturn; @@ -86,6 +87,8 @@ public class FunctionRuntimeManagerTest { doReturn(readerBuilder).when(pulsarClient).newReader(); doReturn(readerBuilder).when(readerBuilder).topic(anyString()); doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); + doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); + doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean()); doReturn(mock(Reader.class)).when(readerBuilder).create(); WorkerService workerService = mock(WorkerService.class); doReturn(pulsarClient).when(workerService).getClient(); @@ -123,12 +126,8 @@ public class FunctionRuntimeManagerTest { assignments.add(assignment1); assignments.add(assignment2); - Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.newBuilder() - .addAllAssignments(assignments) - .setVersion(1) - .build(); - - functionRuntimeManager.processAssignmentUpdate(MessageId.earliest, assignmentsUpdate); + functionRuntimeManager.processAssignment(assignment1); + functionRuntimeManager.processAssignment(assignment2); verify(functionRuntimeManager, times(2)).setAssignment(any(Function.Assignment.class)); verify(functionRuntimeManager, times(0)).deleteAssignment(any(Function.Assignment.class)); @@ -183,6 +182,7 @@ public class FunctionRuntimeManagerTest { doReturn(readerBuilder).when(pulsarClient).newReader(); doReturn(readerBuilder).when(readerBuilder).topic(anyString()); doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); + doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean()); doReturn(mock(Reader.class)).when(readerBuilder).create(); WorkerService workerService = mock(WorkerService.class); doReturn(pulsarClient).when(workerService).getClient(); @@ -205,6 +205,7 @@ public class FunctionRuntimeManagerTest { Function.FunctionDetails.newBuilder() .setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build(); + // Delete this assignment Function.Assignment assignment1 = Function.Assignment.newBuilder() .setWorkerId("worker-1") .setInstance(Function.Instance.newBuilder() @@ -221,23 +222,18 @@ public class FunctionRuntimeManagerTest { functionRuntimeManager.setAssignment(assignment2); reset(functionRuntimeManager); - List assignments = new LinkedList<>(); - assignments.add(assignment2); - - Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.newBuilder() - .addAllAssignments(assignments) - .setVersion(1) - .build(); - functionRuntimeManager.functionRuntimeInfoMap.put( "test-tenant/test-namespace/func-1:0", new FunctionRuntimeInfo().setFunctionInstance( Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0) .build())); - functionRuntimeManager.processAssignmentUpdate(MessageId.earliest, assignmentsUpdate); + functionRuntimeManager.processAssignment(assignment1); + functionRuntimeManager.processAssignment(assignment2); + functionRuntimeManager.deleteAssignment(Utils.getFullyQualifiedInstanceId(assignment1.getInstance())); + verify(functionRuntimeManager, times(0)).setAssignment(any(Function.Assignment.class)); - verify(functionRuntimeManager, times(1)).deleteAssignment(any(Function.Assignment.class)); + verify(functionRuntimeManager, times(1)).deleteAssignment(any(String.class)); Assert.assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1); Assert.assertEquals(functionRuntimeManager.workerIdToAssignments @@ -284,6 +280,7 @@ public class FunctionRuntimeManagerTest { doReturn(readerBuilder).when(pulsarClient).newReader(); doReturn(readerBuilder).when(readerBuilder).topic(anyString()); doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); + doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean()); doReturn(mock(Reader.class)).when(readerBuilder).create(); WorkerService workerService = mock(WorkerService.class); doReturn(pulsarClient).when(workerService).getClient(); @@ -327,14 +324,6 @@ public class FunctionRuntimeManagerTest { .setInstance(Function.Instance.newBuilder() .setFunctionMetaData(function2).setInstanceId(0).build()) .build(); - List assignments = new LinkedList<>(); - assignments.add(assignment1); - assignments.add(assignment3); - - Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.newBuilder() - .addAllAssignments(assignments) - .setVersion(1) - .build(); functionRuntimeManager.functionRuntimeInfoMap.put( "test-tenant/test-namespace/func-1:0", new FunctionRuntimeInfo().setFunctionInstance( @@ -345,7 +334,8 @@ public class FunctionRuntimeManagerTest { Function.Instance.newBuilder().setFunctionMetaData(function2).setInstanceId(0) .build())); - functionRuntimeManager.processAssignmentUpdate(MessageId.earliest, assignmentsUpdate); + functionRuntimeManager.processAssignment(assignment1); + functionRuntimeManager.processAssignment(assignment3); verify(functionRuntimeManager, times(1)).insertStopAction(any(FunctionRuntimeInfo.class)); verify(functionRuntimeManager).insertStopAction(argThat(new ArgumentMatcher() { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java index 2753bf1..c849fd3 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java @@ -131,6 +131,7 @@ public class MembershipManagerTest { ReaderBuilder readerBuilder = mock(ReaderBuilder.class); doReturn(readerBuilder).when(pulsarClient).newReader(); doReturn(readerBuilder).when(readerBuilder).topic(anyString()); + doReturn(readerBuilder).when(readerBuilder).readCompacted(true); doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); doReturn(mock(Reader.class)).when(readerBuilder).create(); WorkerService workerService = mock(WorkerService.class); @@ -202,6 +203,7 @@ public class MembershipManagerTest { doReturn(readerBuilder).when(pulsarClient).newReader(); doReturn(readerBuilder).when(readerBuilder).topic(anyString()); doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); + doReturn(readerBuilder).when(readerBuilder).readCompacted(true); doReturn(mock(Reader.class)).when(readerBuilder).create(); WorkerService workerService = mock(WorkerService.class); doReturn(pulsarClient).when(workerService).getClient(); @@ -296,6 +298,7 @@ public class MembershipManagerTest { ReaderBuilder readerBuilder = mock(ReaderBuilder.class); doReturn(readerBuilder).when(pulsarClient).newReader(); doReturn(readerBuilder).when(readerBuilder).topic(anyString()); + doReturn(readerBuilder).when(readerBuilder).readCompacted(true); doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); doReturn(mock(Reader.class)).when(readerBuilder).create(); WorkerService workerService = mock(WorkerService.class); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java index 19977bd..00e4b6e 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java @@ -28,15 +28,19 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertTrue; import java.lang.reflect.Method; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -46,6 +50,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.Assignment; import org.apache.pulsar.functions.proto.Request; @@ -53,9 +58,14 @@ import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler; import org.mockito.Mockito; import org.mockito.invocation.Invocation; import org.testng.Assert; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import com.google.common.collect.Sets; +import com.google.protobuf.InvalidProtocolBufferException; + +import io.netty.util.concurrent.DefaultThreadFactory; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -67,6 +77,8 @@ public class SchedulerManagerTest { private MembershipManager membershipManager; private CompletableFuture completableFuture; private Producer producer; + private TypedMessageBuilder message; + private ScheduledExecutorService executor; private static PulsarClient mockPulsarClient() throws PulsarClientException { ProducerBuilder builder = mock(ProducerBuilder.class); @@ -94,8 +106,12 @@ public class SchedulerManagerTest { producer = mock(Producer.class); completableFuture = spy(new CompletableFuture<>()); completableFuture.complete(MessageId.earliest); - byte[] bytes = any(); - when(producer.sendAsync(bytes)).thenReturn(completableFuture); + //byte[] bytes = any(); + message = mock(TypedMessageBuilder.class); + when(producer.newMessage()).thenReturn(message); + when(message.key(anyString())).thenReturn(message); + when(message.value(any())).thenReturn(message); + when(message.sendAsync()).thenReturn(completableFuture); ProducerBuilder builder = mock(ProducerBuilder.class); when(builder.topic(anyString())).thenReturn(builder); @@ -109,7 +125,9 @@ public class SchedulerManagerTest { PulsarClient pulsarClient = mock(PulsarClient.class); when(pulsarClient.newProducer()).thenReturn(builder); - schedulerManager = spy(new SchedulerManager(workerConfig, pulsarClient)); + this.executor = Executors + .newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-test")); + schedulerManager = spy(new SchedulerManager(workerConfig, pulsarClient, null, executor)); functionRuntimeManager = mock(FunctionRuntimeManager.class); functionMetaDataManager = mock(FunctionMetaDataManager.class); membershipManager = mock(MembershipManager.class); @@ -118,6 +136,11 @@ public class SchedulerManagerTest { schedulerManager.setMembershipManager(membershipManager); } + @AfterMethod + public void stop() { + this.executor.shutdown(); + } + @Test public void testSchedule() throws Exception { @@ -143,9 +166,6 @@ public class SchedulerManagerTest { currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); - //set version - doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); - // single node List workerInfoList = new LinkedList<>(); workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 5000)); @@ -159,7 +179,9 @@ public class SchedulerManagerTest { // i am leader doReturn(true).when(membershipManager).isLeader(); callSchedule(); - verify(producer, times(1)).sendAsync(any(byte[].class)); + List invocations = getMethodInvocationDetails(schedulerManager, + SchedulerManager.class.getMethod("invokeScheduler")); + Assert.assertEquals(invocations.size(), 1); } @Test @@ -187,9 +209,6 @@ public class SchedulerManagerTest { currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); - //set version - doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); - // single node List workerInfoList = new LinkedList<>(); workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 5000)); @@ -200,17 +219,8 @@ public class SchedulerManagerTest { callSchedule(); - List invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", - Object.class)); - Assert.assertEquals(invocations.size(), 1); - - byte[] send = (byte[]) invocations.get(0).getRawArguments()[0]; - Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); - log.info("assignmentsUpdate: {}", assignmentsUpdate); - Assert.assertEquals( - Request.AssignmentsUpdate.newBuilder().setVersion(version + 1) - .addAssignments(assignment1).build(), - assignmentsUpdate); + List invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + Assert.assertEquals(invocations.size(), 0); } @Test @@ -243,9 +253,6 @@ public class SchedulerManagerTest { currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); - //set version - doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); - // single node List workerInfoList = new LinkedList<>(); workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 5000)); @@ -256,23 +263,20 @@ public class SchedulerManagerTest { callSchedule(); - List invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", - Object.class)); + List invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); Assert.assertEquals(invocations.size(), 1); - + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", + Object.class)); byte[] send = (byte[]) invocations.get(0).getRawArguments()[0]; - Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); + Assignment assignments = Assignment.parseFrom(send); - log.info("assignmentsUpdate: {}", assignmentsUpdate); + log.info("assignments: {}", assignments); Function.Assignment assignment2 = Function.Assignment.newBuilder() .setWorkerId("worker-1") .setInstance(Function.Instance.newBuilder() .setFunctionMetaData(function2).setInstanceId(0).build()) .build(); - Assert.assertEquals( - Request.AssignmentsUpdate.newBuilder().setVersion(version + 1) - .addAssignments(assignment1).addAssignments(assignment2).build(), - assignmentsUpdate); + Assert.assertEquals(assignment2, assignments); } @@ -288,7 +292,7 @@ public class SchedulerManagerTest { // simulate function2 got removed Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder() .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2") - .setNamespace("namespace-1").setTenant("tenant-1").setParallelism(1)).setVersion(version) + .setNamespace("namespace-1").setTenant("tenant-1").setParallelism(1)) .build(); functionMetaDataList.add(function1); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); @@ -300,6 +304,7 @@ public class SchedulerManagerTest { .setFunctionMetaData(function1).setInstanceId(0).build()) .build(); + // Delete this assignment Function.Assignment assignment2 = Function.Assignment.newBuilder() .setWorkerId("worker-1") .setInstance(Function.Instance.newBuilder() @@ -309,14 +314,12 @@ public class SchedulerManagerTest { Map> currentAssignments = new HashMap<>(); Map assignmentEntry1 = new HashMap<>(); assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); + //TODO: delete this assignment assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment2.getInstance()), assignment2); currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); - //set version - doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); - // single node List workerInfoList = new LinkedList<>(); workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 5000)); @@ -327,19 +330,16 @@ public class SchedulerManagerTest { callSchedule(); - List invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", - Object.class)); + List invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); Assert.assertEquals(invocations.size(), 1); - + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", + Object.class)); byte[] send = (byte[]) invocations.get(0).getRawArguments()[0]; - Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); + Assignment assignments = Assignment.parseFrom(send); - log.info("assignmentsUpdate: {}", assignmentsUpdate); + log.info("assignments: {}", assignments); - Assert.assertEquals( - Request.AssignmentsUpdate.newBuilder().setVersion(version + 1) - .addAssignments(assignment1).build(), - assignmentsUpdate); + Assert.assertEquals(0, send.length); } @Test @@ -373,9 +373,6 @@ public class SchedulerManagerTest { currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); - //set version - doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); - // single node List workerInfoList = new LinkedList<>(); workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 5000)); @@ -386,25 +383,21 @@ public class SchedulerManagerTest { callSchedule(); - List invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", - Object.class)); + List invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); Assert.assertEquals(invocations.size(), 1); - + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", + Object.class)); byte[] send = (byte[]) invocations.get(0).getRawArguments()[0]; - Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); + Assignment assignments = Assignment.parseFrom(send); - log.info("assignmentsUpdate: {}", assignmentsUpdate); + log.info("assignments: {}", assignments); Function.Assignment assignment2 = Function.Assignment.newBuilder() .setWorkerId("worker-1") .setInstance(Function.Instance.newBuilder() .setFunctionMetaData(function2).setInstanceId(0).build()) .build(); - Assert.assertEquals( - assignmentsUpdate, - Request.AssignmentsUpdate.newBuilder().setVersion(version + 1) - .addAssignments(assignment1).addAssignments(assignment2).build() - ); + Assert.assertEquals(assignments, assignment2); // scale up @@ -435,17 +428,23 @@ public class SchedulerManagerTest { callSchedule(); - invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + Assert.assertEquals(invocations.size(), 4); + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", Object.class)); - Assert.assertEquals(invocations.size(), 2); + + Set allAssignments = Sets.newHashSet(); + invocations.forEach(invocation -> { + try { + allAssignments.add(Assignment.parseFrom((byte[])invocation.getRawArguments()[0])); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }); - send = (byte[]) invocations.get(1).getRawArguments()[0]; - assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); - Assert.assertEquals(assignmentsUpdate, - Request.AssignmentsUpdate.newBuilder().setVersion(version + 1 + 1) - .addAssignments(assignment1).addAssignments(assignment2Scaled1) - .addAssignments(assignment2Scaled2).addAssignments(assignment2Scaled3).build() - ); + assertTrue(allAssignments.contains(assignment2Scaled1)); + assertTrue(allAssignments.contains(assignment2Scaled2)); + assertTrue(allAssignments.contains(assignment2Scaled3)); } @Test @@ -479,9 +478,6 @@ public class SchedulerManagerTest { currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); - //set version - doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); - // single node List workerInfoList = new LinkedList<>(); workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 5000)); @@ -492,14 +488,24 @@ public class SchedulerManagerTest { callSchedule(); - List invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", + List invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + Assert.assertEquals(invocations.size(), 3); + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", Object.class)); - Assert.assertEquals(invocations.size(), 1); - byte[] send = (byte[]) invocations.get(0).getRawArguments()[0]; - Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); + Assignment assignments = Assignment.parseFrom(send); + + log.info("assignments: {}", assignments); + + Set allAssignments = Sets.newHashSet(); + invocations.forEach(invocation -> { + try { + allAssignments.add(Assignment.parseFrom((byte[])invocation.getRawArguments()[0])); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }); - log.info("assignmentsUpdate: {}", assignmentsUpdate); Function.Assignment assignment2_1 = Function.Assignment.newBuilder() .setWorkerId("worker-1") @@ -516,13 +522,11 @@ public class SchedulerManagerTest { .setInstance(Function.Instance.newBuilder() .setFunctionMetaData(function2).setInstanceId(2).build()) .build(); - Assert.assertEquals( - assignmentsUpdate, - Request.AssignmentsUpdate.newBuilder().setVersion(version + 1) - .addAssignments(assignment1).addAssignments(assignment2_1) - .addAssignments(assignment2_2).addAssignments(assignment2_3).build() - ); - + + assertTrue(allAssignments.contains(assignment2_1)); + assertTrue(allAssignments.contains(assignment2_2)); + assertTrue(allAssignments.contains(assignment2_3)); + // scale down Function.FunctionMetaData function2Scaled = Function.FunctionMetaData.newBuilder() @@ -542,17 +546,23 @@ public class SchedulerManagerTest { callSchedule(); - invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + Assert.assertEquals(invocations.size(), 4); + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", Object.class)); - Assert.assertEquals(invocations.size(), 2); + send = (byte[]) invocations.get(0).getRawArguments()[0]; + assignments = Assignment.parseFrom(send); + + Set allAssignments2 = Sets.newHashSet(); + invocations.forEach(invocation -> { + try { + allAssignments2.add(Assignment.parseFrom((byte[])invocation.getRawArguments()[0])); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }); - send = (byte[]) invocations.get(1).getRawArguments()[0]; - assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); - Assert.assertEquals(assignmentsUpdate, - Request.AssignmentsUpdate.newBuilder().setVersion(version + 1 + 1) - .addAssignments(assignment1).addAssignments(assignment2Scaled) - .build() - ); + assertTrue(allAssignments2.contains(assignment2Scaled)); } @Test @@ -582,9 +592,6 @@ public class SchedulerManagerTest { currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); - // set version - doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); - List workerInfoList = new LinkedList<>(); workerInfoList.add(WorkerInfo.of(workerId1, "workerHostname-1", 5000)); workerInfoList.add(WorkerInfo.of(workerId2, "workerHostname-1", 6000)); @@ -595,20 +602,20 @@ public class SchedulerManagerTest { callSchedule(); - List invocations = getMethodInvocationDetails(producer, - Producer.class.getMethod("sendAsync", Object.class)); - Assert.assertEquals(invocations.size(), 1); - - byte[] send = (byte[]) invocations.get(0).getRawArguments()[0]; - Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); - - List assignmentList = assignmentsUpdate.getAssignmentsList(); - Assert.assertEquals(assignmentList.size(), 2); - for (Assignment assignment : assignmentList) { - String functionName = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(); - String assignedWorkerId = assignment.getWorkerId(); - Assert.assertEquals(functionName, assignedWorkerId); - } + List invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + Assert.assertEquals(invocations.size(), 2); + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", + Object.class)); + invocations.forEach(invocation -> { + try { + Assignment assignment = Assignment.parseFrom((byte[])invocation.getRawArguments()[0]); + String functionName = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(); + String assignedWorkerId = assignment.getWorkerId(); + Assert.assertEquals(functionName, assignedWorkerId); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }); } @Test @@ -644,9 +651,6 @@ public class SchedulerManagerTest { currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); - //set version - doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); - // single node List workerInfoList = new LinkedList<>(); workerInfoList.add(WorkerInfo.of("worker-1", "workerHostname-1", 5000)); @@ -657,14 +661,14 @@ public class SchedulerManagerTest { callSchedule(); - List invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", + List invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + Assert.assertEquals(invocations.size(), 3); + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", Object.class)); - Assert.assertEquals(invocations.size(), 1); - byte[] send = (byte[]) invocations.get(0).getRawArguments()[0]; - Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); + Assignment assignments = Assignment.parseFrom(send); - log.info("assignmentsUpdate: {}", assignmentsUpdate); + log.info("assignmentsUpdate: {}", assignments); Function.Assignment assignment2_1 = Function.Assignment.newBuilder() .setWorkerId("worker-1") @@ -681,13 +685,27 @@ public class SchedulerManagerTest { .setInstance(Function.Instance.newBuilder() .setFunctionMetaData(function2).setInstanceId(2).build()) .build(); - Assert.assertEquals( - assignmentsUpdate, - Request.AssignmentsUpdate.newBuilder().setVersion(version + 1) - .addAssignments(assignment1).addAssignments(assignment2_1) - .addAssignments(assignment2_2).addAssignments(assignment2_3).build() - ); - + + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + Assert.assertEquals(invocations.size(), 3); + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", + Object.class)); + send = (byte[]) invocations.get(0).getRawArguments()[0]; + assignments = Assignment.parseFrom(send); + + Set allAssignments = Sets.newHashSet(); + invocations.forEach(invocation -> { + try { + allAssignments.add(Assignment.parseFrom((byte[])invocation.getRawArguments()[0])); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }); + + assertTrue(allAssignments.contains(assignment2_1)); + assertTrue(allAssignments.contains(assignment2_2)); + assertTrue(allAssignments.contains(assignment2_3)); + // scale down Function.FunctionMetaData function2Updated = Function.FunctionMetaData.newBuilder() @@ -718,28 +736,32 @@ public class SchedulerManagerTest { callSchedule(); - invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + Assert.assertEquals(invocations.size(), 6); + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", Object.class)); - Assert.assertEquals(invocations.size(), 2); - - send = (byte[]) invocations.get(1).getRawArguments()[0]; - assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); - Assert.assertEquals(assignmentsUpdate, - Request.AssignmentsUpdate.newBuilder().setVersion(version + 1 + 1) - .addAssignments(assignment1).addAssignments(assignment2Updated1) - .addAssignments(assignment2Updated2) - .addAssignments(assignment2Updated3) - .build() - ); + send = (byte[]) invocations.get(0).getRawArguments()[0]; + assignments = Assignment.parseFrom(send); + + Set allAssignments2 = Sets.newHashSet(); + invocations.forEach(invocation -> { + try { + allAssignments2.add(Assignment.parseFrom((byte[])invocation.getRawArguments()[0])); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }); + + assertTrue(allAssignments2.contains(assignment2Updated1)); + assertTrue(allAssignments2.contains(assignment2Updated2)); + assertTrue(allAssignments2.contains(assignment2Updated3)); } private void callSchedule() throws NoSuchMethodException, InterruptedException, TimeoutException, ExecutionException { - long intialVersion = functionRuntimeManager.getCurrentAssignmentVersion(); Future complete = schedulerManager.schedule(); complete.get(30, TimeUnit.SECONDS); - doReturn(intialVersion + 1).when(functionRuntimeManager).getCurrentAssignmentVersion(); } private List getMethodInvocationDetails(Object o, Method method) throws NoSuchMethodException {