From commits-return-18752-archive-asf-public=cust-asf.ponee.io@pulsar.apache.org Fri Dec 7 19:39:45 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 7B04118067A for ; Fri, 7 Dec 2018 19:39:44 +0100 (CET) Received: (qmail 92523 invoked by uid 500); 7 Dec 2018 18:39:43 -0000 Mailing-List: contact commits-help@pulsar.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pulsar.apache.org Delivered-To: mailing list commits@pulsar.apache.org Received: (qmail 92514 invoked by uid 99); 7 Dec 2018 18:39:43 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Dec 2018 18:39:43 +0000 From: GitBox To: commits@pulsar.apache.org Subject: [GitHub] srkukarni closed pull request #3137: Make Source/Sink status Source/Sink specific Message-ID: <154420798307.2459.13797070009163137923.gitbox@gitbox.apache.org> Date: Fri, 07 Dec 2018 18:39:43 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit srkukarni closed pull request #3137: Make Source/Sink status Source/Sink specific URL: https://github.com/apache/pulsar/pull/3137 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java index 72cf7362a8..609f1071e4 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SinkStatus.java @@ -27,7 +27,9 @@ @Data public class SinkStatus { + // The total number of sink instances that ought to be running public int numInstances; + // The number of source instances that are actually running public int numRunning; public List instances = new LinkedList<>(); @@ -38,20 +40,29 @@ @Data public static class SinkInstanceStatusData { - + // Is this instance running? public boolean running; + // Do we have any error while running this instance public String error; + // Number of times this instance has restarted public long numRestarts; - public long numReceived; + // Number of messages read from Pulsar + public long numReadFromPulsar; + // Number of times there was a system exception handling messages public long numSystemExceptions; + // A list of the most recent system exceptions public List latestSystemExceptions; - public long lastInvocationTime; + // Number of messages written to sink + public long numWrittenToSink; + + // When was the last time we received a message from Pulsar + public long lastReceivedTime; public String workerId; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java index 1ea8a801c0..4043900421 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SourceStatus.java @@ -27,7 +27,9 @@ @Data public class SourceStatus { + // The total number of source instances that ought to be running public int numInstances; + // The number of source instances that are actually running public int numRunning; public List instances = new LinkedList<>(); @@ -38,21 +40,31 @@ @Data public static class SourceInstanceStatusData { - + // Is this instance running? public boolean running; + // Do we have any error while running this instance public String error; + // Number of times this instance has restarted public long numRestarts; - public long numReceived; + // Number of messages received from source + public long numReceivedFromSource; + // Number of times there was a system exception handling messages public long numSystemExceptions; + // A list of the most recent system exceptions public List latestSystemExceptions; - public long lastInvocationTime; + // Number of messages written into pulsar + public long numWritten; + + // When was the last time we received a message from the source + public long lastReceivedTime; + // The worker id on which the source is running public String workerId; } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java index 4ecba12964..45bbba4436 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java @@ -60,7 +60,7 @@ sinkInstanceStatusData.setRunning(status.getRunning()); sinkInstanceStatusData.setError(status.getFailureException()); sinkInstanceStatusData.setNumRestarts(status.getNumRestarts()); - sinkInstanceStatusData.setNumReceived(status.getNumReceived()); + sinkInstanceStatusData.setNumReadFromPulsar(status.getNumReceived()); List userExceptionInformationList = new LinkedList<>(); for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestUserExceptionsList()) { @@ -82,7 +82,8 @@ } sinkInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList); - sinkInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime()); + sinkInstanceStatusData.setNumWrittenToSink(status.getNumSuccessfullyProcessed()); + sinkInstanceStatusData.setLastReceivedTime(status.getLastInvocationTime()); sinkInstanceStatusData.setWorkerId(assignedWorkerId); return sinkInstanceStatusData; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java index 412019a184..f05eea6fd4 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java @@ -59,7 +59,7 @@ sourceInstanceStatusData.setRunning(status.getRunning()); sourceInstanceStatusData.setError(status.getFailureException()); sourceInstanceStatusData.setNumRestarts(status.getNumRestarts()); - sourceInstanceStatusData.setNumReceived(status.getNumReceived()); + sourceInstanceStatusData.setNumReceivedFromSource(status.getNumReceived()); List userExceptionInformationList = new LinkedList<>(); for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestUserExceptionsList()) { @@ -81,7 +81,8 @@ } sourceInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList); - sourceInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime()); + sourceInstanceStatusData.setNumWritten(status.getNumSuccessfullyProcessed()); + sourceInstanceStatusData.setLastReceivedTime(status.getLastInvocationTime()); sourceInstanceStatusData.setWorkerId(assignedWorkerId); return sourceInstanceStatusData; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 5add9a1455..f100c21a13 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -245,7 +245,7 @@ protected void getSinkStatus(String tenant, String namespace, String sinkName) t String[] commands = { PulsarCluster.ADMIN_SCRIPT, "sink", - "getstatus", + "status", "--tenant", tenant, "--namespace", namespace, "--name", sinkName @@ -472,7 +472,7 @@ protected void getSourceStatus(String tenant, String namespace, String sourceNam String[] commands = { PulsarCluster.ADMIN_SCRIPT, "source", - "getstatus", + "status", "--tenant", tenant, "--namespace", namespace, "--name", sourceName @@ -524,7 +524,7 @@ protected void waitForProcessingSourceMessages(String tenant, String[] commands = { PulsarCluster.ADMIN_SCRIPT, "source", - "getstatus", + "status", "--tenant", tenant, "--namespace", namespace, "--name", sourceName @@ -544,8 +544,9 @@ protected void waitForProcessingSourceMessages(String tenant, assertEquals(sourceStatus.getInstances().size(), 1); assertEquals(sourceStatus.getInstances().get(0).getInstanceId(), 0); assertEquals(sourceStatus.getInstances().get(0).getStatus().isRunning(), true); - assertTrue(sourceStatus.getInstances().get(0).getStatus().getLastInvocationTime() > 0); - assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumReceived(), numMessages); + assertTrue(sourceStatus.getInstances().get(0).getStatus().getLastReceivedTime() > 0); + assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumReceivedFromSource(), numMessages); + assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumWritten(), numMessages); assertEquals(sourceStatus.getInstances().get(0).getStatus().getNumRestarts(), 0); assertEquals(sourceStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0); return; @@ -568,7 +569,7 @@ protected void waitForProcessingSinkMessages(String tenant, String[] commands = { PulsarCluster.ADMIN_SCRIPT, "sink", - "getstatus", + "status", "--tenant", tenant, "--namespace", namespace, "--name", sinkName @@ -588,8 +589,9 @@ protected void waitForProcessingSinkMessages(String tenant, assertEquals(sinkStatus.getInstances().size(), 1); assertEquals(sinkStatus.getInstances().get(0).getInstanceId(), 0); assertEquals(sinkStatus.getInstances().get(0).getStatus().isRunning(), true); - assertTrue(sinkStatus.getInstances().get(0).getStatus().getLastInvocationTime() > 0); - assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumReceived(), numMessages); + assertTrue(sinkStatus.getInstances().get(0).getStatus().getLastReceivedTime() > 0); + assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumReadFromPulsar(), numMessages); + assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumWrittenToSink(), numMessages); assertEquals(sinkStatus.getInstances().get(0).getStatus().getNumRestarts(), 0); assertEquals(sinkStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0); return; @@ -940,7 +942,7 @@ private static void getFunctionStatus(String functionName, int numMessages) thro ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd( PulsarCluster.ADMIN_SCRIPT, "functions", - "getstatus", + "status", "--tenant", "public", "--namespace", "default", "--name", functionName ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services