apex-users mailing list archives

From "hsy541@gmail.com" <hsy...@gmail.com>
Subject Re: Datatorrent fault tolerance
Date Fri, 07 Oct 2016 18:27:32 GMT
Jaspal,

Topic is a mandatory property you have to set. In MapR, the value should be
set to the full stream path, for example: /your/stream/path:streamname
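
For illustration, a rough sketch of setting that in populateDAG(), using the custom
operator class and the "kafkaOut" operator name that appear further down in this
thread; the application class and stream path are placeholders, and it assumes the
operator's topic property has a conventional setter:

    import org.apache.hadoop.conf.Configuration;

    import com.datatorrent.api.DAG;
    import com.datatorrent.api.StreamingApplication;
    import com.datatorrent.api.annotation.ApplicationAnnotation;

    @ApplicationAnnotation(name = "MapRStreamsOutputDemo")
    public class Application implements StreamingApplication {

      @Override
      public void populateDAG(DAG dag, Configuration conf) {
        KafkaSinglePortExactlyOnceOutputOperator kafkaOut =
            dag.addOperator("kafkaOut", new KafkaSinglePortExactlyOnceOutputOperator());
        // On MapR the topic is the full stream path plus topic name,
        // not just a bare topic name.
        kafkaOut.setTopic("/your/stream/path:streamname");
        // ... add the upstream operator and dag.addStream(...) wiring here ...
      }
    }

The same value can also be supplied in the application configuration as the
dt.operator.kafkaOut.prop.topic property.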

Regards,
Siyuan

On Fri, Oct 7, 2016 at 11:22 AM, Jaspal Singh <jaspal.singh1404@gmail.com>
wrote:

> After making the change, we are getting the below error while launching the
> application:
>
> *An error occurred trying to launch the application. Server message:
> javax.validation.ConstraintViolationException: Operator kafkaOut violates constraints
> [ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
> propertyPath='topic', message='may not be null', *
>
>
>
> Thanks!!
>
> On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <jaspal.singh1404@gmail.com>
> wrote:
>
>> So I just changed the malhar-kafka version to 3.5.0 and I was able to import
>> the AbstractKafkaOutputOperator. Let me try to launch it now.
>>
>> Thanks for your inputs !!
>>
>>
>>
>> On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <jaspal.singh1404@gmail.com>
>> wrote:
>>
>>> Should we use malhar-library version 3.5 then?
>>>
>>>
>>> Thanks!!
>>>
>>> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <thw@apache.org> wrote:
>>>
>>>> Please make sure to depend on version 3.5 of malhar-kafka in pom.xml.
>>>> This operator is not in malhar-library, it's a separate module.
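
For reference, the dependency would look roughly like this in pom.xml, assuming the
standard Apache Apex Malhar coordinates (use the 3.5.x release that matches your Apex
version):

    <dependency>
      <groupId>org.apache.apex</groupId>
      <artifactId>malhar-kafka</artifactId>
      <version>3.5.0</version>
    </dependency>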
>>>>
>>>>
>>>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <
>>>> jaspal.singh1404@gmail.com> wrote:
>>>>
>>>>> Hi Siyuan,
>>>>>
>>>>> I am using the same Kafka producer as you mentioned, but I am not
>>>>> seeing the AbstractKafkaOutputOperator in the Malhar library when I import it.
>>>>>
>>>>>
>>>>> Thanks!!
>>>>>
>>>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy541@gmail.com <hsy541@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Also, which Kafka output operator are you using?
>>>>>> Please use org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator
>>>>>> instead of com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator.
>>>>>> Only the former works with MapR Streams; the latter only works with
>>>>>> Kafka 0.8.* or 0.9.
>>>>>>
>>>>>> Regards,
>>>>>> Siyuan
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy541@gmail.com <hsy541@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey Jaspal,
>>>>>>>
>>>>>>> Did you add any code to the existing KafkaSinglePortExactlyOnceOutputOperator
>>>>>>> from Malhar? If so, please make sure the producer you use there is
>>>>>>> org.apache.kafka.clients.producer.KafkaProducer instead of
>>>>>>> kafka.javaapi.producer.Producer. The latter is the old API and is not
>>>>>>> supported by MapR Streams.
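
For illustration, a minimal, self-contained sketch of the new producer API referred to
above; the topic, properties, and class name are placeholders, not taken from the
thread, and the bootstrap/serializer settings shown are for a plain Kafka setup:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class NewProducerApiSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            // org.apache.kafka.clients.producer.* is the new client API;
            // kafka.javaapi.producer.Producer is the old one mentioned above.
            Producer<String, String> producer = new KafkaProducer<>(props);
            // For MapR Streams the topic would take the full "/path/to/stream:topic" form.
            producer.send(new ProducerRecord<>("your-topic", "key", "value"));
            producer.flush();
            producer.close();
        }
    }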
>>>>>>>
>>>>>>>
>>>>>>> Regards,
>>>>>>> Siyuan
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <
>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thomas,
>>>>>>>>
>>>>>>>> Below is the operator implementation we are trying to run. This
>>>>>>>> operator receives an object of the Tenant class from the upstream
>>>>>>>> operator.
>>>>>>>>
>>>>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>>>>>
>>>>>>>>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>>>>
>>>>>>>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>>>>>>
>>>>>>>>         Gson gson = new Gson();
>>>>>>>>
>>>>>>>>         @Override
>>>>>>>>         public void process(Tenant tenant) {
>>>>>>>>             try {
>>>>>>>>                 // producer from the Kafka client API provided by the base operator
>>>>>>>>                 Producer<String, String> producer = getKafkaProducer();
>>>>>>>>                 //ObjectMapper mapper = new ObjectMapper();
>>>>>>>>                 long now = System.currentTimeMillis();
>>>>>>>>                 //Configuration conf = HBaseConfiguration.create();
>>>>>>>>                 //TenantDao dao = new TenantDao(conf);
>>>>>>>>                 //ArrayList<Put> puts = new ArrayList<>();
>>>>>>>>                 if (tenant != null) {
>>>>>>>>                     //Tenant tenant = tenant.next();
>>>>>>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>>>>                         // records with a GL value go to the certUpdate topic
>>>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>                         //puts.add(dao.mkPut(tenant));
>>>>>>>>                     } else {
>>>>>>>>                         // records without a GL value go to the error topic
>>>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>                     }
>>>>>>>>                     producer.flush();
>>>>>>>>                 }
>>>>>>>>             } catch (Exception e) {
>>>>>>>>                 LOG.error("Failed to write tenant record", e);
>>>>>>>>             }
>>>>>>>>         }
>>>>>>>>     };
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> After building the application, it throws an error during launch:
>>>>>>>>
>>>>>>>> An error occurred trying to launch the application. Server message:
>>>>>>>> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer;
>>>>>>>>     at java.lang.Class.getDeclaredFields0(Native Method)
>>>>>>>>     at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
>>>>>>>>     at java.lang.Class.getDeclaredFields(Class.java:1916)
>>>>>>>>     at
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Jaspal
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <
>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thomas,
>>>>>>>>>
>>>>>>>>> I was trying to refer to the input from the previous operator.
>>>>>>>>>
>>>>>>>>> Another thing: when we extend the AbstractKafkaOutputOperator, do
>>>>>>>>> we need to specify <String, T>? We are getting an object of a custom
>>>>>>>>> class type from the previous operator.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Jaspal
>>>>>>>>>
>>>>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <thw@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Are you referring to the upstream operator in the DAG or the
>>>>>>>>>> state of the previous application after relaunch? Since the data is
>>>>>>>>>> stored in MapR streams, an operator that is a producer can also act
>>>>>>>>>> as a consumer. Please clarify your question.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>
>>>>>>>>>>> I have a question: when we use
>>>>>>>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results into a
>>>>>>>>>>> MapR Streams topic, will it be able to read messages from the
>>>>>>>>>>> previous operator?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Jaspal
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <thw@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> For recovery you need to set the window data manager like so:
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
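
For reference, a sketch of the kind of setting that link points at, assuming the
FSWindowDataManager from malhar-library and an operator that exposes a
windowDataManager property (which I believe the Malhar
KafkaSinglePortExactlyOnceOutputOperator does); the variable name is a placeholder,
and the linked Application.java shows the exact placement:

    import org.apache.apex.malhar.lib.wal.FSWindowDataManager;

    // inside populateDAG(), continuing the wiring sketch near the top of this page:
    // persisting per-window state lets the operator know, after recovery, which
    // windows were already fully processed and published.
    kafkaOut.setWindowDataManager(new FSWindowDataManager());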
>>>>>>>>>>>>
>>>>>>>>>>>> That will also apply to stateful restart of the entire
>>>>>>>>>>>> application (relaunch from previous instance's checkpointed
>>>>>>>>>>>> state).
>>>>>>>>>>>>
>>>>>>>>>>>> For cold restart, you would need to consider the property you
>>>>>>>>>>>> mention and decide what is applicable to your use case.
>>>>>>>>>>>>
>>>>>>>>>>>> Thomas
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Ok now I get it. Thanks for the nice explanation!!
>>>>>>>>>>>>>
>>>>>>>>>>>>> One more thing: you mentioned checkpointing the offset ranges so
>>>>>>>>>>>>> that they replay in the same order from Kafka.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is there any property we need to configure to do that, like
>>>>>>>>>>>>> initialOffset set to APPLICATION_OR_LATEST?
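
For reference, a sketch of what that property could look like on the 0.9 Kafka input
operator inside populateDAG(); this assumes KafkaSinglePortInputOperator exposes a
setInitialOffset(String) setter, and the operator name is a placeholder:

    import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;

    // inside populateDAG():
    KafkaSinglePortInputOperator kafkaIn =
        dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
    // resume from the application's checkpointed offsets if they exist,
    // otherwise start from the latest offset
    kafkaIn.setInitialOffset("APPLICATION_OR_LATEST");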
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <thomas.weise@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> What you want is the effect of exactly-once output (that's why
>>>>>>>>>>>>>> we also call it end-to-end exactly-once). There is no such thing
>>>>>>>>>>>>>> as exactly-once processing in a distributed system; in this case
>>>>>>>>>>>>>> it would rather be "produce exactly-once". Upstream operators, on
>>>>>>>>>>>>>> failure, will recover to checkpointed state and re-process the
>>>>>>>>>>>>>> stream from there. This is at-least-once, the default behavior.
>>>>>>>>>>>>>> Because you have configured the input operator to replay in the
>>>>>>>>>>>>>> same order from Kafka (this is done by checkpointing the offset
>>>>>>>>>>>>>> ranges), the computation in the DAG is idempotent and the output
>>>>>>>>>>>>>> operator can discard the results that were already published
>>>>>>>>>>>>>> instead of producing duplicates.
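
To make the mechanism concrete, here is a toy, self-contained illustration of the
principle described above (replayed input plus an output that discards what was
already published). It is not the Malhar implementation and all names are made up;
the real operator determines what was already written by reading back from the
Kafka/MapR Streams topic:

    import java.util.ArrayList;
    import java.util.List;

    public class ExactlyOnceOutputSketch {

        // stands in for the external, durable sink (e.g. a Kafka/MapR Streams topic)
        private final List<String> published = new ArrayList<>();

        // how many tuples were already published before the restart
        private int alreadyPublished;
        // how many replayed tuples we have seen (and skipped) since recovery
        private int replayed;

        // called once after recovery: learn how much output already exists
        public void recover() {
            alreadyPublished = published.size();
            replayed = 0;
        }

        // called for every tuple, including replayed ones (at-least-once delivery)
        public void process(String tuple) {
            if (replayed < alreadyPublished) {
                replayed++;           // same tuple in the same order -> safe to discard
                return;
            }
            published.add(tuple);     // genuinely new output
        }

        public static void main(String[] args) {
            ExactlyOnceOutputSketch out = new ExactlyOnceOutputSketch();
            out.process("a"); out.process("b"); out.process("c");   // then the operator "fails"
            out.recover();                                          // restart from checkpoint
            out.process("a"); out.process("b"); out.process("c");   // idempotent replay
            out.process("d");                                       // new data
            System.out.println(out.published);                      // [a, b, c, d] -- no duplicates
        }
    }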
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think this is a customized operator implementation that
>>>>>>>>>>>>>>> takes care of exactly-once processing at the output.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What if any previous operators fail? How can we make sure they
>>>>>>>>>>>>>>> also recover using the EXACTLY_ONCE processing mode?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <thomas.weise@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The operator will ensure that messages are not duplicated,
>>>>>>>>>>>>>>>> under the stated assumptions.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> In our case we are writing the results back to a MapR
>>>>>>>>>>>>>>>>> Streams topic based on some validations.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <thw@apache.org> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Which operators in your application are writing to external
>>>>>>>>>>>>>>>>>> systems?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> When you look at the example from the blog
>>>>>>>>>>>>>>>>>> (https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once),
>>>>>>>>>>>>>>>>>> there is Kafka input, which is configured to be idempotent.
>>>>>>>>>>>>>>>>>> The results are written to JDBC. That operator by itself
>>>>>>>>>>>>>>>>>> supports exactly-once through transactions (in conjunction
>>>>>>>>>>>>>>>>>> with idempotent input), hence there is no need to configure
>>>>>>>>>>>>>>>>>> the processing mode at all.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
