Hi Avi,

In the last snippet you posted, you have not enabled checkpointing.

Checkpoints are needed for the StreamingFileSink to produce results, especially in the case of BulkWriters (like Parquet), where
the part file is rolled upon receiving a checkpoint and is finalised (i.e. "committed") only when that checkpoint completes successfully.

Could you please enable checkpointing and make sure that the job runs long enough for at least some checkpoints to be completed?
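
For reference, once checkpointing is enabled and the flink-parquet dependency is on the classpath, the setup could look roughly like the sketch below (assuming your env, your Avro schema and a DataStream[GenericRecord] called stream from your snippet; the output path and checkpoint interval are only placeholders):

import org.apache.avro.generic.GenericRecord
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

env.enableCheckpointing(10000) // part files are finalised only when a checkpoint completes

val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
  .forBulkFormat(new Path("/tmp/parquet-out"), ParquetAvroWriters.forGenericRecord(schema))
  .build()

stream.addSink(sink)
env.execute()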

Thanks a lot,
Kostas

On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <avi.levi@bluevoyant.com> wrote:
Check out this little app. You can see that the file is created but no data is written, even for a single record.
import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.{ GenericData, GenericRecord }
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import scala.io.Source
import org.apache.flink.streaming.api.scala._

object Tester extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  def now = System.currentTimeMillis()
  val path = new Path(s"test-$now.parquet")
  val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
  val schema: Schema = new Schema.Parser().parse(schemaString)
  val compressionCodecName = CompressionCodecName.SNAPPY
  val config = ParquetWriterConfig()

  val genericRecord: GenericRecord = new GenericData.Record(schema)
  genericRecord.put("name", "test_b")
  genericRecord.put("code", "NoError")
  genericRecord.put("ts", 100L)

  val stream = env.fromElements(genericRecord)

  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(schema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()

  writer.write(genericRecord)

  stream.addSink { r =>
    println(s"In Sink $r")
    writer.write(r)
  }

  env.execute()
  // writer.close()
}

On Thu, Nov 29, 2018 at 6:57 AM vipul singh <neoeahit@gmail.com> wrote:
Can you try closing the writer?

AvroParquetWriter has an internal buffer. Try doing a .close() in snapshot() (since you are checkpointing, this method will be called).
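
Something along these lines, for example (just a rough sketch, assuming the same AvroParquetWriter setup as in your snippets; ParquetWriter only exposes close(), so here the writer is re-created lazily after each checkpoint, and the rolling file name is only illustrative):

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter

class CheckpointedParquetSink(schemaString: String) extends SinkFunction[GenericRecord] with CheckpointedFunction {

  @transient private var writer: ParquetWriter[GenericRecord] = _

  private def createWriter(): ParquetWriter[GenericRecord] = {
    val schema: Schema = new Schema.Parser().parse(schemaString)
    AvroParquetWriter.builder[GenericRecord](new Path(s"part-${System.currentTimeMillis()}.parquet"))
      .withSchema(schema)
      .build()
  }

  override def invoke(value: GenericRecord): Unit = {
    if (writer == null) writer = createWriter() // re-open lazily after a checkpoint closed the previous file
    writer.write(value)
  }

  // called on every checkpoint: close() flushes the internal buffer and finalises the current file
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    if (writer != null) {
      writer.close()
      writer = null
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = ()
}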

On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <avi.levi@bluevoyant.com> wrote:
Thanks Rafi, 
I am actually not using assignTimestampsAndWatermarks; I will try to add it as you suggested. However, the messages seem to repeat in the stream over and over: even if I push a single message manually to the queue, that message will repeat indefinitely.

Cheers
Avi


On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <rafi.aroch@gmail.com> wrote:
Hi Avi,

I can't see the part where you use assignTimestampsAndWatermarks.
If this part is not set properly, it's possible that watermarks are not emitted and nothing will be written to your sink.
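
For example, something like this (a rough sketch, assuming your DataStream[DnsRequest] with a ts field in epoch milliseconds; the 5-second out-of-orderness bound is just a placeholder):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

val withTimestamps = stream2.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[DnsRequest](Time.seconds(5)) {
    override def extractTimestamp(element: DnsRequest): Long = element.ts
  })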


Hope this helps, 
Rafi 

On Wed, Nov 28, 2018, 21:22 Avi Levi <avi.levi@bluevoyant.com> wrote:
Hi, 

I am trying to implement a Parquet writer as a SinkFunction. The pipeline consists of Kafka as the source and a Parquet file as the sink. However, it seems like the stream is repeating itself in an endless loop and the Parquet file is not written. Can someone please help me with this?

object ParquetSinkWriter{
  private val path = new Path("tmp/pfile")
  private val schemaString = Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
  private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
  private val compressionCodecName = CompressionCodecName.SNAPPY
  private val config = ParquetWriterConfig()
  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(avroSchema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()
}

class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
  import ParquetSinkWriter._
  override def invoke(value: GenericRecord): Unit = {
    println(s"ADDING TO File : $value") // getting this output 
    writer.write(value) //the output is not written to the file 
  }
}

//main app
object StreamingJob extends App  {
 implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(500)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
  val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
  env.setStateBackend(backend)
  val writer = new ParquetSinkWriter(outputPath, schema)
  val stream2: DataStream[DnsRequest] = env.addSource(/* consume from kafka */)

  stream2.map { r =>
    println(s"MAPPING $r") // this output keeps repeating in a loop
    val genericRecord: GenericRecord = new GenericData.Record(schema)
    genericRecord.put("qname", r.qname)
    genericRecord.put("rcode", r.rcode)
    genericRecord.put("ts", r.ts)
    genericRecord
  }.addSink(writer)
}

Thanks for your help
Avi



--
Thanks,
Vipul