pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] srkukarni closed pull request #2932: Made Window Context up to date with Context
Date Tue, 06 Nov 2018 17:11:18 GMT
srkukarni closed pull request #2932: Made Window Context up to date with Context
URL: https://github.com/apache/pulsar/pull/2932
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java
index 63e395ce22..2f1f2e700c 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java
@@ -20,9 +20,25 @@
 
 import org.slf4j.Logger;
 
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 public interface WindowContext {
+
+    /**
+     * The tenant this function belongs to
+     * @return the tenant this function belongs to
+     */
+    String getTenant();
+
+    /**
+     * The namespace this function belongs to
+     * @return the namespace this function belongs to
+     */
+    String getNamespace();
+
     /**
      * The name of the function that we are executing
      * @return The Function name
@@ -42,6 +58,13 @@
      */
     int getInstanceId();
 
+    /**
+     * Get the number of instances that invoke this function.
+     *
+     * @return the number of instances that invoke this function.
+     */
+    int getNumInstances();
+
     /**
      * The version of the function that we are executing
      * @return The version id
@@ -49,23 +72,22 @@
     String getFunctionVersion();
 
     /**
-     * The memory limit that this function is limited to
-     * @return Memory limit in bytes
+     * Get a list of all input topics
+     * @return a list of all input topics
      */
-    long getMemoryLimit();
+    Collection<String> getInputTopics();
 
     /**
-     * The time budget in ms that the function is limited to.
-     * @return Time budget in msecs.
+     * Get the output topic of the function
+     * @return output topic name
      */
-    long getTimeBudgetInMs();
+    String getOutputTopic();
 
     /**
-     * The time in ms remaining for this function execution to complete before it
-     * will be flagged as an error
-     * @return Time remaining in ms.
+     * Get output schema builtin type or custom class name
+     * @return output schema builtin type or custom class name
      */
-    long getRemainingTimeInMs();
+    String getOutputSchemaType();
 
     /**
      * The logger object that can be used to log in a function
@@ -73,6 +95,43 @@
      */
     Logger getLogger();
 
+    /**
+     * Increment the builtin distributed counter refered by key
+     * @param key The name of the key
+     * @param amount The amount to be incremented
+     */
+    void incrCounter(String key, long amount);
+
+    /**
+     * Retrieve the counter value for the key.
+     *
+     * @param key name of the key
+     * @return the amount of the counter value for this key
+     */
+    long getCounter(String key);
+
+    /**
+     * Updare the state value for the key.
+     *
+     * @param key name of the key
+     * @param value state value of the key
+     */
+    void putState(String key, ByteBuffer value);
+
+    /**
+     * Retrieve the state value for the key.
+     *
+     * @param key name of the key
+     * @return the state value for the key.
+     */
+    ByteBuffer getState(String key);
+
+    /**
+     * Get a map of all user-defined key/value configs for the function
+     * @return The full map of user-defined config values
+     */
+    Map<String, Object> getUserConfigMap();
+
     /**
      * Get Any user defined key/value
      * @param key The key
@@ -80,6 +139,14 @@
      */
     String getUserConfigValue(String key);
 
+    /**
+     * Get any user-defined key/value or a default value if none is present
+     * @param key
+     * @param defaultValue
+     * @return Either the user config value associated with a given key or a supplied default value
+     */
+    Object getUserConfigValueOrDefault(String key, Object defaultValue);
+
     /**
      * Record a user defined metric
      * @param metricName The name of the metric
@@ -89,10 +156,22 @@
 
     /**
      * Publish an object using serDe for serializing to the topic
+     *
+     * @param topicName
+     *            The name of the topic for publishing
+     * @param object
+     *            The object that needs to be published
+     * @param schemaOrSerdeClassName
+     *            Either a builtin schema type (eg: "avro", "json", "protobuf") or the class name of the custom schema class
+     * @return A future that completes when the framework is done publishing the message
+     */
+    <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName);
+
+    /**
+     * Publish an object to the topic using default schemas
      * @param topicName The name of the topic for publishing
      * @param object The object that needs to be published
-     * @param serDeClassName The class name of the class that needs to be used to serialize the object before publishing
-     * @return
+     * @return A future that completes when the framework is done publishing the message
      */
-    CompletableFuture<Void> publish(String topicName, Object object, String serDeClassName);
+    <O> CompletableFuture<Void> publish(String topicName, O object);
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java
index e03ed9701d..41e8ebed76 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java
@@ -21,6 +21,9 @@
 import org.apache.pulsar.functions.api.Context;
 import org.slf4j.Logger;
 
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 public class WindowContextImpl implements WindowContext {
@@ -31,6 +34,16 @@ public WindowContextImpl(Context context) {
         this.context = context;
     }
 
+    @Override
+    public String getTenant() {
+        return this.context.getTenant();
+    }
+
+    @Override
+    public String getNamespace() {
+        return this.context.getNamespace();
+    }
+
     @Override
     public String getFunctionName() {
         return this.context.getFunctionName();
@@ -46,24 +59,29 @@ public int getInstanceId() {
         return this.context.getInstanceId();
     }
 
+    @Override
+    public int getNumInstances() {
+        return this.context.getNumInstances();
+    }
+
     @Override
     public String getFunctionVersion() {
         return this.getFunctionVersion();
     }
 
     @Override
-    public long getMemoryLimit() {
-        return this.getMemoryLimit();
+    public Collection<String> getInputTopics() {
+        return this.context.getInputTopics();
     }
 
     @Override
-    public long getTimeBudgetInMs() {
-        return this.getTimeBudgetInMs();
+    public String getOutputTopic() {
+        return this.context.getOutputTopic();
     }
 
     @Override
-    public long getRemainingTimeInMs() {
-        return this.getRemainingTimeInMs();
+    public String getOutputSchemaType() {
+        return this.context.getOutputSchemaType();
     }
 
     @Override
@@ -71,18 +89,53 @@ public Logger getLogger() {
         return this.getLogger();
     }
 
+    @Override
+    public void incrCounter(String key, long amount) {
+        this.context.incrCounter(key, amount);
+    }
+
+    @Override
+    public long getCounter(String key) {
+        return this.context.getCounter(key);
+    }
+
+    @Override
+    public void putState(String key, ByteBuffer value) {
+        this.context.putState(key, value);
+    }
+
+    @Override
+    public ByteBuffer getState(String key) {
+        return this.context.getState(key);
+    }
+
+    @Override
+    public Map<String, Object> getUserConfigMap() {
+        return this.context.getUserConfigMap();
+    }
+
     @Override
     public String getUserConfigValue(String key) {
         return this.getUserConfigValue(key);
     }
 
+    @Override
+    public Object getUserConfigValueOrDefault(String key, Object defaultValue) {
+        return this.context.getUserConfigValueOrDefault(key, defaultValue);
+    }
+
     @Override
     public void recordMetric(String metricName, double value) {
         this.context.recordMetric(metricName, value);
     }
 
     @Override
-    public CompletableFuture<Void> publish(String topicName, Object object, String serDeClassName) {
-        return this.context.publish(topicName, object, serDeClassName);
+    public <O> CompletableFuture<Void> publish(String topicName, O object) {
+        return this.context.publish(topicName, object);
+    }
+
+    @Override
+    public CompletableFuture<Void> publish(String topicName, Object object, String schemaOrSerdeClassName) {
+        return this.context.publish(topicName, object, schemaOrSerdeClassName);
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message