pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhai...@apache.org
Subject [pulsar] branch master updated: Issue #2657: change function cli getstate to use REST endpoint (#2943)
Date Thu, 22 Nov 2018 12:05:29 GMT
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4253de7  Issue #2657: change function cli getstate to use REST endpoint (#2943)
4253de7 is described below

commit 4253de718cd1b1519a896995278cb9976ee2c0df
Author: Jia Zhai <jiazhai@users.noreply.github.com>
AuthorDate: Thu Nov 22 20:05:24 2018 +0800

    Issue #2657: change function cli getstate to use REST endpoint (#2943)
    
    # Motivation
    change StateGetter to use REST endpoint
    
    # Modifications
    change StateGetter to use REST endpoint
    change related unit test
    
    # Result
    unit tests pass.
---
 distribution/server/src/assemble/LICENSE.bin.txt   |  7 +-
 .../apache/pulsar/admin/cli/CmdFunctionsTest.java  | 95 ++++++++--------------
 pulsar-client-tools/pom.xml                        |  6 --
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  | 68 ++--------------
 4 files changed, 46 insertions(+), 130 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 49ac857..57f9638 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -343,7 +343,7 @@ The Apache Software License, Version 2.0
     - commons-configuration-commons-configuration-1.6.jar
     - commons-digester-commons-digester-1.8.jar
     - commons-io-commons-io-2.5.jar
-    - commons-lang-commons-lang-2.6.jar
+    - commons-lang-commons-lang-2.4.jar
     - commons-logging-commons-logging-1.1.1.jar
     - org.apache.commons-commons-collections4-4.1.jar
     - org.apache.commons-commons-compress-1.15.jar
@@ -476,14 +476,11 @@ BSD 3-clause "New" or "Revised" License
     - com.google.auth-google-auth-library-credentials-0.9.0.jar -- licenses/LICENSE-google-auth-library.txt
  * JLine -- jline-jline-0.9.94.jar -- licenses/LICENSE.JLine.txt
  * LevelDB -- (included in org.rocksdb.*.jar) -- licenses/LICENSE-LevelDB.txt
- * JSR305 -- com.google.code.findbugs-jsr305-3.0.2.jar -- licenses/LICENSE-JSR305.txt
+ * JSR305 -- com.google.code.findbugs-jsr305-3.0.0.jar -- licenses/LICENSE-JSR305.txt
 
 BSD 2-Clause License
  * HdrHistogram -- org.hdrhistogram-HdrHistogram-2.1.9.jar -- licenses/LICENSE-HdrHistogram.txt
 
-BSD License
- * Hamcrest -- org.hamcrest-hamcrest-core-1.1.jar -- licenses/LICENSE-Hamcrest.txt
-
 MIT License
  * Java SemVer -- com.github.zafarkhaja-java-semver-0.9.0.jar -- licenses/LICENSE-SemVer.txt
  * SLF4J -- licenses/LICENSE-SLF4J.txt
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index e0a0d4e..950ee94 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -18,32 +18,47 @@
  */
 package org.apache.pulsar.admin.cli;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufUtil;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.api.StorageClient;
-import org.apache.bookkeeper.api.kv.Table;
 import org.apache.bookkeeper.clients.StorageClientBuilder;
-import org.apache.bookkeeper.clients.config.StorageClientSettings;
-import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.pulsar.admin.cli.CmdFunctions.CreateFunction;
 import org.apache.pulsar.admin.cli.CmdFunctions.DeleteFunction;
 import org.apache.pulsar.admin.cli.CmdFunctions.GetFunction;
 import org.apache.pulsar.admin.cli.CmdFunctions.GetFunctionStatus;
 import org.apache.pulsar.admin.cli.CmdFunctions.ListFunctions;
 import org.apache.pulsar.admin.cli.CmdFunctions.RestartFunction;
+import org.apache.pulsar.admin.cli.CmdFunctions.StateGetter;
 import org.apache.pulsar.admin.cli.CmdFunctions.StopFunction;
 import org.apache.pulsar.admin.cli.CmdFunctions.UpdateFunction;
 import org.apache.pulsar.client.admin.Functions;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.Utils;
-import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.testng.IObjectFactory;
@@ -51,30 +66,6 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.mockStatic;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.assertNull;
-
 /**
  * Unit test of {@link CmdFunctions}.
  */
@@ -507,42 +498,26 @@ public class CmdFunctionsTest {
 
     @Test
     public void testStateGetter() throws Exception {
-        String tenant = TEST_NAME + "_tenant";
-        String namespace = TEST_NAME + "_namespace";
-        String fnName = TEST_NAME + "_function";
-
-        mockStatic(StorageClientBuilder.class);
-
-        StorageClientBuilder builder = mock(StorageClientBuilder.class);
-        when(builder.withSettings(any(StorageClientSettings.class))).thenReturn(builder);
-        when(builder.withNamespace(eq(tenant + "_" + namespace))).thenReturn(builder);
-        StorageClient client = mock(StorageClient.class);
-        when(builder.build()).thenReturn(client);
-
-        PowerMockito.when(StorageClientBuilder.class, "newBuilder")
-            .thenReturn(builder);
-
-        Table<ByteBuf, ByteBuf> table = mock(Table.class);
-        when(client.openTable(eq(fnName))).thenReturn(FutureUtils.value(table));
-        AtomicReference<ByteBuf> keyHolder = new AtomicReference<>();
-        doAnswer(invocationOnMock -> {
-            ByteBuf buf = invocationOnMock.getArgumentAt(0, ByteBuf.class);
-            keyHolder.set(buf);
-            return FutureUtils.value(null);
-        }).when(table).getKv(any(ByteBuf.class));
+        String tenant = TEST_NAME + "-tenant";
+        String namespace = TEST_NAME + "-namespace";
+        String fnName = TEST_NAME + "-function";
+        String key = TEST_NAME + "-key";
 
         cmd.run(new String[] {
             "querystate",
             "--tenant", tenant,
             "--namespace", namespace,
             "--name", fnName,
-            "--key", "test-key",
-            "--storage-service-url", "bk://127.0.0.1:4181"
+            "--key", key
         });
 
-        assertEquals(
-            "test-key",
-            new String(ByteBufUtil.getBytes(keyHolder.get()), UTF_8));
+        StateGetter stateGetter = cmd.getStateGetter();
+
+        assertEquals(tenant, stateGetter.getTenant());
+        assertEquals(namespace, stateGetter.getNamespace());
+        assertEquals(fnName, stateGetter.getFunctionName());
+
+        verify(functions, times(1)).getFunctionState(eq(tenant), eq(namespace), eq(fnName),
eq(key));
     }
 
     private static final String fnName = TEST_NAME + "-function";
diff --git a/pulsar-client-tools/pom.xml b/pulsar-client-tools/pom.xml
index 5ceb0df..d7ccbb6 100644
--- a/pulsar-client-tools/pom.xml
+++ b/pulsar-client-tools/pom.xml
@@ -68,12 +68,6 @@
     </dependency>
 
     <!-- functions related dependencies (begin) -->
-
-    <dependency>
-      <groupId>org.apache.bookkeeper</groupId>
-      <artifactId>stream-storage-java-client</artifactId>
-    </dependency>
-
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-functions-utils</artifactId>
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index bfadbe4..cfe709d 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -18,9 +18,6 @@
  */
 package org.apache.pulsar.admin.cli;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.apache.commons.lang.StringUtils.isBlank;
 import static org.apache.commons.lang.StringUtils.isNotBlank;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
@@ -35,11 +32,7 @@ import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.JsonParser;
 import com.google.gson.reflect.TypeToken;
-
 import com.google.protobuf.util.JsonFormat;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufUtil;
-import io.netty.buffer.Unpooled;
 
 import java.io.File;
 import java.lang.reflect.Field;
@@ -52,20 +45,14 @@ import java.util.Map;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-
-import org.apache.bookkeeper.api.StorageClient;
-import org.apache.bookkeeper.api.kv.Table;
-import org.apache.bookkeeper.api.kv.result.KeyValue;
-import org.apache.bookkeeper.clients.StorageClientBuilder;
-import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
-import org.apache.pulsar.common.functions.WindowConfig;
 import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.functions.WindowConfig;
 
 @Slf4j
 @Parameters(commandDescription = "Interface for managing Pulsar Functions (lightweight, Lambda-style
compute processes that work with Pulsar)")
@@ -690,10 +677,10 @@ public class CmdFunctions extends CmdBase {
 
     @Parameters(commandDescription = "Temporary stops function instance. (If worker restarts
then it reassigns and starts functiona again")
     class StopFunction extends FunctionCommand {
-        
+
         @Parameter(names = "--instance-id", description = "The function instanceId (stop
all instances if instance-id is not provided")
         protected String instanceId;
-        
+
         @Override
         void runCmd() throws Exception {
             if (isNotBlank(instanceId)) {
@@ -763,55 +750,18 @@ public class CmdFunctions extends CmdBase {
         @Parameter(names = { "-k", "--key" }, description = "key")
         private String key = null;
 
-        // TODO: this url should be fetched along with bookkeeper location from pulsar admin
-        @Parameter(names = { "-u", "--storage-service-url" }, description = "The URL for
the storage service used by the function")
-        private String stateStorageServiceUrl = null;
-
         @Parameter(names = { "-w", "--watch" }, description = "Watch for changes in the value
associated with a key for a Pulsar Function")
         private boolean watch = false;
 
         @Override
         void runCmd() throws Exception {
-            checkNotNull(stateStorageServiceUrl, "The state storage service URL is missing");
-
-            String tableNs = String.format(
-                "%s_%s",
-                tenant,
-                namespace).replace('-', '_');
-
-            String tableName = getFunctionName();
-
-            try (StorageClient client = StorageClientBuilder.newBuilder()
-                 .withSettings(StorageClientSettings.newBuilder()
-                     .serviceUri(stateStorageServiceUrl)
-                     .clientName("functions-admin")
-                     .build())
-                 .withNamespace(tableNs)
-                 .build()) {
-                try (Table<ByteBuf, ByteBuf> table = result(client.openTable(tableName)))
{
-                    long lastVersion = -1L;
-                    do {
-                        try (KeyValue<ByteBuf, ByteBuf> kv = result(table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8)))))
{
-                            if (null == kv) {
-                                System.out.println("key '" + key + "' doesn't exist.");
-                            } else {
-                                if (kv.version() > lastVersion) {
-                                    if (kv.isNumber()) {
-                                        System.out.println("value = " + kv.numberValue());
-                                    } else {
-                                        System.out.println("value = " + new String(ByteBufUtil.getBytes(kv.value()),
UTF_8));
-                                    }
-                                    lastVersion = kv.version();
-                                }
-                            }
-                        }
-                        if (watch) {
-                            Thread.sleep(1000);
-                        }
-                    } while (watch);
+            do {
+                String valueAndVersion = admin.functions().getFunctionState(tenant, namespace,
functionName, key);
+                System.out.println(valueAndVersion);
+                if (watch) {
+                    Thread.sleep(1000);
                 }
-            }
-
+            } while (watch);
         }
     }
 


Mime
View raw message