flink-user mailing list archives

From Stefan Richter <s.rich...@data-artisans.com>
Subject Re: Custom Sink Checkpointing errors
Date Fri, 20 Oct 2017 09:58:32 GMT
Hi,

Judging from the trace in the dump, the crash looks unrelated to Flink code. Since it happens
somewhere in the handling of a jar file, it might be related to
https://bugs.openjdk.java.net/browse/JDK-8142508, point (2). Maybe your jar gets
overwritten while the job is running, e.g. by your IDE?

The serialization exception looks as if the custom sink is using the same serializer instance
from different threads concurrently. I don’t have the full custom code, so this is only a guess.
Make sure to duplicate serializers whenever different threads could work on them, e.g.
processing vs. checkpointing.
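
To illustrate the duplication advice, here is a minimal sketch in plain Scala. ReusingSerializer
is a hypothetical stand-in and not a Flink class; it only mimics a serializer that reuses
internal buffers and is therefore unsafe to share between threads. In Flink itself,
TypeSerializer has a duplicate() method for this purpose. How your sink obtains its serializer
is something I can only assume here.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical stateful serializer: it reuses one internal buffer, so two
// threads calling serialize() on the SAME instance could corrupt each other.
final class ReusingSerializer {
  private val buffer = new ByteArrayOutputStream()
  private val out = new DataOutputStream(buffer)

  def serialize(value: String): Array[Byte] = {
    buffer.reset()
    out.writeUTF(value)
    out.flush()
    buffer.toByteArray
  }

  def deserialize(bytes: Array[Byte]): String =
    new DataInputStream(new ByteArrayInputStream(bytes)).readUTF()

  // Returns an independent instance with its own buffer.
  def duplicate(): ReusingSerializer = new ReusingSerializer
}

object DuplicateDemo {
  // Counts how many of n values survive a serialize/deserialize round trip.
  private def roundTrips(s: ReusingSerializer, prefix: String, n: Int): Int =
    (0 until n).count { i =>
      val v = s"$prefix-$i"
      s.deserialize(s.serialize(v)) == v
    }

  // One serializer per thread: the checkpointing thread works on a duplicate.
  def run(n: Int): (Int, Int) = {
    val processing = new ReusingSerializer
    val checkpointing = processing.duplicate()
    var processed = 0
    var snapshotted = 0
    val t1 = new Thread(new Runnable {
      def run(): Unit = processed = roundTrips(processing, "record", n)
    })
    val t2 = new Thread(new Runnable {
      def run(): Unit = snapshotted = roundTrips(checkpointing, "snapshot", n)
    })
    t1.start(); t2.start()
    t1.join(); t2.join()
    (processed, snapshotted)
  }
}
```

Because each thread owns its own instance, no locking is needed. If both threads shared the
`processing` instance instead, the round trips could start to fail intermittently.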

Best,
Stefan


 
> On 20.10.2017 at 14:24, vipul singh <neoeahit@gmail.com> wrote:
> 
> Hello all,
> 
> I am working on a custom sink implementation, but I am having weird issues with checkpointing.
> 
> I am using a custom ListState to checkpoint, and it looks like this:
> private var checkpointMessages: ListState[Bucket] = _
> 
> My snapshot function looks like:
> 
> @throws[IOException]
> def snapshotState(context: FunctionSnapshotContext): Unit = {
>   checkpointMessages.clear()
>   for ((bucketName, bucket) <- bufferedMessages) {
>     // clone the buffer to avoid any concurrent modification issues
>     val new_buffer = new ListBuffer[GenericRecord]()
>     bucket.buffer.foreach(f => new_buffer += f)
>
>     val new_bucket = new Bucket(bucketName, new_buffer, bucket.timestamp)
>
>     if (shouldUpload(bucketName)) uploadFile(bucketName)
>     else checkpointMessages.add(new_bucket)
>   }
> }
> where the Bucket class is:
> @SerialVersionUID(1L)
> class Bucket(var name: String, var buffer: ListBuffer[GenericRecord], var timestamp: Long) extends Serializable {
>   def this(name: String) = {
>     this(name, ListBuffer[GenericRecord](), new Date().getTime)
>   }
> }
> 
> The bufferedMessages declaration is:
> private val bufferedMessages = collection.mutable.Map[String, Bucket]()
> 
> The basic idea behind this implementation is that I maintain multiple buffers and push
> messages (org.apache.avro.generic.GenericRecord) into them in the sink's invoke method;
> upon reaching certain thresholds, I archive them to S3.
> 
> I tried running this both locally in IntelliJ and on a cluster:
> 
> In IntelliJ the process runs for a bit (3-4 checkpoints) and then errors out with
> the exception below:
> 
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x000000010d46440c, pid=25232, tid=0x0000000000003903
> #
> # JRE version: Java(TM) SE Runtime Environment (8.0_131-b11) (build 1.8.0_131-b11)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode bsd-amd64 compressed oops)
> # Problematic frame:
> # V  [libjvm.dylib+0x46440c]
> #
> # Core dump written. Default location: /cores/core or core.25232
> #
> # An error report file with more information is saved as:
> # hs_err_pid25232.log
> #
> # If you would like to submit a bug report, please visit:
> #   http://bugreport.java.com/bugreport/crash.jsp
> # The crash happened outside the Java Virtual Machine in native code.
> # See problematic frame for where to report the bug.
> #
> Disconnected from the target VM, address: '127.0.0.1:60979', transport: 'socket'
> 
> Process finished with exit code 134 (interrupted by signal 6: SIGABRT)
> 
> I managed to collect a core dump: https://gist.github.com/neoeahit/38a02955c1de7501561fba2e593d5f6a
> 
> On a cluster I start to see concurrent serialization issues: https://gist.github.com/neoeahit/75a078f3672dd4c234c5cd25eba05c47
> 
> My initial guess is that this is happening due to the size of the ListState, but I checked
> and there are only around 10k records in the buffer. Due to the nature of the application,
> we have to implement this in a custom sink.
> 
> Could someone please help me troubleshoot this further?
> 
> -- 
> Thanking in advance,
> Vipul

