flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Customer inputformat
Date Sun, 30 Jul 2017 07:53:21 GMT
Hi,

Flink calls the reachedEnd() method before it calls nextRecord() and closes
the input format when reachedEnd() returns true.
So, reachedEnd() should not return true until nextRecord() has been called
and the first (and, in your case, only) record of the split has been emitted.
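
The call order described above can be sketched in plain Java. This is only
an illustration, not Flink code: the RecordSource interface and the drain()
loop are hypothetical stand-ins for the InputFormat methods and for the loop
the runtime runs over a split, so that the example runs without Flink on the
classpath. It assumes each split holds exactly one record.

```java
import java.util.ArrayList;
import java.util.List;

public class ReachedEndSketch {

    /** Hypothetical stand-in for the two InputFormat methods discussed above. */
    interface RecordSource<T> {
        boolean reachedEnd();
        T nextRecord();
    }

    /** Emits exactly one record, then reports the end of the split. */
    static class SingleRecordSource implements RecordSource<String> {
        private final String content;
        // In a real input format this flag would be reset in open(split).
        private boolean emitted = false;

        SingleRecordSource(String content) {
            this.content = content;
        }

        @Override
        public boolean reachedEnd() {
            // False until the single record has been handed out.
            return emitted;
        }

        @Override
        public String nextRecord() {
            emitted = true;
            return content;
        }
    }

    /** Simplified read loop: reachedEnd() is checked before every nextRecord(). */
    static <T> List<T> drain(RecordSource<T> source) {
        List<T> out = new ArrayList<>();
        while (!source.reachedEnd()) {
            out.add(source.nextRecord());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drain(new SingleRecordSource("whole file content")));

        // A source whose reachedEnd() always returns true never gets read.
        RecordSource<String> alwaysDone = new RecordSource<String>() {
            @Override
            public boolean reachedEnd() { return true; }
            @Override
            public String nextRecord() { return "never read"; }
        };
        System.out.println(drain(alwaysDone));
    }
}
```

Running main prints [whole file content] followed by []. The second case
mirrors a reachedEnd() that unconditionally returns true: the loop exits
before nextRecord() is ever called, so nothing reaches print().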

You might also want to build your PDFFileInputFormat on FileInputFormat and
set unsplittable to true.
FileInputFormat comes with lots of built-in functionality, such as
InputSplit generation.

Cheers, Fabian

2017-07-30 3:41 GMT+02:00 Mohit Anchlia <mohitanchlia@gmail.com>:

> Hi,
>
> I created a custom input format. The idea behind this is to read all binary
> files from a directory and use each file as its own split. Each split is
> read as one whole record. When I run it in Flink I don't get any error, but
> I am not seeing any output from .print. Am I missing something?
>
> ----
>
> public class PDFFileInputFormat extends RichInputFormat<StringValue, InputSplit> {
>
>     private static final Logger logger =
>         LoggerFactory.getLogger(PDFFileInputFormat.class.getName());
>
>     PDFFileInputSplit current = null;
>
>     public static void main(String... args) throws Exception {
>         PDFFileInputFormat pdfReader = new PDFFileInputFormat("c:\\proj\\test");
>         InputSplit[] splits = pdfReader.createInputSplits(1);
>         pdfReader.open(splits[0]);
>         pdfReader.nextRecord(null);
>
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         env.fromElements(1, 2, 3)
>             // returns the squared i
>             .print();
>
>         PDFFileInputFormat format = new PDFFileInputFormat("c:\\proj\\test");
>         InputFormatSourceFunction<StringValue> reader =
>             new InputFormatSourceFunction<>(format, TypeInformation.of(StringValue.class));
>         env.createInput(format, TypeInformation.of(StringValue.class)).print();
>     }
>
>     String path = null;
>
>     public PDFFileInputFormat(String path) {
>         this.path = path;
>     }
>
>     public void configure(Configuration parameters) {
>         // TODO Auto-generated method stub
>     }
>
>     public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
>         // TODO Auto-generated method stub
>         return cachedStatistics;
>     }
>
>     public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
>         final List<PDFFileInputSplit> splits = new ArrayList<PDFFileInputSplit>();
>         Files.list(Paths.get(path)).forEach(f -> {
>             PDFFileInputSplit split = new PDFFileInputSplit(splits.size(), f);
>             splits.add(split);
>         });
>         PDFFileInputSplit[] inputSplitArray = new PDFFileInputSplit[splits.size()];
>         return splits.toArray(inputSplitArray);
>     }
>
>     public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
>         logger.info("Assigner");
>         // TODO Auto-generated method stub
>         return new DefaultInputSplitAssigner(inputSplits);
>     }
>
>     public void open(InputSplit split) throws IOException {
>         this.current = (PDFFileInputSplit) split;
>     }
>
>     public boolean reachedEnd() throws IOException {
>         // TODO Auto-generated method stub
>         return true;
>     }
>
>     public StringValue nextRecord(StringValue reuse) throws IOException {
>         String content = new String(Files.readAllBytes(this.current.getFile()));
>         logger.info("Content " + content);
>         return new StringValue(content);
>     }
>
>     public void close() throws IOException {
>         // TODO Auto-generated method stub
>     }
> }
>
> ---
>
>
> Thanks,
>
> Mohit
>
>
>
