apex-users mailing list archives

From "hsy541@gmail.com" <hsy...@gmail.com>
Subject Re: Datatorrent fault tolerance
Date Fri, 07 Oct 2016 20:07:21 GMT
Oh I see, you want to send to different topics. Well, then you have to give
some dummy value to the topic property on the operator.

Regards,
Siyuan
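
A minimal sketch of that in populateDAG(), assuming the operator exposes the topic property through a setTopic() setter (the property name comes from the ConstraintViolationException quoted further down in this thread; the dummy path is illustrative only):

    KafkaSinglePortExactlyOnceOutputOperator<Tenant> kafkaOut =
        dag.addOperator("kafkaOut", new KafkaSinglePortExactlyOnceOutputOperator<Tenant>());
    // never used for the actual sends, since process() picks the topic per record,
    // but it satisfies the "may not be null" validation on the topic property
    kafkaOut.setTopic("/your/stream/path:dummy");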

On Fri, Oct 7, 2016 at 11:38 AM, Jaspal Singh <jaspal.singh1404@gmail.com>
wrote:

> Siyuan,
>
> So for the output operator, we have specified it as a part of our logic
> itself.
>
> public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOutputOperator {
>
>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>
>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>
>         Gson gson = new Gson();
>
>         @Override
>         public void process(Tenant tenant) {
>
>             try {
>                 Producer<String, String> producer = getKafkaProducer();
>                 //ObjectMapper mapper = new ObjectMapper();
>                 long now = System.currentTimeMillis();
>                 //Configuration conf = HBaseConfiguration.create();
>                 //TenantDao dao = new TenantDao(conf);
>                 //ArrayList<Put> puts = new ArrayList<>();
>                 if (tenant != null) {
>                     //Tenant tenant = tenant.next();
>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>                         //puts.add(dao.mkPut(tenant));
>                     } else {
>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>                     }
>                     producer.flush();
>                 }
>             }
>
>
>
> Thanks!!
>
> On Fri, Oct 7, 2016 at 1:34 PM, hsy541@gmail.com <hsy541@gmail.com> wrote:
>
>> Jaspal,
>>
>> I think you missed the kafkaOut :)
>>
>> Regards,
>> Siyuan
>>
>> On Fri, Oct 7, 2016 at 11:32 AM, Jaspal Singh <jaspal.singh1404@gmail.com
>> > wrote:
>>
>>> Siyuan,
>>>
>>> That's how we have given it in properties file:
>>>
>>> [image: Inline image 1]
>>>
>>>
>>> Thanks!!
>>>
>>> On Fri, Oct 7, 2016 at 1:27 PM, hsy541@gmail.com <hsy541@gmail.com>
>>> wrote:
>>>
>>>> Jaspal,
>>>>
>>>> Topic is a mandatory property you have to set. In MapR, the value
>>>> should be set to the full stream path, for example: /your/stream/path:streamname
>>>>
>>>> Regards,
>>>> Siyuan
>>>>
>>>> On Fri, Oct 7, 2016 at 11:22 AM, Jaspal Singh <
>>>> jaspal.singh1404@gmail.com> wrote:
>>>>
>>>>> After making the change, we are getting the below error during
>>>>> application launch:
>>>>>
>>>>> *An error occurred trying to launch the application. Server message:
>>>>> javax.validation.ConstraintViolationException: Operator kafkaOut violates
>>>>> constraints
>>>>> [ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
>>>>> propertyPath='topic', message='may not be null', *
>>>>>
>>>>>
>>>>>
>>>>> Thanks!!
>>>>>
>>>>> On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <
>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>
>>>>>> So I just changed the malhar-kafka version to 3.5.0 and I was able to
>>>>>> import the AbstractOutputOperator. Let me try to launch it now.
>>>>>>
>>>>>> Thanks for your inputs !!
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <
>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>
>>>>>>> Should we use malhar-library version 3.5 then ?
>>>>>>>
>>>>>>>
>>>>>>> Thanks!!
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <thw@apache.org> wrote:
>>>>>>>
>>>>>>>> Please make sure to depend on version 3.5 of malhar-kafka in
>>>>>>>> pom.xml. This operator is not in malhar-library, it's a separate module.
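
For reference, the corresponding pom.xml fragment would look roughly like this (the version number is taken from this thread; adjust it to the Malhar release you actually build against):

    <dependency>
      <groupId>org.apache.apex</groupId>
      <artifactId>malhar-kafka</artifactId>
      <version>3.5.0</version>
    </dependency>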
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <
>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Siyuan,
>>>>>>>>>
>>>>>>>>> I am using the same Kafka producer as you mentioned. But I am not
>>>>>>>>> seeing the AbstractKafkaOutputOperator in the malhar library on import.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks!!
>>>>>>>>>
>>>>>>>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy541@gmail.com <
>>>>>>>>> hsy541@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Also, which Kafka output operator are you using?
>>>>>>>>>> Please use org.apache.apex.malhar.kafka.AbstractOutputOperator
>>>>>>>>>> instead of com.datatorrent.contrib.kafka.AbstractOutputOperator.
>>>>>>>>>> Only the org.apache.apex.malhar.kafka one works with MapR Streams;
>>>>>>>>>> the com.datatorrent.contrib one only works with Kafka 0.8.* or 0.9.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Siyuan
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy541@gmail.com <
>>>>>>>>>> hsy541@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hey Jaspal,
>>>>>>>>>>>
>>>>>>>>>>> Did you add any code to the existing KafkaSinglePortExactlyOnceOutputOperator
>>>>>>>>>>> from malhar? If so, please make sure the producer you use here
>>>>>>>>>>> is org.apache.kafka.clients.producer.KafkaProducer instead of
>>>>>>>>>>> kafka.javaapi.producer.Producer. That is the old API and it is
>>>>>>>>>>> not supported by MapR Streams.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Siyuan
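
Putting the class-path points from this thread together, the operator skeleton would import roughly the following (names taken from the snippets in this thread; treat them as assumptions if your Malhar version differs):

    import org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator;
    import org.apache.kafka.clients.producer.KafkaProducer;   // new-API producer
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    // NOT kafka.javaapi.producer.Producer -- that is the old API, unsupported by MapR Streams

    public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOutputOperator {
        // getKafkaProducer(), used in the snippets in this thread, comes from the base class
    }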
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <
>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thomas,
>>>>>>>>>>>>
>>>>>>>>>>>> Below is the operator implementation we are trying to run. This
>>>>>>>>>>>> operator is getting an object of the Tenant class from the upstream operator.
>>>>>>>>>>>>
>>>>>>>>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>>>>>>>>>
>>>>>>>>>>>>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>>>>>>>>
>>>>>>>>>>>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>>>>>>>>>>
>>>>>>>>>>>>         Gson gson = new Gson();
>>>>>>>>>>>>
>>>>>>>>>>>>         @Override
>>>>>>>>>>>>         public void process(Tenant tenant) {
>>>>>>>>>>>>
>>>>>>>>>>>>             try {
>>>>>>>>>>>>                 Producer<String, String> producer = getKafkaProducer();
>>>>>>>>>>>>                 //ObjectMapper mapper = new ObjectMapper();
>>>>>>>>>>>>                 long now = System.currentTimeMillis();
>>>>>>>>>>>>                 //Configuration conf = HBaseConfiguration.create();
>>>>>>>>>>>>                 //TenantDao dao = new TenantDao(conf);
>>>>>>>>>>>>                 //ArrayList<Put> puts = new ArrayList<>();
>>>>>>>>>>>>                 if (tenant != null) {
>>>>>>>>>>>>                     //Tenant tenant = tenant.next();
>>>>>>>>>>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>>>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>>>>                         //puts.add(dao.mkPut(tenant));
>>>>>>>>>>>>                     } else {
>>>>>>>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>>>>                     }
>>>>>>>>>>>>                     producer.flush();
>>>>>>>>>>>>                 }
>>>>>>>>>>>>             }
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> After building the application, it throws an error during launch:
>>>>>>>>>>>>
>>>>>>>>>>>> An error occurred trying to launch the application. Server
>>>>>>>>>>>> message: java.lang.NoClassDefFoundError:
>>>>>>>>>>>> Lkafka/javaapi/producer/Producer; at
>>>>>>>>>>>> java.lang.Class.getDeclaredFields0(Native Method) at
>>>>>>>>>>>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>>>>>>>>>>>> java.lang.Class.getDeclaredFields(Class.java:1916) at
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <
>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thomas,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I was trying to refer to the input from the previous operator.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Another thing: when we extend the AbstractKafkaOutputOperator,
>>>>>>>>>>>>> do we need to specify <String, T>? Since we are getting an object of a
>>>>>>>>>>>>> class type from the previous operator.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <thw@apache.org>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Are you referring to the upstream operator in the DAG or the
>>>>>>>>>>>>>> state of the previous application after relaunch? Since the data is stored
>>>>>>>>>>>>>> in MapR streams, an operator that is a producer can also act as a consumer.
>>>>>>>>>>>>>> Please clarify your question.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have a question: when we are using
>>>>>>>>>>>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results
>>>>>>>>>>>>>>> into a MapR Streams topic, will it be able to read messages from the previous
>>>>>>>>>>>>>>> operator?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <thw@apache.org
>>>>>>>>>>>>>>> > wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For recovery you need to set the window data manager like so:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> That will also apply to stateful restart of the entire
>>>>>>>>>>>>>>>> application (relaunch from previous instance's checkpointed state).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For cold restart, you would need to consider the property
>>>>>>>>>>>>>>>> you mention and decide what is applicable to your use case.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thomas
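
A minimal sketch of those settings in populateDAG(), assuming the setter names below (setWindowDataManager / setInitialOffset) match the Malhar release in use -- the linked Application.java is the authoritative example:

    KafkaSinglePortInputOperator kafkaIn =
        dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
    // persist the offset ranges read in each window so replay after failure is idempotent
    kafkaIn.setWindowDataManager(new FSWindowDataManager());
    // on relaunch, resume from the offsets the application itself stored
    kafkaIn.setInitialOffset("APPLICATION_OR_LATEST");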
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Ok, now I get it. Thanks for the nice explanation!!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> One more thing: you mentioned checkpointing the
>>>>>>>>>>>>>>>>> offset ranges to replay in the same order from Kafka.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Is there any property we need to configure to do that,
>>>>>>>>>>>>>>>>> like initialOffset set to APPLICATION_OR_LATEST?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>> thomas.weise@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> What you want is the effect of exactly-once output
>>>>>>>>>>>>>>>>>> (that's why we also call it end-to-end exactly-once). There is no such
>>>>>>>>>>>>>>>>>> thing as exactly-once processing in a distributed system. In this case it
>>>>>>>>>>>>>>>>>> would rather be "produce exactly-once". Upstream operators, on failure, will
>>>>>>>>>>>>>>>>>> recover to checkpointed state and re-process the stream from there. This is
>>>>>>>>>>>>>>>>>> at-least-once, the default behavior. Because the input operator is
>>>>>>>>>>>>>>>>>> configured to replay in the same order from Kafka (this is done by
>>>>>>>>>>>>>>>>>> checkpointing the offset ranges), the computation in the DAG is idempotent
>>>>>>>>>>>>>>>>>> and the output operator can discard the results that were already published
>>>>>>>>>>>>>>>>>> instead of producing duplicates.
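
A rough sketch of that last point only -- not the actual Malhar operator (see the KafkaSinglePortExactlyOnceOutputOperator source linked further down in this thread); rebuildFromOutputTopic() is a hypothetical helper:

    import java.util.Set;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public abstract class DedupingKafkaWriterSketch {
        protected Producer<String, String> producer;  // set up elsewhere
        protected String topic;
        protected boolean replaying;                   // true while re-processing windows replayed after a failure
        protected Set<String> alreadyWritten;          // rebuilt from the tail of the output topic on recovery

        protected abstract Set<String> rebuildFromOutputTopic();

        // Upstream replay is idempotent (same tuples in the same windows), so during
        // recovery any message already found in the output topic can simply be skipped.
        public void process(String key, String value) {
            if (replaying && alreadyWritten.contains(value)) {
                return;                                // published before the failure, discard
            }
            producer.send(new ProducerRecord<>(topic, key, value));
        }
    }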
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I think this is something called a customized operator
>>>>>>>>>>>>>>>>>>> implementation that takes care of exactly-once processing at the output.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> What if any previous operators fail? How can we make
>>>>>>>>>>>>>>>>>>> sure they also recover using the EXACTLY_ONCE processing mode?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>>>> thomas.weise@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The operator will ensure that messages are not
>>>>>>>>>>>>>>>>>>>> duplicated, under the stated assumptions.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> In our case we are writing the results back to a
>>>>>>>>>>>>>>>>>>>>> MapR Streams topic based on some validations.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>>>>>> thw@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> which operators in your application are writing to
>>>>>>>>>>>>>>>>>>>>>> external systems?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>>>>>>>>>>>>>> https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once),
>>>>>>>>>>>>>>>>>>>>>> there is Kafka input, which is configured to be idempotent. The results are
>>>>>>>>>>>>>>>>>>>>>> written to JDBC. That operator by itself supports exactly-once through
>>>>>>>>>>>>>>>>>>>>>> transactions (in conjunction with idempotent input), hence there is no need
>>>>>>>>>>>>>>>>>>>>>> to configure the processing mode at all.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thomas
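
Very roughly, the transactional pattern behind that looks like the sketch below (the Malhar JDBC output operator handles this internally; the table and column names here are made up for illustration):

    import java.sql.Connection;
    import java.sql.PreparedStatement;

    class WindowTransactionSketch {
        // Write one window's rows and the committed window id in a single transaction.
        // On replay after a failure, windows at or below committed_window are detected
        // and skipped, which yields exactly-once output without any processing-mode setting.
        void commitWindow(Connection con, long windowId, Iterable<String> rows) throws Exception {
            con.setAutoCommit(false);
            try (PreparedStatement ins = con.prepareStatement("INSERT INTO results (line) VALUES (?)");
                 PreparedStatement meta = con.prepareStatement(
                     "UPDATE app_meta SET committed_window = ? WHERE app_id = ? AND operator_id = ?")) {
                for (String row : rows) {
                    ins.setString(1, row);
                    ins.executeUpdate();
                }
                meta.setLong(1, windowId);
                meta.setString(2, "myApp");   // illustrative ids
                meta.setInt(3, 1);
                meta.executeUpdate();
                con.commit();
            } catch (Exception e) {
                con.rollback();
                throw e;
            }
        }
    }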
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
