pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [incubator-pulsar.wiki] branch master updated: Created PIP 23: Message Tracing By Interceptors (markdown)
Date Tue, 21 Aug 2018 10:47:11 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.wiki.git


The following commit(s) were added to refs/heads/master by this push:
     new e0388dd  Created PIP 23: Message Tracing By Interceptors (markdown)
e0388dd is described below

commit e0388dd8484f562109c6a4e689983883d7c1beba
Author: Sijie Guo <guosijie@gmail.com>
AuthorDate: Tue Aug 21 03:47:09 2018 -0700

    Created PIP 23: Message Tracing By Interceptors (markdown)
---
 PIP-23:-Message-Tracing-By-Interceptors.md | 239 +++++++++++++++++++++++++++++
 1 file changed, 239 insertions(+)

diff --git a/PIP-23:-Message-Tracing-By-Interceptors.md b/PIP-23:-Message-Tracing-By-Interceptors.md
new file mode 100644
index 0000000..d57d558
--- /dev/null
+++ b/PIP-23:-Message-Tracing-By-Interceptors.md
@@ -0,0 +1,239 @@
+- **Status**: Proposed
+- **Author**: [Penghui Li](https://github.com/codelipenghui)
+- **Pull Request**: -
+- **Mailing List discussion**: 
+
+## Requirement & Overview
+
+This is related to issue #2280. 
+And for Pulsar itself, That will be useful also to track performance issues and understand
more of the latency breakdowns. Tracing is the easiest way to do: "show me a request with
high-latency and show where the latency (for this particular request) was coming from.
+
+## GOALS
+
+A good approach to that, is to integrate a tracing standard, either OpenTracing or OpenCensus,
at both bookkeeper and pulsar. so their applications can integrate with OpenTracing or OpenCensus
as well,  then application's traceId, spanId can be passed all way from producer to broker
to bookie and to consumer. Then those traces can be collected and fed into an ElasticSearch
cluster for query.
+
+## IMPLEMENTATION
+
+Integration with a tracing system typically requires tight integration with applications.
Especially if Pulsar is used in a data pipeline, Pulsar should be able to provide mechanism
to be able to propagate trace context end-to-end. That says Pulsar doesn’t have to integrate
with any tracing framework directly, but it has to provide mechanism for applications to do
so. This can be done by:
+
+- Use `properties` in message header for propagating trace context.
+- Provide an interface to intercept message pipeline in Pulsar, which allows applications
customize their own tracing behaviors and meet their requirements.
+
+This PIP introduces **Interceptor** to examine (and potentially modify) messages at key places
during the lifecycle of a Pulsar message. The interceptors include `ProducerInterceptor` for
intercepting messages at Producer side, and `ConsumerInterceptor` for intercepting messages
at Consumer side.
+
+### Producer Interceptor
+
+The ProducerInterceptor intercept messages before sending them and on receiving acknowledgement
from brokers. The interfaces for a ProducerInterceptor are defined as below:
+
+```java
+package org.apache.pulsar.client.api;
+
+/**
+ * A plugin interface that allows you to intercept (and possibly mutate) the 
+ * messages received by the producer before they are published to the Pulsar   
+ * brokers.
+ * <p>
+ * Exceptions thrown by ProducerInterceptor methods will be caught, logged, but  
+ * not propagated further.
+ * <p>
+ * ProducerInterceptor callbacks may be called from multiple threads. Interceptor  
+ * implementation must ensure thread-safety, if needed.
+ */
+public interface ProducerInterceptor<T> extends AutoCloseable {
+
+    /**
+     * Close the interceptor.
+     */
+    void close();
+
+    /**
+     * This is called from {@link Producer#send(Object)} and {@link  
+     * Producer#sendAsync(Object)} methods, before
+     * send the message to the brokers. This method is allowed to modify the 
+     * record, in which case, the new record
+     * will be returned.
+     * <p>
+     * Any exception thrown by this method will be caught by the caller and 
+     * logged, but not propagated further.
+     * <p>
+     * Since the producer may run multiple interceptors, a particular 
+     * interceptor's {@link #beforeSend(Message)} callback will be called in the   
+     * order specified by 
+     * {@link ProducerBuilder#intercept(ProducerInterceptor[])}.
+     * <p>
+     * The first interceptor in the list gets the message passed from the client, 
+     * the following interceptor will be passed the message returned by the   
+     * previous interceptor, and so on. Since interceptors are allowed to modify
+     * messages, interceptors may potentially get the message already modified by   
+     * other interceptors. However, building a pipeline of mutable interceptors   
+     * that depend on the output of the previous interceptor is discouraged, 
+     * because of potential side-effects caused by interceptors potentially   
+     * failing to modify the message and throwing an exception. If one of the 
+     * interceptors in the list throws an exception from   
+     * {@link#beforeSend(Message)}, the exception is caught, logged, and the next 
+     * interceptor is called with the message returned by the last successful   
+     * interceptor in the list, or otherwise the client.
+     *
+     * @param message message to send
+     * @return the intercepted message
+     */
+    Message<T> beforeSend(Message<T> message);
+
+    /**
+     * This method is called when the message sent to the broker has been 
+     * acknowledged, or when sending the message fails. 
+     * This method is generally called just before the user callback is 
+     * called, and in additional cases when an exception on the producer side.
+     * <p>
+     * Any exception thrown by this method will be ignored by the caller.
+     * <p>
+     * This method will generally execute in the background I/O thread, so the   
+     * implementation should be reasonably fast. Otherwise, sending of messages    
+     * from other threads could be delayed.
+     *
+     * @param message the message that application sends
+     * @param msgId the message id that assigned by the broker; null if send failed.
+     * @param cause the exception on sending messages, null indicates send has succeed.
+     */
+    void onSendAcknowledgement(Message<T> message, MessageId msgId, Throwable cause);
+
+}
+```
+
+### Consumer Interceptor
+
+The ConsumerInterceptor intercept messages before delivering to consumers and before sending
acknowledgements to brokers. The interfaces for a ConsumerInterceptor are defined as below:
+
+```java
+package org.apache.pulsar.client.api;
+
+/**
+ * A plugin interface that allows you to intercept (and possibly mutate)
+ * messages received by the consumer.
+ *
+ * <p>A primary use case is to hook into consumer applications for custom
+ * monitoring, logging, etc.
+ *
+ * <p>Exceptions thrown by interceptor methods will be caught, logged, but
+ * not propagated further.
+ */
+public interface ConsumerInterceptor<T> extends AutoCloseable {
+
+    /**
+     * Close the interceptor.
+     */
+    void close();
+
+    /**
+     * This is called just before the message is returned by
+     * {@link Consumer#receive()}, {@link MessageListener#received(Consumer, 
+     * Message)} or the {@link java.util.concurrent.CompletableFuture} returned by
+     * {@link Consumer#receiveAsync()} completes.
+     * <p>
+     * This method is allowed to modify message, in which case the new message
+     * will be returned.
+     * <p>
+     * Any exception thrown by this method will be caught by the caller, logged,
+     * but not propagated to client.
+     * <p>
+     * Since the consumer may run multiple interceptors, a particular 
+     * interceptor's
+     * <tt>beforeConsume</tt> callback will be called in the order specified
by
+     * {@link ConsumerBuilder#intercept(ConsumerInterceptor[])}. The first 
+     * interceptor in the list gets the consumed message, the following 
+     * interceptor will be passed
+     * the message returned by the previous interceptor, and so on. Since 
+     * interceptors are allowed to modify message, interceptors may potentially 
+     * get the messages already modified by other interceptors. However building a   
+     * pipeline of mutable
+     * interceptors that depend on the output of the previous interceptor is   
+     * discouraged, because of potential side-effects caused by interceptors 
+     * potentially failing to modify the message and throwing an exception. 
+     * if one of interceptors in the list throws an exception from 
+     * <tt>beforeConsume</tt>, the exception is caught, logged,
+     * and the next interceptor is called with the message returned by the last 
+     * successful interceptor in the list, or otherwise the original consumed 
+     * message.
+     *
+     * @param message the message to be consumed by the client.
+     * @return message that is either modified by the interceptor or same message
+     *         passed into the method.
+     */
+    Message<T> beforeConsume(Message<T> message);
+
+    /**
+     * This is called consumer sends the acknowledgment to the broker.
+     *
+     * <p>Any exception thrown by this method will be ignored by the caller.
+     *
+     * @param message message to ack, null if acknowledge fail.
+     * @param cause the exception on acknowledge.
+     */
+    void onAcknowledge(Message<T> message, Throwable cause);
+
+    /**
+     * This is called consumer send the cumulative acknowledgment to the broker.
+     *
+     * <p>Any exception thrown by this method will be ignored by the caller.
+     *
+     * @param messages messages to ack cumulative, null if acknowledge fail.
+     * @param cause the exception on acknowledge.
+     */
+    void onAcknowledgeCumulative(List<Message<T>> messages, Throwable cause);
+
+}
+```
+
+### Configure Interceptors
+
+Applications can configure interceptors via ProducerBuilder and ConsumerBuilder.
+
+```java
+/**
+ * Intercept {@link Producer}.
+ *
+ * @param interceptors the list of interceptors to intercept the producer created by this
builder.
+ * @return producer builder.
+ */
+ProducerBuilder<T> intercept(ProducerInterceptor<T> ... interceptors);
+```
+
+```java
+/**
+ * Intercept {@link Consumer}.
+ *
+ * @param interceptors the list of interceptors to intercept the consumer created by this
builder.
+ * @return consumer builder.
+ */
+ConsumerBuilder<T> intercept(ConsumerInterceptor<T>... interceptors);
+```
+
+### Tracing Interceptor
+
+Applications can choose its preferred tracing framework for integration. The tracing framework
can be either OpenTracing or OpenCensus. Pulsar doesn’t enforce any tracing framework. 
+
+Following shows an example how end-to-end tracing using OpenTracing works using Interceptors
in Pulsar.
+
+#### Trace Context
+
+In order to achieve end-to-end tracing, the most important thing is to be able to propagate
trace context from Producer to Consumer. This trace context can be serialized and propagated
into the properties in message header and carried as part of message.
+
+Use OpenTracing as an example:
+
+- The trace context carrier is the Message.
+- A trace context Format need to be defined how to serialize / deserialize trace context
from the properties of this Message.
+
+#### Producer Tracing Interceptor
+
+A tracing interceptor is pretty straightforward:
+
+- In the implementation of `beforeSend` method, it should first try to deserialize a trace
context (because the application who is using Pulsar producer might already set a trace context
in the message header). If there is a trace context, then use the trace context, otherwise
it should start a new trace span.
+- Once a trace context is established, it can record any information that needs to be traced.
+
+Similarly,
+
+- In the implementation of `onSendAcknowledgment`, it can retrieve the trace context and
record any information that needs to be traced on acknowledgments.
+
+#### Consumer Tracing Interceptor
+
+Implementing a consumer tracing interceptor is similar as implementing a producer tracing
interceptor as above.
\ No newline at end of file


Mime
View raw message