apex-users mailing list archives

From Jim <...@facility.supplies>
Subject FW: Problems with Kinesis Stream
Date Tue, 10 May 2016 17:41:54 GMT

Hi Pradeep,

Here is an extract from my pom.xml with the relevant entries:

    <!-- change this if you desire to use a different version of DataTorrent -->

Everything was in sync with your demo application.

I did see in your demo application that you created your own StringKinesisInputOperator where
you override the getTuple function.

In our app we create a KinesisEdiTransactionSetInputOperator, but we do not override the getTuple()
function.  Is there any reason that we should do that?  Note that our class has some database
access: we save the current shard position to a database table, so that if the application
restarts we can use setInitialOffset("earliest") and have a custom shardManager read this table
and set the initial position to the saved location. That way we don't duplicate completed
transactions and still pick up transactions that were added to the queue while the system was
down.  We need this for restartability, in case we update the application and it starts under a
new application id, so we don't have to worry about missing or duplicated records (I think).
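
The restart scheme described above can be sketched in plain Java. This is only a minimal illustration under stated assumptions, not the Malhar shard manager API: the class StartPositionResolver, its resolve method, and the use of the string "earliest" as a fallback marker are hypothetical names made up for the example. The saved map stands in for the rows read back from the database table.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of Apex Malhar or the AWS SDK.
class StartPositionResolver {
    // Assumed marker meaning "read this shard from its oldest available record".
    static final String EARLIEST = "earliest";

    // For each shard, start from the saved sequence number if one exists,
    // otherwise fall back to the earliest record in the shard.
    static Map<String, String> resolve(List<String> shardIds, Map<String, String> saved) {
        Map<String, String> start = new LinkedHashMap<>();
        for (String shardId : shardIds) {
            String sequence = saved.get(shardId);
            start.put(shardId, sequence != null ? sequence : EARLIEST);
        }
        return start;
    }
}
```

With a saved row for one shard only, that shard would resume from its saved sequence number while any shard without a row would be read from the beginning.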

Here is our class. Does anything stand out as missing?

package supplies.facility.edi.helpers;

import com.amazonaws.services.kinesis.model.Record;
import com.datatorrent.api.Context;
import com.datatorrent.contrib.kinesis.AbstractKinesisInputOperator;
import supplies.facility.edi.models.EdiTransactionSet;
import supplies.facility.edi.models.OptionalOperatorProperties;

import java.sql.PreparedStatement;
import java.util.HashMap;
import java.util.Map;

// Created by Bob on 2/21/2016.
public class KinesisEdiTransactionSetInputOperator extends AbstractKinesisInputOperator<EdiTransactionSet> {

    Map<String, String> previousPosition = new HashMap<String, String>();

    public KinesisEdiTransactionSetInputOperator() {
    }

    private OptionalOperatorProperties optionalProperties = new OptionalOperatorProperties();

    public OptionalOperatorProperties getOptionalProperties() {
        return optionalProperties;
    }

    public void setOptionalProperties(OptionalOperatorProperties optionalProperties) {
        this.optionalProperties = optionalProperties;
    }

    public EdiTransactionSet getTuple(Record record) {
        try {
            EdiTransactionSet obj = EdiTransactionSetFactory.createEdiTransactionSet(record.getData());
            return obj;
        } catch (Exception var4) {
            throw new RuntimeException(var4);
        }
    }

    public void endWindow() {

        // Update the database with the current shard position so that we can recover if we
        // need to restart the application
        // with no risk of duplicate transactions
        // Update the order to replace the line_items data with the newly updated one
        try {
            if (!previousPosition.equals(shardPosition)) {
                // Loop through all values in the shard map
                for (Map.Entry<String, String> entry : shardPosition.entrySet()) {
                    PreparedStatement statement = getOptionalProperties().getEdiJdbc().getConnection().prepareStatement(
                            "insert into kinesis_shard_manager values(?, ?)"
                            + " on conflict on constraint pk_kinesis_shard_manager"
                            + " do update set sequence = excluded.sequence");
                    statement.setString(1, entry.getKey());
                    statement.setString(2, entry.getValue());
                }

                // Set the previous position
                previousPosition = shardPosition;
            }
        } catch (Exception e) {
            throw new RuntimeException("Exception while handling the input", e);
        }
    }

    public void teardown() {

        // Disconnect from the edi jdbcStore
    }

    public void setup(Context.OperatorContext context) {

        try {
            // Connect to the Edi jdbcstore for this operator in the
        } catch (Exception e) {
            throw new RuntimeException("Exception setting up the abstract edi processor: ", e);
        }
    }
}
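
Two things in endWindow() as posted may be worth checking. First, no execute call appears on the prepared statement (it may simply have been lost in the archive). Second, previousPosition = shardPosition stores a reference to the same map rather than a copy; if the operator updates shardPosition in place (an assumption, not verified against the Malhar source), the equals() check would then always report "no change" after the first window. A minimal sketch of the snapshot idea follows. ShardPositionTracker and its methods are hypothetical names for this example, and the database write is left out so the block stays self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper for illustration; not part of Apex Malhar.
class ShardPositionTracker {
    // Copy of the positions as they were when last written to the database.
    private Map<String, String> lastPersisted = new HashMap<>();

    // Returns only the shard entries whose position differs from the last snapshot.
    Map<String, String> changedSince(Map<String, String> current) {
        Map<String, String> changed = new HashMap<>();
        for (Map.Entry<String, String> entry : current.entrySet()) {
            if (!Objects.equals(entry.getValue(), lastPersisted.get(entry.getKey()))) {
                changed.put(entry.getKey(), entry.getValue());
            }
        }
        return changed;
    }

    // Stores a copy, so later in-place updates to 'current' do not alter the snapshot.
    void markPersisted(Map<String, String> current) {
        lastPersisted = new HashMap<>(current);
    }
}
```

In an operator, the entries returned by changedSince() would be the ones to upsert (followed by an executeUpdate() on each statement), and markPersisted() would be called only after the writes succeed.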

From: Pradeep Dalvi [mailto:pradeep.dalvi@datatorrent.com]
Sent: Tuesday, May 10, 2016 6:18 AM
To: Jim <jim@facility.supplies>
Subject: Re: Problems with Kinesis Stream

Hi Jim,

I don't have much background on the Kinesis operator, so I will coordinate with the right
set of engineers and help you resolve the issues on priority.
I was with Chaitanya to try out different options. We also tried restarting the application.
We would like to know a little more about the scenarios, setup and RTS version that is
being used. AFAIR from our last conversation, we had suggested using DT RTS 3.3.0-RC5.
Shall we schedule a call for this? Please feel free to suggest convenient timings. I shall
check with Chaitanya.

Pradeep A. Dalvi

On Tue, May 10, 2016 at 3:51 PM, Jim <jim@facility.supplies> wrote:
Hi Chaitanya,

And you are adding new records, and see them flow in?

And if you restart the application, and have it set to “earliest” it always retrieves
all the records, then continues to get new ones as records are added?

We had two different developers on two different machines get the same kind of errors.

I will compare our applications to yours to see what could be different.


From: Chaitanya Chebolu [mailto:chaitanya@datatorrent.com]
Sent: Tuesday, May 10, 2016 5:17 AM
To: users@apex.incubator.apache.org
Subject: Re: Problems with Kinesis Stream

Hi Jim,

    I created a sample application with the same dependencies that you specified.
    I ran it with both earliest and latest offsets, and it works fine. I haven't observed any
record loss in either scenario.
    We tried two different setups: local as well as on AWS.
    Please find the pom and application at the location below:


On Tue, May 10, 2016 at 1:18 PM, Jim <jim@facility.supplies> wrote:

I am using Apex in an application where one of the inputs is an AWS Kinesis stream.  I am
using this over AWS SQS because I need to guarantee that the items are processed in the order
in which they are received by the system.

I have an operator that uses the base AbstractKinesisInputOperator.

We are currently using:

                Apex version 3.3.0-incubating
                Malhar version 3.3.1-incubating
                aws-java-sdk-kinesis version 1.10.76

I am noticing strange behavior of this operator and cannot pin down where the issue is coming
from: whether it is from the AWS SDK or from the DataTorrent Apex module.

Here is what I am seeing:

1.)    When I call setInitialOffset("latest") and start the application, the next
transactions are not always processed by the application.  From what I can see, it is very hit
or miss which transactions are actually read and sent through for processing.  I have no
idea why.

2.)    When I call setInitialOffset("earliest"), it does seem to pick up all records and read
new records, for a while.  Then at some point it stops processing new records, and even
if I restart the application so that it should start again from the beginning, nothing is read
in.  If I then delete and recreate the Kinesis stream, it works again for a while, until
it stops again.

Is anyone successfully using a Kinesis stream to process records, and confident that no
issues like this are occurring, so that transactions are not being missed?

Can someone at Apex/DataTorrent look into this and help figure out what is happening and
how we can fix it?

Note that I am just about ready to go live, and would like to get this resolved this week!



Pradeep A. Dalvi
Software Engineer
DataTorrent (India)