apex-users mailing list archives

From Jaspal Singh <jaspal.singh1...@gmail.com>
Subject Re: Datatorrent fault tolerance
Date Fri, 07 Oct 2016 16:01:06 GMT
Thomas,

Below is the operator implementation we are trying to run. This operator
receives Tenant objects from the upstream operator.

public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {

    private static final Logger LOG =
            LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);

    public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {

        Gson gson = new Gson();

        @Override
        public void process(Tenant tenant) {

            try {
                Producer<String, String> producer = getKafkaProducer();
                long now = System.currentTimeMillis();
                if (tenant != null) {
                    if (StringUtils.isNotEmpty(tenant.getGl())) {
                        // Records with a GL value go to the certUpdate topic
                        producer.send(new ProducerRecord<String, String>(
                                "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
                                tenant.getVolumeName(), gson.toJson(tenant)));
                    } else {
                        // Records without a GL value go to the error topic
                        producer.send(new ProducerRecord<String, String>(
                                "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
                                tenant.getVolumeName(), gson.toJson(tenant)));
                    }
                    producer.flush();
                }
            }


After building the application, it throws an error during launch:

An error occurred trying to launch the application. Server message:
java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer;
    at java.lang.Class.getDeclaredFields0(Native Method)
    at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
    at java.lang.Class.getDeclaredFields(Class.java:1916)
    ...


Thanks
Jaspal
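
The stack trace shows the failure while the platform reflects over the operator class (`Class.getDeclaredFields`). A `NoClassDefFoundError` at that point usually means a type referenced by a field somewhere in the class hierarchy, here `kafka.javaapi.producer.Producer` from the older Kafka client API, was available at compile time but is not visible to the class loader at runtime. A minimal probe for checking which of the two Kafka producer types a class loader can see is sketched below. `ClasspathCheck` is a hypothetical helper, not part of Apex or Kafka, and it only reports what the loader it runs under can find.

```java
// Hypothetical diagnostic helper (not part of Apex or Kafka): reports whether a
// class can be located by a class loader, without running its static initializers.
final class ClasspathCheck {

    private ClasspathCheck() {
    }

    /** Returns true if {@code className} can be loaded through {@code loader}. */
    static boolean isOnClasspath(String className, ClassLoader loader) {
        try {
            // initialize = false: load the class but do not initialize it
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // ClassNotFoundException: no class file for this name.
            // LinkageError (e.g. NoClassDefFoundError): a class it depends on is missing.
            return false;
        }
    }

    /** Convenience overload that uses the current thread's context class loader. */
    static boolean isOnClasspath(String className) {
        return isOnClasspath(className, Thread.currentThread().getContextClassLoader());
    }

    public static void main(String[] args) {
        String[] candidates = {
            "kafka.javaapi.producer.Producer",                 // older Scala-based producer API
            "org.apache.kafka.clients.producer.KafkaProducer"  // newer Java producer client
        };
        for (String name : candidates) {
            System.out.println(name + " -> " + (isOnClasspath(name) ? "found" : "missing"));
        }
    }
}
```

If the first name reports `missing` when run with the same classpath the application is launched with, the jar that provides the old producer API is not being shipped with the application.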

On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <jaspal.singh1404@gmail.com>
wrote:

> Thomas,
>
> I was referring to the input from the previous operator.
>
> Another thing: when we extend AbstractKafkaOutputOperator, do we need
> to specify <String, T>, since we are getting an object of a class type from
> the previous operator?
>
>
> Thanks
> Jaspal
>
> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <thw@apache.org> wrote:
>
>> Are you referring to the upstream operator in the DAG or the state of the
>> previous application after relaunch? Since the data is stored in MapR
>> streams, an operator that is a producer can also act as a consumer. Please
>> clarify your question.
>>
>>
>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <jaspal.singh1404@gmail.com>
>> wrote:
>>
>>> Hi Thomas,
>>>
>>> I have a question: when we use
>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results into a
>>> MapR Streams topic, will it be able to read messages from the previous
>>> operator?
>>>
>>>
>>> Thanks
>>> Jaspal
>>>
>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <thw@apache.org> wrote:
>>>
>>>> For recovery you need to set the window data manager like so:
>>>>
>>>> https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
>>>>
>>>> That will also apply to stateful restart of the entire application
>>>> (relaunch from previous instance's checkpointed state).
>>>>
>>>> For cold restart, you would need to consider the property you mention
>>>> and decide what is applicable to your use case.
>>>>
>>>> Thomas
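
As Thomas notes, a stateful relaunch restores offsets from checkpointed state, while a cold restart depends on the initial-offset setting. The toy model below illustrates how such a setting could resolve the starting offset of one partition on a cold start. It is an illustrative sketch, not Malhar code: the enum values are assumed to mirror the operator's `initialOffset` option names (only `APPLICATION_OR_LATEST` appears in this thread), and `resolve` is a hypothetical function.

```java
import java.util.OptionalLong;

// Illustrative model only (not the Malhar implementation): how an initial-offset
// mode could pick the starting offset of a single partition on a cold start.
final class InitialOffsetModel {

    // Assumed to mirror the names of the operator's initialOffset options.
    enum InitialOffset { EARLIEST, LATEST, APPLICATION_OR_EARLIEST, APPLICATION_OR_LATEST }

    private InitialOffsetModel() {
    }

    /**
     * @param committed offset previously committed by this application, if any
     * @param earliest  oldest offset still retained in the partition
     * @param latest    offset of the next record to be written to the partition
     */
    static long resolve(InitialOffset mode, OptionalLong committed, long earliest, long latest) {
        switch (mode) {
            case EARLIEST:
                return earliest;
            case LATEST:
                return latest;  // records written while the application was down are skipped
            case APPLICATION_OR_EARLIEST:
                return committed.orElse(earliest);
            case APPLICATION_OR_LATEST:
                return committed.orElse(latest);
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        // Partition retains offsets 0..99; the previous run committed offset 42.
        OptionalLong previousRun = OptionalLong.of(42L);
        for (InitialOffset mode : InitialOffset.values()) {
            System.out.println(mode + ": with commit -> " + resolve(mode, previousRun, 0L, 100L)
                    + ", without commit -> " + resolve(mode, OptionalLong.empty(), 0L, 100L));
        }
    }
}
```

In this model, APPLICATION_OR_LATEST resumes where the previous run committed and falls back to the latest offset only when nothing was committed; whether skipping data in that fallback case is acceptable is the use-case decision Thomas mentions.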
>>>>
>>>>
>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>> jaspal.singh1404@gmail.com> wrote:
>>>>
>>>>> OK, now I get it. Thanks for the nice explanation!
>>>>>
>>>>> One more thing: you mentioned checkpointing the offset ranges
>>>>> to replay in the same order from Kafka.
>>>>>
>>>>> Is there any property we need to configure to do that, such as
>>>>> setting initialOffset to APPLICATION_OR_LATEST?
>>>>>
>>>>>
>>>>> Thanks
>>>>> Jaspal
>>>>>
>>>>>
>>>>> On Thursday, October 6, 2016, Thomas Weise <thomas.weise@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> What you want is the effect of exactly-once output (that's why we also
>>>>>> call it end-to-end exactly-once). There is no such thing as exactly-once
>>>>>> processing in a distributed system; in this case it would rather be
>>>>>> "produce exactly-once". Upstream operators, on failure, will recover to
>>>>>> checkpointed state and re-process the stream from there. This is
>>>>>> at-least-once, the default behavior. Because the input operator is
>>>>>> configured to replay in the same order from Kafka (this is done by
>>>>>> checkpointing the offset ranges), the computation in the DAG is
>>>>>> idempotent, and the output operator can discard the results that were
>>>>>> already published instead of producing duplicates.
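
Thomas's explanation can be made concrete with a small simulation. This is a toy model written for illustration, not the Apex or Malhar API, and the Malhar Kafka operator may detect already-published data differently. `ReplayableInput` records the offset range consumed in each window (standing in for checkpointed offset ranges), and `DedupingOutput` keeps the id of the last window it committed and drops any window at or below it. The model assumes the committed window id survives the failure, as it would if a sink wrote it in the same transaction as the data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (not the Apex/Malhar API) of end-to-end exactly-once output:
// the input replays the same offset range for a window after a failure, and
// the output discards windows it has already committed.
final class ExactlyOnceModel {

    /** Input side: logs the offset range consumed in each window, standing in for checkpointed offsets. */
    static final class ReplayableInput {
        private final List<String> partition;                       // stands in for one Kafka partition
        private final Map<Long, int[]> windowLog = new HashMap<>();  // windowId -> {start, end}
        private int nextOffset = 0;

        ReplayableInput(List<String> partition) {
            this.partition = partition;
        }

        /** Returns the records of a window; a window seen before gets exactly the same range again. */
        List<String> emit(long windowId, int maxRecords) {
            int[] range = windowLog.get(windowId);
            if (range == null) {
                int end = Math.min(nextOffset + maxRecords, partition.size());
                range = new int[] {nextOffset, end};
                windowLog.put(windowId, range);
            }
            nextOffset = range[1];  // continue after this window's range
            return partition.subList(range[0], range[1]);
        }
    }

    /** Output side: publishes a window only if it is newer than the last committed window. */
    static final class DedupingOutput {
        final List<String> published = new ArrayList<>();
        private long lastCommittedWindow = -1;  // assumed durable: survives the failure

        void endWindow(long windowId, List<String> results) {
            if (windowId <= lastCommittedWindow) {
                return;  // replayed window: its results were already published
            }
            published.addAll(results);
            lastCommittedWindow = windowId;  // a real sink would store this atomically with the data
        }
    }

    /** Deterministic per-record computation, so replayed windows yield identical results. */
    static List<String> process(List<String> records) {
        List<String> out = new ArrayList<>();
        for (String r : records) {
            out.add(r.toUpperCase());
        }
        return out;
    }

    static List<String> run() {
        ReplayableInput input = new ReplayableInput(Arrays.asList("a", "b", "c", "d", "e", "f", "g"));
        DedupingOutput output = new DedupingOutput();

        // Windows 0..2 are processed and published normally (two records per window).
        for (long w = 0; w <= 2; w++) {
            output.endWindow(w, process(input.emit(w, 2)));
        }
        // Simulated failure: windows 1 and 2 are re-processed (at-least-once), as they would be
        // after recovering to a checkpoint taken after window 0; window 3 is new data.
        for (long w = 1; w <= 3; w++) {
            output.endWindow(w, process(input.emit(w, 2)));
        }
        return output.published;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Without the `windowId <= lastCommittedWindow` check, the replay would publish C, D, E and F a second time, which is the at-least-once behavior described above. The check is only safe because the input replays identical ranges, so a replayed window's results match what was already published.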
>>>>>>
>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>
>>>>>>> I think this is a customized operator implementation that takes
>>>>>>> care of exactly-once processing at the output.
>>>>>>>
>>>>>>> What if any of the previous operators fail? How can we make sure they
>>>>>>> also recover using the EXACTLY_ONCE processing mode?
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>> Jaspal
>>>>>>>
>>>>>>>
>>>>>>> On Thursday, October 6, 2016, Thomas Weise <thomas.weise@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> In that case please have a look at:
>>>>>>>>
>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>>>>
>>>>>>>> The operator will ensure that messages are not duplicated, under
>>>>>>>> the stated assumptions.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Thomas,
>>>>>>>>>
>>>>>>>>> In our case we are writing the results back to a MapR Streams topic
>>>>>>>>> based on some validations.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Jaspal
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <thw@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Which operators in your application are writing to external
>>>>>>>>>> systems?
>>>>>>>>>>
>>>>>>>>>> When you look at the example from the blog
>>>>>>>>>> (https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once),
>>>>>>>>>> there is a Kafka input, which is configured to be idempotent. The
>>>>>>>>>> results are written to JDBC. That operator by itself supports
>>>>>>>>>> exactly-once through transactions (in conjunction with idempotent
>>>>>>>>>> input), hence there is no need to configure the processing mode at all.
>>>>>>>>>>
>>>>>>>>>> Thomas
>>>>>>>>>>
>>>>>>>>>>
>>>>>>
>>>>
>>>
>>
>
