samza-commits mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [samza] prateekm commented on a change in pull request #1052: SAMZA-2222: Documentation for async API
Date Fri, 24 May 2019 20:26:53 GMT
prateekm commented on a change in pull request #1052: SAMZA-2222: Documentation for async API
URL: https://github.com/apache/samza/pull/1052#discussion_r287503895
 
 

 ##########
 File path: docs/learn/tutorials/versioned/samza-async-user-guide.md
 ##########
 @@ -107,16 +107,96 @@ task.callback.timeout.ms=5000
 
 **NOTE:** Samza also guarantees the in-order process of the messages within an AsyncStreamTask
by default, meaning the next processAsync() of a task won't be called until the previous processAsync()
callback has been triggered.
 
+### Asynchronous Process in High Level API
+
+If your application is asynchronous, e.g. making non-blocking remote IO calls, [AsyncFlatMapFunction](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/functions/AsyncFlatMapFunction.html)
provides support for it. The following example illustrates an application that processes Wikipedia
feed updates and invokes a remote service to standardize the updates and sends the standardized
events to Wikipedia.
+
+{% highlight java %}
+
+public class WikipediaAsyncStandardizer implements StreamApplication {
+
+  @Override
+  public void describe(StreamApplicationDescriptor appDescriptor) {
+    // Define a SystemDescriptor for Wikipedia data
+    WikipediaSystemDescriptor wikipediaSystemDescriptor = new WikipediaSystemDescriptor("irc.wikimedia.org", 6667);
+    // Define InputDescriptors for consuming wikipedia data
+    WikipediaInputDescriptor wikipediaInputDescriptor = wikipediaSystemDescriptor
+        .getInputDescriptor("en-wikipedia")
+        .withChannel("#en.wikipedia");
+    // Define OutputDescriptor for producing wikipedia data
+    WikipediaOutputDescriptor wikipediaOutputDescriptor = wikipediaSystemDescriptor
+        .getOutputDescriptor("en-wikipedia-standardized")
+        .withChannel("#en.wikipedia.standardized");
+
+    appDescriptor.getInputStream(wikipediaInputDescriptor)
+        .filter(WikipediaFeedEvent::isUpdate)
+        .flatMapAsync(new AsyncStandardizerFunction())
+        .sendTo(wikipediaOutputDescriptor);
+  }
+
+  static class AsyncStandardizerFunction implements AsyncFlatMapFunction<WikipediaFeedEvent, StandardizedWikipediaFeedEvent> {
+    private transient Client client;
+
+    @Override
+    public void init(Context context) {
+      client = ClientBuilder.newClient(context.getJobContext().getConfig().get("standardizer.uri"));
+    }
+
+    @Override
+    public CompletionStage<Collection<StandardizedWikipediaFeedEvent>> apply(WikipediaFeedEvent wikipediaFeedEvent) {
+      Request<StandardizerRequest> standardizerRequest = buildStandardizedRequest(wikipediaFeedEvent);
+      CompletableFuture<StandardizerResponse> standardizerResponse = client.sendRequest(standardizerRequest);
+
+      return standardizerResponse
+          .thenApply(response -> extractStandardizedWikipediaFeedEvent(response));
+    }
+
+    @Override
+    public void close() {
+      client.close();
+    }
+  }
+}
+{% endhighlight %}
+
+In the above example, the messages are not sent to Wikipedia after `AsyncStandardizerFunction`
returns. The framework keeps track of the future returned by the function and propagates the
standardized result to downstream operator (in this case sendTo) only when the future completes.
Samza has a timeout for the future to complete and can be configured through the following
property:
 
 Review comment:
   "The framework keeps track ..." etc. is an implementation detail. The user guide should only
describe the semantics, e.g., "In the above example, the results from the asynchronous
... function are propagated to downstream operators once the future is complete."
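
   To make the suggested wording concrete, here is a minimal sketch of that semantics using only
`java.util.concurrent`. It is not Samza's implementation or API: the class `AsyncPropagationSketch`,
the helper `flatMapAsync`, and the message strings are hypothetical stand-ins chosen for illustration.
It only shows that nothing reaches the downstream consumer until the returned future completes.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.function.Function;

public class AsyncPropagationSketch {

  // Hypothetical stand-in for an async flat-map step (not Samza code):
  // results are handed to the downstream consumer only once the stage completes.
  static <M, R> CompletionStage<Void> flatMapAsync(
      M message,
      Function<M, CompletionStage<Collection<R>>> fn,
      Consumer<R> downstream) {
    return fn.apply(message).thenAccept(results -> results.forEach(downstream));
  }

  public static void main(String[] args) {
    List<String> sent = new ArrayList<>();
    // Stands in for the pending response of a remote call.
    CompletableFuture<Collection<String>> pending = new CompletableFuture<>();

    Function<String, CompletionStage<Collection<String>>> standardize = msg -> pending;
    Consumer<String> downstream = sent::add;

    flatMapAsync("raw-edit", standardize, downstream);
    System.out.println("before completion: " + sent); // the function returned, nothing sent yet

    pending.complete(Collections.singletonList("standardized-edit"));
    System.out.println("after completion: " + sent);  // the result has now been propagated
  }
}
```

   The guide text can state exactly this observable behaviour without describing how the future is tracked.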
   
   "Samza has a timeout for the future ..." makes it sound like we control when the future
is completed or have an await. It would be better to say that "there is an overall timeout
for each message to be processed and the processAsync callback to be invoked, and you can
tune that using ..."
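
   As an illustration of the distinction, the sketch below bounds the overall processing time of a
message without awaiting or completing the user's future. It is plain `java.util.concurrent`, not
Samza's implementation: `OverallTimeoutSketch`, `withOverallTimeout`, and the 100 ms value are
assumptions made for the example, and it does not assume which configuration property the guide names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OverallTimeoutSketch {

  // Hypothetical helper (not Samza code): the returned future carries the outcome
  // of processing one message. It fails with TimeoutException if the user's stage
  // has not completed within timeoutMs. The user's stage itself is never blocked
  // on, cancelled, or completed by this code.
  static <T> CompletableFuture<T> withOverallTimeout(
      CompletionStage<T> work, long timeoutMs, ScheduledExecutorService timer) {
    CompletableFuture<T> outcome = new CompletableFuture<>();
    ScheduledFuture<?> deadline = timer.schedule(() -> {
      outcome.completeExceptionally(
          new TimeoutException("message not processed within " + timeoutMs + " ms"));
    }, timeoutMs, TimeUnit.MILLISECONDS);
    work.whenComplete((value, error) -> {
      deadline.cancel(false); // processing finished first, so drop the deadline
      if (error != null) {
        outcome.completeExceptionally(error);
      } else {
        outcome.complete(value);
      }
    });
    return outcome;
  }

  public static void main(String[] args) {
    ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    try {
      // Stands in for a remote call that never responds.
      CompletableFuture<String> neverDone = new CompletableFuture<>();
      try {
        withOverallTimeout(neverDone, 100, timer).join();
      } catch (CompletionException e) {
        System.out.println("failed with: " + e.getCause());
      }
    } finally {
      timer.shutdownNow();
    }
  }
}
```

   The point for the docs is that the timeout applies to the message as a whole; the user's future is left untouched.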

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
