flink-user mailing list archives

From Cliff Resnick <cre...@gmail.com>
Subject KeyedStream and chained forward operators
Date Tue, 21 Apr 2020 15:35:16 GMT
I'm running a massive DataStream job that sifts files from S3 by timestamp.

The basic job is:
FileMonitor -> ContinuousFileReader -> MultipleFileOutputSink

The MultipleFileOutputSink sifts data into date-hour directories based on each
record's timestamp, roughly as sketched below.
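
For illustration, a minimal sketch of the kind of date-hour bucketing that implies, assuming record timestamps are epoch seconds in UTC (the actual MultipleFileOutputSink code isn't shown in this post):

import java.time.format.DateTimeFormatter
import java.time.{Instant, ZoneOffset}

// Maps an epoch-second timestamp to a date-hour output directory.
def dateHourDir(epochSeconds: Long): String =
  DateTimeFormatter.ofPattern("yyyy-MM-dd/HH")
    .withZone(ZoneOffset.UTC)
    .format(Instant.ofEpochSecond(epochSeconds))

// e.g. dateHourDir(1587483316L) == "2020-04-21/15"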

It's a lot of data, so I'm running with high parallelism, but I also want to
keep the output files at a reasonable size. If I key the stream after the
ContinuousFileReader by 5-minute timestamp buckets, I get the desired result of
large files, at the cost of a network shuffle (sketched below).
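
For reference, a minimal sketch of that shuffle-based variant, assuming the Scala DataStream API and a hypothetical extractTimestamp(line: String): Long helper that returns each record's epoch-second timestamp (the record format isn't shown here):

val sifted = env
  .addSource(fileMonitor)
  .name(s"Bucketed Log Source File Watcher: $path")
  .transform[String]("Bucketed Log Source File Reader", fileReader)
  .assignTimestampsAndWatermarks(WatermarkExtractor[String])
  // keyBy after the reader: correct grouping into large files, but a full network shuffle
  .keyBy { line: String =>
    val t = extractTimestamp(line) // hypothetical helper, epoch seconds
    t - (t % 300)                  // truncate to the 5-minute bucket
  }
  .addSink(SourceTrackingSink(Sift.outputBucket, BidDateFunc))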

But since the input file names also carry timestamps, I figured I could push
the keyBy back to between the FileMonitor and the ContinuousFileReader and save
the network shuffle. I tested this and it sort of works: the
ContinuousFileReaders receive properly partitioned input splits, but the output
after the reader is rebalanced again, so the sinks produce lots of tiny files.

The code is below. Am I missing something?

val source = env
  .addSource(fileMonitor)
  .name(s"Bucketed Log Source File Watcher: $path")
  // Key the file splits by the 5-minute bucket parsed from the file name,
  // so each reader subtask only receives splits for its buckets.
  .keyBy(new KeySelector[TimestampedFileInputSplit, Long]() {
    override def getKey(split: TimestampedFileInputSplit): Long = {
      val name = split.getPath.getName
      val r    = """(\d+)\.log""".r
      r.findFirstMatchIn(name) match {
        case Some(m) ⇒
          val t = m.group(1).toLong
          t - (t % 300) // truncate to the 5-minute bucket
        case _ ⇒ -1
      }
    }
  })
  .transform[String]("Bucketed Log Source File Reader", fileReader)
  // forward, so the reader output should stay on the same subtask downstream
  .forward
  .assignTimestampsAndWatermarks(WatermarkExtractor[String])
  .forward
  .addSink(SourceTrackingSink(Sift.outputBucket, BidDateFunc))
