flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Vishal Sharma <vishal.sha...@grab.com>
Subject [External] Flink StreamingFileSink not ingesting to S3 when checkpointing is disabled
Date Tue, 28 May 2019 02:26:33 GMT
Hello everyone,

I want to use aws s3 as sink for a data stream in flink. I am using
StreamingFileSink class to create a sink.

I don't need checkpointing for my job, but when I disable checkpointing,
data is no longer written to S3.

case 1 : checkpointing enabled
When checkpointing is enabled, the data is successfully ingested to the
mentioned s3 path.

case 2 : checkpointing disabled
When checkpointing is disabled, the data is not written to s3.

I tried executing the job multiple times, but every time I got the same
result. I am facing this on local machine as well as on kubernetes cluster.


Following is a code I tried having bounded stream -

object FlinkTestJob {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // with checkpointing enabled
    env.enableCheckpointing(100)

    // Sinks
    val streamStrings: Seq[String] =
      Seq("test1", "test2", "test3", "test4", "test5", "test6",
"test7", "test8", "test9", "test10")

    val testStream = env.fromCollection(streamStrings)

    val rollingPolicy = new RollingPolicy[String, String] {

      override def shouldRollOnCheckpoint(partFileState:
PartFileInfo[String]): Boolean =
        partFileState.getSize > 1

      override def shouldRollOnEvent(
          partFileState: PartFileInfo[String],
          element: String): Boolean = true

      override def shouldRollOnProcessingTime(
          partFileState: PartFileInfo[String],
          currentTime: Long): Boolean = true
    }

    val sink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(new Path("s3a://testbucket/sink"), new
SimpleStringEncoder[String]("UTF-8"))
      .withRollingPolicy(rollingPolicy)
      .build()

    testStream.addSink(sink)
    env.execute("test-job")
  }
}


When I write to s3 using "writeAsText("s3a://testbucket/sink")" instead of
StreamingFileSink, it works perfectly fine regardless of whether or not
checkpointing is enabled.

Flink version : 1.8.0
I want to understand the relation between checkpointing and
StreamingFileSink.

- Thanks

-- 
*_Grab is hiring. Learn more at _**https://grab.careers 
<https://grab.careers/>*


By communicating with Grab Inc and/or its 
subsidiaries, associate companies and jointly controlled entities (“Grab 
Group”), you are deemed to have consented to processing of your personal 
data as set out in the Privacy Notice which can be viewed at 
https://grab.com/privacy/ <https://grab.com/privacy/>


This email contains 
confidential information and is only for the intended recipient(s). If you 
are not the intended recipient(s), please do not disseminate, distribute or 
copy this email and notify Grab Group immediately if you have received this 
by mistake and delete this email from your system. Email transmission 
cannot be guaranteed to be secure or error-free as any information therein 
could be intercepted, corrupted, lost, destroyed, delayed or incomplete, or 
contain viruses. Grab Group do not accept liability for any errors or 
omissions in the contents of this email arises as a result of email 
transmission. All intellectual property rights in this email and 
attachments therein shall remain vested in Grab Group, unless otherwise 
provided by law.


Mime
View raw message