flink-user mailing list archives

From Kostas Kloudas <k.klou...@data-artisans.com>
Subject Re: Stream in loop and not getting to sink (Parquet writer )
Date Thu, 29 Nov 2018 09:07:23 GMT
Hi again Avi,

In the first example that you posted (the one with the Kafka source), do
you call env.execute()?
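
Just as a reminder, a DataStream program only describes the pipeline until
execute() is called; a trivial, self-contained sketch (names are placeholders):

  import org.apache.flink.streaming.api.scala._

  object Example extends App {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromElements("a", "b", "c").map(_.toUpperCase).print()
    env.execute("example") // without this call the job is only defined, never submitted
  }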

Cheers,
Kostas

On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <k.kloudas@data-artisans.com>
wrote:

> Hi Avi,
>
> In the last snippet that you posted, you have not activated checkpoints.
>
> Checkpoints are needed for the StreamingFileSink to produce results,
> especially in the case of BulkWriters (like Parquet) where
> the part file is rolled upon reception of a checkpoint and the part is
> finalised (i.e. "committed") when the checkpoint gets completed
> successfully.
>
> Could you please enable checkpointing and make sure that the job runs long
> enough for at least some checkpoints to be completed?
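>
> For illustration, roughly along these lines (an untested sketch; the output
> path, the checkpoint interval and the flink-parquet dependency are assumptions
> on my side, and `schema` / `stream` stand for your Avro schema and your
> Kafka-sourced DataStream[GenericRecord]):
>
> import org.apache.flink.core.fs.Path
> import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
>
> env.enableCheckpointing(60000) // a part file is finalized only when a checkpoint completes
>
> val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
>   .forBulkFormat(new Path("/tmp/parquet-out"), ParquetAvroWriters.forGenericRecord(schema))
>   .build()
>
> stream.addSink(sink)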
>
> Thanks a lot,
> Kostas
>
> On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <avi.levi@bluevoyant.com> wrote:
>
>> Check out this little app. You can see that the file is created but no
>> data is written, even for a single record.
>>
>> import io.eels.component.parquet.ParquetWriterConfig
>> import org.apache.avro.Schema
>> import org.apache.avro.generic.{ GenericData, GenericRecord }
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> import org.apache.hadoop.fs.Path
>> import org.apache.parquet.avro.AvroParquetWriter
>> import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
>> import org.apache.parquet.hadoop.metadata.CompressionCodecName
>> import scala.io.Source
>> import org.apache.flink.streaming.api.scala._
>>
>> object Tester extends App {
>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>   def now = System.currentTimeMillis()
>>   val path = new Path(s"test-$now.parquet")
>>   val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>   val compressionCodecName = CompressionCodecName.SNAPPY
>>   val config = ParquetWriterConfig()
>>   val genericRecord: GenericRecord = new GenericData.Record(schema)
>>   genericRecord.put("name", "test_b")
>>   genericRecord.put("code", "NoError")
>>   genericRecord.put("ts", 100L)
>>   val stream = env.fromElements(genericRecord)
>>   val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
>>     .withSchema(schema)
>>     .withCompressionCodec(compressionCodecName)
>>     .withPageSize(config.pageSize)
>>     .withRowGroupSize(config.blockSize)
>>     .withDictionaryEncoding(config.enableDictionary)
>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>     .withValidation(config.validating)
>>     .build()
>>
>>   writer.write(genericRecord)
>>   stream.addSink { r =>
>>     println(s"In Sink $r")
>>     writer.write(r)
>>   }
>>   env.execute()
>>   //  writer.close()
>> }
>>
>>
>> On Thu, Nov 29, 2018 at 6:57 AM vipul singh <neoeahit@gmail.com> wrote:
>>
>>> Can you try closing the writer?
>>>
>>> AvroParquetWriter has an internal buffer. Try doing a .close() in
>>> snapshot() (since you are checkpointing, this method will be called).
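>>>
>>> I assume you mean the CheckpointedFunction.snapshotState() hook; a rough,
>>> untested sketch of that idea (the class name and per-part path scheme are
>>> placeholders, not your actual code):
>>>
>>> import org.apache.avro.Schema
>>> import org.apache.avro.generic.GenericRecord
>>> import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
>>> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
>>> import org.apache.flink.streaming.api.functions.sink.SinkFunction
>>> import org.apache.hadoop.fs.Path
>>> import org.apache.parquet.avro.AvroParquetWriter
>>> import org.apache.parquet.hadoop.ParquetWriter
>>>
>>> class ClosingParquetSink(basePath: String, schemaString: String)
>>>     extends SinkFunction[GenericRecord] with CheckpointedFunction {
>>>
>>>   // keep only a schema string in the constructor so the sink stays serializable
>>>   @transient private lazy val schema: Schema = new Schema.Parser().parse(schemaString)
>>>   @transient private var writer: ParquetWriter[GenericRecord] = _
>>>   @transient private var part: Int = 0
>>>
>>>   private def openWriter(): ParquetWriter[GenericRecord] = {
>>>     part += 1 // start a fresh part file after every close, so nothing is overwritten
>>>     AvroParquetWriter.builder[GenericRecord](new Path(s"$basePath/part-$part.parquet"))
>>>       .withSchema(schema)
>>>       .build()
>>>   }
>>>
>>>   override def invoke(value: GenericRecord): Unit = {
>>>     if (writer == null) writer = openWriter()
>>>     writer.write(value)
>>>   }
>>>
>>>   // called on every checkpoint: closing flushes the writer's internal buffer to disk
>>>   override def snapshotState(ctx: FunctionSnapshotContext): Unit = {
>>>     if (writer != null) { writer.close(); writer = null }
>>>   }
>>>
>>>   override def initializeState(ctx: FunctionInitializationContext): Unit = ()
>>> }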
>>>
>>> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <avi.levi@bluevoyant.com>
>>> wrote:
>>>
>>>> Thanks Rafi,
>>>> I am actually not using assignTimestampsAndWatermarks; I will try to
>>>> add it as you suggested. However, the messages seem to repeat in the
>>>> stream over and over: even if I push a single message manually to
>>>> the queue, that message repeats endlessly.
>>>>
>>>> Cheers
>>>> Avi
>>>>
>>>>
>>>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <rafi.aroch@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Avi,
>>>>>
>>>>> I can't see the part where you use assignTimestampsAndWatermarks.
>>>>> If this part is not set properly, it's possible that watermarks are
>>>>> not sent and nothing will be written to your sink.
>>>>>
>>>>> See here for more details:
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
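>>>>>
>>>>> For example (just a sketch, assuming your DnsRequest carries an
>>>>> epoch-millis field ts and allowing up to 10 seconds of out-of-orderness):
>>>>>
>>>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>>>>> import org.apache.flink.streaming.api.windowing.time.Time
>>>>>
>>>>> val withWatermarks: DataStream[DnsRequest] = stream2.assignTimestampsAndWatermarks(
>>>>>   new BoundedOutOfOrdernessTimestampExtractor[DnsRequest](Time.seconds(10)) {
>>>>>     override def extractTimestamp(element: DnsRequest): Long = element.ts
>>>>>   })
>>>>>
>>>>> The same assigner can also be set directly on the FlinkKafkaConsumer, as
>>>>> described in the link above.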
>>>>>
>>>>> Hope this helps,
>>>>> Rafi
>>>>>
>>>>> On Wed, Nov 28, 2018, 21:22 Avi Levi <avi.levi@bluevoyant.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am trying to implement a Parquet writer as a SinkFunction. The pipeline
>>>>>> consists of Kafka as the source and a Parquet file as the sink, however it
>>>>>> seems like the stream is repeating itself in an endless loop and the
>>>>>> Parquet file is not written. Can someone please help me with this?
>>>>>>
>>>>>> object ParquetSinkWriter {
>>>>>>   private val path = new Path("tmp/pfile")
>>>>>>   private val schemaString =
>>>>>>     Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>>>>>   private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
>>>>>>   private val compressionCodecName = CompressionCodecName.SNAPPY
>>>>>>   private val config = ParquetWriterConfig()
>>>>>>   val writer: ParquetWriter[GenericRecord] =
>>>>>>     AvroParquetWriter.builder[GenericRecord](path)
>>>>>>       .withSchema(avroSchema)
>>>>>>       .withCompressionCodec(compressionCodecName)
>>>>>>       .withPageSize(config.pageSize)
>>>>>>       .withRowGroupSize(config.blockSize)
>>>>>>       .withDictionaryEncoding(config.enableDictionary)
>>>>>>       .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>       .withValidation(config.validating)
>>>>>>       .build()
>>>>>> }
>>>>>>
>>>>>> class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
>>>>>>   import ParquetSinkWriter._
>>>>>>   override def invoke(value: GenericRecord): Unit = {
>>>>>>     println(s"ADDING TO File : $value") // getting this output
>>>>>>     writer.write(value) // the output is not written to the file
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> // main app
>>>>>> object StreamingJob extends App {
>>>>>>   implicit val env: StreamExecutionEnvironment =
>>>>>>     StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>   env.enableCheckpointing(500)
>>>>>>   env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>>>>   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>>>>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>>>>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>>>>>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>>>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>>>>   env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
>>>>>>   val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
>>>>>>   env.setStateBackend(backend)
>>>>>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>>>>>
>>>>>>   val stream2: DataStream[DnsRequest] = env.addSource(/* consume from kafka */)
>>>>>>   stream2.map { r =>
>>>>>>     println(s"MAPPING $r") // this output keeps repeating in a loop
>>>>>>     val genericRecord: GenericRecord = new GenericData.Record(schema)
>>>>>>     genericRecord.put("qname", r.qname)
>>>>>>     genericRecord.put("rcode", r.rcode)
>>>>>>     genericRecord.put("ts", r.ts)
>>>>>>     genericRecord
>>>>>>   }.addSink(writer)
>>>>>> }
>>>>>>
>>>>>> Thanks for your help
>>>>>> Avi
>>>>>>
>>>>>>
>>>
>>> --
>>> Thanks,
>>> Vipul
>>>
>>
