flink-dev mailing list archives

From Deepak Jha <dkjhan...@gmail.com>
Subject Issues while interacting with DynamoDB
Date Tue, 05 Jul 2016 00:46:17 GMT
Hi All,

We have a Flink (1.0.2) HA setup on AWS and use IAM roles to interact
with S3 (via S3A, as suggested in the Flink best practices) and DynamoDB.
While trying to perform a key-value lookup against DynamoDB from inside
one of the operators, we run into the following issue.

import com.amazonaws.services.dynamodbv2.document.Item

// table is a com.amazonaws.services.dynamodbv2.document.Table for an existing DynamoDB table
def putItem(): Unit = {
  val id = 1266399999L
  val item = new Item()
    .withPrimaryKey("Id", "sfsaf12344")
    .withLong("uniqueIdentifier", id)
  table.putItem(item)
}

2016-07-04 17:15:18,379 PDT [INFO]  ip-10-6-10-182 [flink-akka.actor.default-dispatcher-29] o.a.f.runtime.jobmanager.JobManager - Status of job 3ec7e145208453b5dbcf6224f373018f (Topology) changed to FAILING.
org.apache.flink.runtime.util.SerializedThrowable: com.amazonaws.services.dynamodbv2.model.PutItemRequest.withExpressionAttributeNames(Ljava/util/Map;)Lcom/amazonaws/services/dynamodbv2/model/PutItemRequest;
	at com.amazonaws.services.dynamodbv2.document.internal.PutItemImpl.doPutItem(PutItemImpl.java:82)
	at com.amazonaws.services.dynamodbv2.document.internal.PutItemImpl.putItem(PutItemImpl.java:41)
	at com.amazonaws.services.dynamodbv2.document.Table.putItem(Table.java:144)
	at com.mix.ingestion.url.dupedetection.DynamoDBIO$.putItem(DynamoDBHandler.scala:38)
	at com.mix.ingestion.url.dupedetection.DynamoDBDupeDetectionBaseImpl.setKey(ABC.scala:143)
	at com.mix.ingestion.url.dupedetection.DynamoDBDupeDetectionBaseImpl.setKeyAndUpdateDupeFlag(ABC.scala:135)
	at com.mix.ingestion.url.dupedetection.DynamoDBDupeDetection$class.updateDupeFlagAndTable(ABC.scala:96)
	at com.mix.ingestion.url.dupedetection.DynamoDBDupeDetection$class.detectDupe(ABC.scala:111)
	at com.mix.ingestion.url.dupedetection.DynamoDBDupeDetection$.detectDupe(ABC.scala:158)
	at com.mix.ingestion.topology.Operators$$anonfun$15$$anonfun$apply$3.apply(Operators.scala:70)
	at com.mix.ingestion.topology.Operators$$anonfun$15$$anonfun$apply$3.apply(Operators.scala:70)
	at org.apache.flink.streaming.api.scala.DataStream$$anon$4.map(DataStream.scala:485)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Serialized representation of java.lang.NoSuchMethodError: com.amazonaws.services.dynamodbv2.model.PutItemRequest.withExpressionAttributeNames(Ljava/util/Map;)Lcom/amazonaws/services/dynamodbv2/model/PutItemRequest;
	... 18 common frames omitted

It works if I just run it standalone with "java -cp
fatjar.jar:/opt/flink/lib/* a.b.c.d.DynamoDBHandler" on the same EC2
instance, but it fails as soon as it tries to interact with DynamoDB
from inside an operator. It fails even when I call the very same
putItem from inside the operator.

We have aws-java-sdk-1.7.4.jar and hadoop-aws-2.7.2.jar in the flink/lib
folder. We deploy the topology as a fat jar, which contains both
aws-java-sdk-s3 and aws-java-sdk-dynamodb at version 1.11.3. I also
experimented with putting the full aws-java-sdk in the fat jar, but that
did not work either. I looked inside aws-java-sdk-1.7.4.jar and see that
com/amazonaws/services/dynamodbv2 exists.
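The NoSuchMethodError suggests that, inside the operator, PutItemRequest is being resolved from the old 1.7.4 SDK in flink/lib (which the parent classloader sees first) rather than from the 1.11.3 classes in the fat jar. As a quick check, a sketch along these lines (ClasspathCheck is a hypothetical helper, not part of our code) prints which jar a class is actually loaded from at runtime:

```scala
// Diagnostic sketch: report where a class was loaded from, to see whether
// flink/lib's aws-java-sdk-1.7.4.jar or the fat jar's 1.11.3 classes win.
// Inside the operator, call it with
// "com.amazonaws.services.dynamodbv2.model.PutItemRequest".
object ClasspathCheck {
  def whereIs(className: String): String = {
    val cls = Class.forName(className)
    // Bootstrap (JDK core) classes have no code source; application
    // classes report the jar or directory they came from.
    Option(cls.getProtectionDomain.getCodeSource)
      .map(_.getLocation.toString)
      .getOrElse("bootstrap classloader (no code source)")
  }

  def main(args: Array[String]): Unit =
    println(whereIs("java.lang.String"))
}
```

Logging this from inside the operator (as opposed to the standalone run) would confirm which SDK jar the user-code classloader is actually picking up.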



Please let me know what I am doing wrong. Any help would be appreciated.

-- 
Thanks,
Deepak Jha
