pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [pulsar] branch master updated: QueryState now spits out FunctionState (#3076)
Date Wed, 28 Nov 2018 21:06:56 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new aaf1996  QueryState now spits out FunctionState (#3076)
aaf1996 is described below

commit aaf199663ab7debf674d8de71c76bc126b627e88
Author: Sanjeev Kulkarni <sanjeevrk@gmail.com>
AuthorDate: Wed Nov 28 13:06:52 2018 -0800

    QueryState now spits out FunctionState (#3076)
    
    ### Motivation
    Instead of returning an unformatted string, querystate now prints a proper JSON object.
---
 .../org/apache/pulsar/client/admin/Functions.java  |  4 +--
 .../client/admin/internal/FunctionsImpl.java       | 13 ++++----
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  |  6 ++--
 .../pulsar/common/functions/FunctionState.java     | 35 ++++++++++++++++++++++
 .../functions/worker/rest/api/FunctionsImpl.java   | 10 +++----
 5 files changed, 51 insertions(+), 17 deletions(-)

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index f909557..ca93c02 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
+import org.apache.pulsar.common.functions.FunctionState;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
@@ -398,6 +399,5 @@ public interface Functions {
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    String getFunctionState(String tenant, String namespace, String function, String key) throws PulsarAdminException;
-
+    FunctionState getFunctionState(String tenant, String namespace, String function, String key) throws PulsarAdminException;
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 753955e..a068a05 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -19,8 +19,8 @@
 package org.apache.pulsar.client.admin.internal;
 
 import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import com.google.protobuf.AbstractMessage.Builder;
-import com.google.protobuf.MessageOrBuilder;
 import com.google.protobuf.util.JsonFormat;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -28,6 +28,7 @@ import org.apache.pulsar.client.admin.Functions;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.FunctionState;
 import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.policies.data.ErrorData;
@@ -366,7 +367,7 @@ public class FunctionsImpl extends BaseResource implements Functions {
         }
     }
 
-    public String getFunctionState(String tenant, String namespace, String function, String key)
+    public FunctionState getFunctionState(String tenant, String namespace, String function, String key)
         throws PulsarAdminException {
         try {
             Response response = request(functions.path(tenant)
@@ -374,7 +375,8 @@ public class FunctionsImpl extends BaseResource implements Functions {
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
                 throw new ClientErrorException(response);
             }
-            return response.readEntity(String.class);
+            String value = response.readEntity(String.class);
+            return new Gson().fromJson(value, new TypeToken<FunctionState>() {}.getType());
         } catch (Exception e) {
             throw getApiException(e);
         }
@@ -384,9 +386,4 @@ public class FunctionsImpl extends BaseResource implements Functions {
     public static void mergeJson(String json, Builder builder) throws IOException {
         JsonFormat.parser().merge(json, builder);
     }
-
-    public static String printJson(MessageOrBuilder msg) throws IOException {
-        return JsonFormat.printer().print(msg);
-    }
-
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index cfe709d..fea0ffd 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.functions.WindowConfig;
+import org.apache.pulsar.common.functions.FunctionState;
 
 @Slf4j
 @Parameters(commandDescription = "Interface for managing Pulsar Functions (lightweight, Lambda-style compute processes that work with Pulsar)")
@@ -756,8 +757,9 @@ public class CmdFunctions extends CmdBase {
         @Override
         void runCmd() throws Exception {
             do {
-                String valueAndVersion = admin.functions().getFunctionState(tenant, namespace, functionName, key);
-                System.out.println(valueAndVersion);
+                FunctionState functionState = admin.functions().getFunctionState(tenant, namespace, functionName, key);
+                Gson gson = new GsonBuilder().setPrettyPrinting().create();
+                System.out.println(gson.toJson(functionState));
                 if (watch) {
                     Thread.sleep(1000);
                 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionState.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionState.java
new file mode 100644
index 0000000..5062247
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionState.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.functions;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import lombok.*;
+
+@Getter
+@AllArgsConstructor
+@ToString
+@JsonInclude(JsonInclude.Include.USE_DEFAULTS)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class FunctionState {
+    private String key;
+    private String stringValue;
+    private Long numberValue;
+    private Long version;
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 7f3ed44..6d6394f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -81,6 +81,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.FunctionState;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.io.SourceConfig;
@@ -1094,15 +1095,14 @@ public class FunctionsImpl {
                         .entity(new String("key '" + key + "' doesn't exist."))
                         .build();
                 } else {
-                    String value;
+                    FunctionState value;
                     if (kv.isNumber()) {
-                        value = "value : " + kv.numberValue() + ", version : " + kv.version();
+                        value = new FunctionState(key, null, kv.numberValue(), kv.version());
                     } else {
-                        value = "value : " + new String(ByteBufUtil.getBytes(kv.value()), UTF_8)
-                            + ", version : " + kv.version();
+                        value = new FunctionState(key, new String(ByteBufUtil.getBytes(kv.value()), UTF_8), null, kv.version());
                     }
                     return Response.status(Status.OK)
-                        .entity(new String(value))
+                        .entity(new Gson().toJson(value))
                         .build();
                 }
             }


Mime
View raw message