From commits-return-13184-archive-asf-public=cust-asf.ponee.io@pulsar.incubator.apache.org Thu Aug 23 20:20:18 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 6090E180677 for ; Thu, 23 Aug 2018 20:20:18 +0200 (CEST) Received: (qmail 77158 invoked by uid 500); 23 Aug 2018 18:20:17 -0000 Mailing-List: contact commits-help@pulsar.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pulsar.incubator.apache.org Delivered-To: mailing list commits@pulsar.incubator.apache.org Received: (qmail 77149 invoked by uid 99); 23 Aug 2018 18:20:17 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 23 Aug 2018 18:20:17 +0000 From: GitBox To: commits@pulsar.apache.org Subject: [GitHub] rdhabalia closed pull request #2424: Support heartbeat function for worker Message-ID: <153504841692.6293.16403144502205307768.gitbox@gitbox.apache.org> Date: Thu, 23 Aug 2018 18:20:16 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit rdhabalia closed pull request #2424: Support heartbeat function for worker URL: https://github.com/apache/incubator-pulsar/pull/2424 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 38ef5d3a6c..0f695a974f 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -143,7 +143,7 @@ public static WorkerConfig load(String yamlFile) throws IOException { public String getWorkerId() { if (StringUtils.isBlank(this.workerId)) { - this.workerId = getWorkerHostname(); + this.workerId = String.format("%s-%s", this.getWorkerHostname(), this.getWorkerPort()); } return this.workerId; } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java index 4f9ad62b30..58c1a9a513 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java @@ -19,6 +19,7 @@ package org.apache.pulsar.functions.worker.scheduler; import org.apache.pulsar.functions.proto.Function.Assignment; +import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.Instance; import java.util.HashMap; @@ -29,6 +30,9 @@ public class RoundRobinScheduler implements IScheduler { + public static final String HEARTBEAT_TENANT = "pulsar-function"; + public static final String HEARTBEAT_NAMESPACE = "heartbeat"; + @Override public List schedule(List unassignedFunctionInstances, List currentAssignments, List workers) { @@ -44,7 +48,8 @@ } for (Instance unassignedFunctionInstance : unassignedFunctionInstances) { - String workerId = findNextWorker(workerIdToAssignment); + String heartBeatWorkerId = checkHeartBeatFunction(unassignedFunctionInstance); + String workerId = heartBeatWorkerId != null ? heartBeatWorkerId : findNextWorker(workerIdToAssignment); Assignment newAssignment = Assignment.newBuilder().setInstance(unassignedFunctionInstance) .setWorkerId(workerId).build(); workerIdToAssignment.get(workerId).add(newAssignment); @@ -57,6 +62,16 @@ return assignments; } + private static String checkHeartBeatFunction(Instance funInstance) { + if (funInstance.getFunctionMetaData() != null + && funInstance.getFunctionMetaData().getFunctionDetails() != null) { + FunctionDetails funDetails = funInstance.getFunctionMetaData().getFunctionDetails(); + return HEARTBEAT_TENANT.equals(funDetails.getTenant()) + && HEARTBEAT_NAMESPACE.equals(funDetails.getNamespace()) ? funDetails.getName() : null; + } + return null; + } + private String findNextWorker(Map> workerIdToAssignment) { String targetWorkerId = null; int least = Integer.MAX_VALUE; diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java index 97e9c365a5..19977bd812 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java @@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.proto.Function.Assignment; import org.apache.pulsar.functions.proto.Request; import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler; import org.mockito.Mockito; @@ -554,6 +555,62 @@ public void testScalingDown() throws Exception { ); } + @Test + public void testHeartbeatFunction() throws Exception { + List functionMetaDataList = new LinkedList<>(); + final long version = 5; + final String workerId1 = "host-workerId-1"; + final String workerId2 = "host-workerId-2"; + Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder() + .setFunctionDetails(Function.FunctionDetails.newBuilder().setName(workerId1) + .setNamespace(RoundRobinScheduler.HEARTBEAT_NAMESPACE) + .setTenant(RoundRobinScheduler.HEARTBEAT_TENANT).setParallelism(1)) + .setVersion(version).build(); + + Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder() + .setFunctionDetails(Function.FunctionDetails.newBuilder().setName(workerId2) + .setNamespace(RoundRobinScheduler.HEARTBEAT_NAMESPACE) + .setTenant(RoundRobinScheduler.HEARTBEAT_TENANT).setParallelism(1)) + .setVersion(version).build(); + functionMetaDataList.add(function1); + functionMetaDataList.add(function2); + doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); + + Map> currentAssignments = new HashMap<>(); + Map assignmentEntry1 = new HashMap<>(); + + currentAssignments.put("worker-1", assignmentEntry1); + doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); + + // set version + doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); + + List workerInfoList = new LinkedList<>(); + workerInfoList.add(WorkerInfo.of(workerId1, "workerHostname-1", 5000)); + workerInfoList.add(WorkerInfo.of(workerId2, "workerHostname-1", 6000)); + doReturn(workerInfoList).when(membershipManager).getCurrentMembership(); + + // i am leader + doReturn(true).when(membershipManager).isLeader(); + + callSchedule(); + + List invocations = getMethodInvocationDetails(producer, + Producer.class.getMethod("sendAsync", Object.class)); + Assert.assertEquals(invocations.size(), 1); + + byte[] send = (byte[]) invocations.get(0).getRawArguments()[0]; + Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); + + List assignmentList = assignmentsUpdate.getAssignmentsList(); + Assert.assertEquals(assignmentList.size(), 2); + for (Assignment assignment : assignmentList) { + String functionName = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(); + String assignedWorkerId = assignment.getWorkerId(); + Assert.assertEquals(functionName, assignedWorkerId); + } + } + @Test public void testUpdate() throws Exception { List functionMetaDataList = new LinkedList<>(); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services