flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Avi Levi <avi.l...@bluevoyant.com>
Subject Re: Stream in loop and not getting to sink (Parquet writer )
Date Thu, 29 Nov 2018 06:03:30 GMT
Checkout this little App. you can see that the file is created but no data
is written. even for a single record

import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import scala.io.Source
import org.apache.flink.streaming.api.scala._

object Tester extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  def now = System.currentTimeMillis()
  val path = new Path(s"test-$now.parquet")
  val schemaString =
Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
  val schema: Schema = new Schema.Parser().parse(schemaString)
  val compressionCodecName = CompressionCodecName.SNAPPY
  val config = ParquetWriterConfig()
  val genericReocrd: GenericRecord = new GenericData.Record(schema)
  genericReocrd.put("name", "test_b")
  genericReocrd.put("code", "NoError")
  genericReocrd.put("ts", 100L)
  val stream = env.fromElements(genericReocrd)
  val writer: ParquetWriter[GenericRecord] =
AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(schema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()

  writer.write(genericReocrd)
  stream.addSink { r =>
    println(s"In Sink $r")
    writer.write(r)
  }
  env.execute()
  //  writer.close()
}


On Thu, Nov 29, 2018 at 6:57 AM vipul singh <neoeahit@gmail.com> wrote:

> Can you try closing the writer?
>
> AvroParquetWriter has an internal buffer. Try doing a .close() in
> snapshot()( since you are checkpointing hence this method will be called)
>
> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <avi.levi@bluevoyant.com> wrote:
>
>> Thanks Rafi,
>> I am actually not using assignTimestampsAndWatermarks , I will try to
>> add it as you suggested. however it seems that the messages I repeating in
>> the stream over and over even if I am pushing single message manually to
>> the queue, that message will repeat infinity
>>
>> Cheers
>> Avi
>>
>>
>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <rafi.aroch@gmail.com> wrote:
>>
>>> Hi Avi,
>>>
>>> I can't see the part where you use  assignTimestampsAndWatermarks.
>>> If this part in not set properly, it's possible that watermarks are not
>>> sent and nothing will be written to your Sink.
>>>
>>> See here for more details:
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>>
>>> Hope this helps,
>>> Rafi
>>>
>>> On Wed, Nov 28, 2018, 21:22 Avi Levi <avi.levi@bluevoyant.com wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to implement Parquet Writer as SinkFunction. The pipeline
>>>> consists of kafka as source and parquet file as a sink however it seems
>>>> like the stream is repeating itself like endless loop and the parquet file
>>>> is not written . can someone please help me with this?
>>>>
>>>> object ParquetSinkWriter{
>>>>   private val path = new Path("tmp/pfile")
>>>>   private val schemaString =
>>>> Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>>>   private val avroSchema: Schema = new
>>>> Schema.Parser().parse(schemaString)
>>>>   private val compressionCodecName = CompressionCodecName.SNAPPY
>>>>   private   val config = ParquetWriterConfig()
>>>>   val writer: ParquetWriter[GenericRecord] =
>>>> AvroParquetWriter.builder[GenericRecord](path)
>>>>     .withSchema(avroSchema)
>>>>     .withCompressionCodec(compressionCodecName)
>>>>     .withPageSize(config.pageSize)
>>>>     .withRowGroupSize(config.blockSize)
>>>>     .withDictionaryEncoding(config.enableDictionary)
>>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>     .withValidation(config.validating)
>>>>     .build()
>>>> }
>>>>
>>>> class ParquetSinkWriter(path: Path, avroSchema: Schema) extends
>>>> SinkFunction[GenericRecord] {
>>>>   import ParquetSinkWriter._
>>>>   override def invoke(value: GenericRecord): Unit = {
>>>>     println(s"ADDING TO File : $value") // getting this output
>>>>     writer.write(value) //the output is not written to the file
>>>>   }
>>>> }
>>>>
>>>> //main app
>>>> object StreamingJob extends App  {
>>>>  implicit val env: StreamExecutionEnvironment =
>>>> StreamExecutionEnvironment.getExecutionEnvironment
>>>>   env.enableCheckpointing(500)
>>>>
>>>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>>
>>>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>>>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>>   env.setRestartStrategy(RestartStrategies.failureRateRestart(2,
>>>> Time.seconds(3), Time.seconds(3)))
>>>>   val backend: StateBackend = new
>>>> RocksDBStateBackend("file:///tmp/rocksdb", true)
>>>>   env.setStateBackend(backend)
>>>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>>>   *val stream2: DataStream[DnsRequest] = env.addSource(//consume from
>>>> kafka)*
>>>> *stream2.map { r =>*
>>>> *    println(s"MAPPING $r") //this output keeps repeating in a loop*
>>>> *    val genericReocrd: GenericRecord = new GenericData.Record(schema)*
>>>> *    genericReocrd.put("qname", r.qname)*
>>>> *    genericReocrd.put("rcode", r.rcode)*
>>>> *    genericReocrd.put("ts", r.ts)*
>>>> *    genericReocrd*
>>>> *  }.addSink(writer) *
>>>>
>>>> Thanks for your help
>>>> Avi
>>>>
>>>>
>
> --
> Thanks,
> Vipul
>

Mime
View raw message