apex-users mailing list archives

From Jaspal Singh <jaspal.singh1...@gmail.com>
Subject Re: Datatorrent fault tolerance
Date Fri, 07 Oct 2016 18:32:49 GMT
Siyuan,

That's how we have set it in the properties file:

[image: Inline image 1]


Thanks!!

On Fri, Oct 7, 2016 at 1:27 PM, hsy541@gmail.com <hsy541@gmail.com> wrote:

> Jaspal,
>
> Topic is a mandatory property that you have to set. In MapR, the value should
> be set to the full stream path, for example: /your/stream/path:streamname
>
> Regards,
> Siyuan
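
As an illustration of the topic format described above, here is a minimal Java sketch that builds and sanity-checks a value of the form /stream/path:topic before it is used as the operator's topic property. MapRTopicNames and both of its methods are hypothetical helpers made up for this example; they are not part of Apex, Malhar, or MapR, and the check is deliberately loose. In a properties file the key would typically look like dt.operator.kafkaOut.prop.topic, assuming the usual Apex naming convention and the operator name kafkaOut from the error message.

```java
import java.util.Objects;

/** Hypothetical helper (not part of Apex, Malhar, or MapR) for MapR Streams topic names. */
final class MapRTopicNames {

    private MapRTopicNames() {
    }

    /** Joins an absolute stream path and a topic name into "/stream/path:topic". */
    static String fullTopic(String streamPath, String topic) {
        Objects.requireNonNull(streamPath, "streamPath");
        Objects.requireNonNull(topic, "topic");
        if (!streamPath.startsWith("/")) {
            throw new IllegalArgumentException("stream path must be absolute: " + streamPath);
        }
        if (topic.isEmpty() || topic.indexOf(':') >= 0) {
            throw new IllegalArgumentException("invalid topic name: " + topic);
        }
        return streamPath + ":" + topic;
    }

    /** Returns true if the value looks like "/stream/path:topic". */
    static boolean looksLikeFullTopic(String value) {
        if (value == null || !value.startsWith("/")) {
            return false;
        }
        int colon = value.lastIndexOf(':');
        // The colon must follow a non-empty path and must not be the last character.
        return colon > 1 && colon < value.length() - 1;
    }
}
```

For example, MapRTopicNames.fullTopic("/your/stream/path", "streamname") yields the value shown above, while a bare Kafka-style name such as "certUpdate" fails looksLikeFullTopic.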
>
> On Fri, Oct 7, 2016 at 11:22 AM, Jaspal Singh <jaspal.singh1404@gmail.com>
> wrote:
>
>> After making the change, we are getting the error below during application
>> launch:
>>
>> *An error occurred trying to launch the application. Server message:
>> javax.validation.ConstraintViolationException: Operator kafkaOut violates
>> constraints
>> [ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
>> propertyPath='topic', message='may not be null', *
>>
>>
>>
>> Thanks!!
>>
>> On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <jaspal.singh1404@gmail.com>
>> wrote:
>>
>>> So I just changed the malhar-kafka version to 3.5.0, and I was able to
>>> import the AbstractKafkaOutputOperator. Let me try to launch it now.
>>>
>>> Thanks for your inputs !!
>>>
>>>
>>>
>>> On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <jaspal.singh1404@gmail.com
>>> > wrote:
>>>
>>>> Should we use malhar-library version 3.5 then ?
>>>>
>>>>
>>>> Thanks!!
>>>>
>>>> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <thw@apache.org> wrote:
>>>>
>>>>> Please make sure to depend on version 3.5 of malhar-kafka in pom.xml.
>>>>> This operator is not in malhar-library, it's a separate module.
>>>>>
>>>>>
>>>>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <
>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>
>>>>>> Hi Siyuan,
>>>>>>
>>>>>> I am using the same Kafka producer as you mentioned. But I am not
>>>>>> seeing the AbstractKafkaOutputOperator in the Malhar library when importing.
>>>>>>
>>>>>>
>>>>>> Thanks!!
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy541@gmail.com <hsy541@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Also, which Kafka output operator are you using?
>>>>>>> Please use org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator
>>>>>>> instead of com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator.
>>>>>>> Only the org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator works
>>>>>>> with MapR Streams; the latter only works with Kafka 0.8.* or 0.9.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Siyuan
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy541@gmail.com <hsy541@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey Jaspal,
>>>>>>>>
>>>>>>>> Did you add any code to the existing KafkaSinglePortExactlyOnceOutputOperator
>>>>>>>> from Malhar? If so, please make sure the producer you use here is
>>>>>>>> org.apache.kafka.clients.producer.KafkaProducer instead of
>>>>>>>> kafka.javaapi.producer.Producer. The latter is the old API and is not
>>>>>>>> supported by MapR Streams.
>>>>>>>>
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Siyuan
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <
>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thomas,
>>>>>>>>>
>>>>>>>>> Below is the operator implementation we are trying to run. This
>>>>>>>>> operator is getting an object of the Tenant class from the upstream operator.
>>>>>>>>>
>>>>>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>>>>>>
>>>>>>>>>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>>>>>
>>>>>>>>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>>>>>>>
>>>>>>>>>         Gson gson = new Gson();
>>>>>>>>>
>>>>>>>>>         @Override
>>>>>>>>>         public void process(Tenant tenant) {
>>>>>>>>>
>>>>>>>>>             try {
>>>>>>>>>                 Producer<String, String> producer = getKafkaProducer();
>>>>>>>>>                 //ObjectMapper mapper = new ObjectMapper();
>>>>>>>>>                 long now = System.currentTimeMillis();
>>>>>>>>>                 //Configuration conf = HBaseConfiguration.create();
>>>>>>>>>                 //TenantDao dao = new TenantDao(conf);
>>>>>>>>>                 //ArrayList<Put> puts = new ArrayList<>();
>>>>>>>>>                 if (tenant != null) {
>>>>>>>>>                     //Tenant tenant = tenant.next();
>>>>>>>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>>>>>                         producer.send(new ProducerRecord<String, String>(
>>>>>>>>>                             "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
>>>>>>>>>                             tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>                         //puts.add(dao.mkPut(tenant));
>>>>>>>>>                     } else {
>>>>>>>>>                         producer.send(new ProducerRecord<String, String>(
>>>>>>>>>                             "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
>>>>>>>>>                             tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>
>>>>>>>>>                     }
>>>>>>>>>                     producer.flush();
>>>>>>>>>                 }
>>>>>>>>>             }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> After building the application, it throws an error during launch:
>>>>>>>>>
>>>>>>>>> An error occurred trying to launch the application. Server
>>>>>>>>> message: java.lang.NoClassDefFoundError:
>>>>>>>>> Lkafka/javaapi/producer/Producer; at
>>>>>>>>> java.lang.Class.getDeclaredFields0(Native Method) at
>>>>>>>>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>>>>>>>>> java.lang.Class.getDeclaredFields(Class.java:1916) at
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Jaspal
>>>>>>>>>
>>>>>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <
>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thomas,
>>>>>>>>>>
>>>>>>>>>> I was trying to refer to the input from the previous operator.
>>>>>>>>>>
>>>>>>>>>> Another thing: when we extend the AbstractKafkaOutputOperator, do
>>>>>>>>>> we need to specify <String, T>? We are getting an object of a class
>>>>>>>>>> type from the previous operator.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Jaspal
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <thw@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Are you referring to the upstream operator in the DAG or the
>>>>>>>>>>> state of the previous application after relaunch? Since the data is stored
>>>>>>>>>>> in MapR streams, an operator that is a producer can also act as a consumer.
>>>>>>>>>>> Please clarify your question.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>
>>>>>>>>>>>> I have a question: when we are using
>>>>>>>>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results
>>>>>>>>>>>> into a maprstream topic, will it be able to read messages from the previous
>>>>>>>>>>>> operator?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <thw@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> For recovery you need to set the window data manager like so:
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
>>>>>>>>>>>>>
>>>>>>>>>>>>> That will also apply to stateful restart of the entire
>>>>>>>>>>>>> application (relaunch from previous instance's checkpointed state).
>>>>>>>>>>>>>
>>>>>>>>>>>>> For cold restart, you would need to consider the property you
>>>>>>>>>>>>> mention and decide what is applicable to your use case.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Ok, now I get it. Thanks for the nice explanation!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> One more thing: you mentioned checkpointing the
>>>>>>>>>>>>>> offset ranges to replay in the same order from Kafka.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Is there any property we need to configure to do that, like
>>>>>>>>>>>>>> initialOffset set to APPLICATION_OR_LATEST?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>> thomas.weise@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What you want is the effect of exactly-once output (that's
>>>>>>>>>>>>>>> why we also call it end-to-end exactly-once). There is no such thing as
>>>>>>>>>>>>>>> exactly-once processing in a distributed system. In this case it would
>>>>>>>>>>>>>>> rather be "produce exactly-once". Upstream operators, on failure, will recover
>>>>>>>>>>>>>>> to checkpointed state and re-process the stream from there. This is
>>>>>>>>>>>>>>> at-least-once, the default behavior. Because in the input operator you have
>>>>>>>>>>>>>>> configured to replay in the same order from Kafka (this is done by
>>>>>>>>>>>>>>> checkpointing the offset ranges), the computation in the DAG is idempotent
>>>>>>>>>>>>>>> and the output operator can discard the results that were already published
>>>>>>>>>>>>>>> instead of producing duplicates.
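
To make the discard step concrete, here is a toy Java simulation of the idea. It is not the Malhar operator's actual code. It assumes that the tuples of a window are replayed in the same order, and that the writer can see what an interrupted attempt already published for that window. ReplayDedupSketch, FakeTopic, Writer, and the "windowId:payload" record format are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy simulation of "discard what was already published" on replay. */
final class ReplayDedupSketch {

    private ReplayDedupSketch() {
    }

    /** Stand-in for an external topic: an append-only log of "windowId:payload" records. */
    static final class FakeTopic {
        final List<String> records = new ArrayList<>();
    }

    /** Writer that skips records an earlier attempt already published for the same window. */
    static final class Writer {
        private final FakeTopic topic;

        Writer(FakeTopic topic) {
            this.topic = topic;
        }

        /** Writes one window and returns the number of records actually appended. */
        int writeWindow(long windowId, List<String> tuples) {
            String prefix = windowId + ":";
            // Count what is already in the topic for this window.
            int alreadyPublished = 0;
            for (String record : topic.records) {
                if (record.startsWith(prefix)) {
                    alreadyPublished++;
                }
            }
            int appended = 0;
            // Replay is in the same order, so the first alreadyPublished tuples are duplicates.
            for (int i = alreadyPublished; i < tuples.size(); i++) {
                topic.records.add(prefix + tuples.get(i));
                appended++;
            }
            return appended;
        }
    }
}
```

If a window was partially written before a failure, calling writeWindow again with the same tuples in the same order appends only the missing tail. Without the same-order replay guarantee on the input side, counting alone would not be enough to identify duplicates.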
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I think this is a customized operator
>>>>>>>>>>>>>>>> implementation that takes care of exactly-once processing at the output.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> What if any previous operators fail? How can we make sure
>>>>>>>>>>>>>>>> they also recover using EXACTLY_ONCE processing mode?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>> thomas.weise@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The operator will ensure that messages are not duplicated,
>>>>>>>>>>>>>>>>> under the stated assumptions.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> In our case we are writing the results back to
>>>>>>>>>>>>>>>>>> maprstreams topic based on some validations.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>>> thw@apache.org> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> which operators in your application are writing to
>>>>>>>>>>>>>>>>>>> external systems?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>>>>>>>>>>> https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once),
>>>>>>>>>>>>>>>>>>> there is Kafka input, which is configured to be idempotent. The results are
>>>>>>>>>>>>>>>>>>> written to JDBC. That operator by itself supports exactly-once through
>>>>>>>>>>>>>>>>>>> transactions (in conjunction with idempotent input), hence there is no need
>>>>>>>>>>>>>>>>>>> to configure the processing mode at all.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
