flink-user mailing list archives

From Mohit Anchlia <mohitanch...@gmail.com>
Subject Re: Customer inputformat
Date Mon, 31 Jul 2017 19:42:34 GMT
Thanks! When I give Flink the path to a directory, it only reads 2 files, and
it seems to pick these 2 files at random.

On Mon, Jul 31, 2017 at 12:05 AM, Fabian Hueske <fhueske@gmail.com> wrote:

> Hi Mohit,
>
> as Ted said, there are plenty of InputFormats which are based on
> FileInputFormat.
> FileInputFormat also supports reading all files in a directory. Simply
> specify the path of the directory.
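A minimal batch (DataSet API) sketch of reading a whole directory, assuming Flink 1.3-era APIs. `readTextFile()` is backed by `TextInputFormat`, a `FileInputFormat` subclass, so pointing it at a directory reads the files inside it. The `recursive.file.enumeration` parameter is only needed for nested sub-directories. As far as I know, `FileInputFormat` skips files whose names start with `_` or `.` by default. `ReadDirectoryBatch` and the default path are placeholders.

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

import java.util.List;

/** Sketch: read every file under a directory as text lines (DataSet API). */
class ReadDirectoryBatch {

    static List<String> readAllLines(String dir) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Assumption: nested sub-directories should be read too. Without this
        // flag only the files directly inside `dir` are enumerated.
        Configuration parameters = new Configuration();
        parameters.setBoolean("recursive.file.enumeration", true);

        // Passing a directory (not a file) makes the FileInputFormat behind
        // readTextFile() create splits for the files it contains.
        return env.readTextFile(dir)
                .withParameters(parameters)
                .collect(); // collect() runs the job and returns all lines
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical default path; pass your own directory as the first argument.
        String dir = args.length > 0 ? args[0] : "file:///c:/proj/test";
        readAllLines(dir).forEach(System.out::println);
    }
}
```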
>
> Check StreamExecutionEnvironment.createFileInput(), which takes several
> parameters, such as a FileInputFormat and the interval at which the
> directory is periodically checked.
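A sketch of the streaming variant, assuming Flink 1.3-era APIs. In the releases I am aware of, the public entry point is `StreamExecutionEnvironment.readFile(FileInputFormat, String, FileProcessingMode, long)`, which delegates to the internal `createFileInput()`. `TextInputFormat` stands in for any `FileInputFormat`, including a custom one; `WatchDirectory`, the path, and the interval are illustrative.

```java
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

class WatchDirectory {

    /** Builds (but does not run) a source that re-scans `dir` every `intervalMs`. */
    static DataStream<String> watch(StreamExecutionEnvironment env, String dir, long intervalMs) {
        // Any FileInputFormat works here; TextInputFormat keeps the sketch short.
        TextInputFormat format = new TextInputFormat(new Path(dir));

        // PROCESS_CONTINUOUSLY keeps monitoring the directory and picks up files
        // added later; PROCESS_ONCE reads the current contents and then stops.
        return env.readFile(format, dir, FileProcessingMode.PROCESS_CONTINUOUSLY, intervalMs);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Hypothetical default path; scan every 10 seconds.
        String dir = args.length > 0 ? args[0] : "file:///c:/proj/test";
        watch(env, dir, 10_000L).print();
        env.execute("watch directory");
    }
}
```

Flink's documentation for `readFile()` warns that, in `PROCESS_CONTINUOUSLY` mode, a file that is modified is re-processed in its entirety, so it is probably safest to move finished files into the watched directory rather than write them in place.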
>
> Best, Fabian
>
> 2017-07-30 21:31 GMT+02:00 Ted Yu <yuzhihong@gmail.com>:
>
>> For #1, you can find quite a few classes which extend FileInputFormat.
>> e.g.
>>
>> flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java:public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQuer
>> flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java:public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
>> flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java:public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> implements Checkpoi
>>
>> flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java:    extends FileInputFormat<String>
>>
>> FYI
>>
>> On Sun, Jul 30, 2017 at 12:26 PM, Mohit Anchlia <mohitanchlia@gmail.com>
>> wrote:
>>
>>> Thanks. A few more questions:
>>>
>>> - Is there an example for FileInputFormat?
>>> - How do I make it read all the files in a directory?
>>> - How do I make an InputFormat a streaming input instead of a batch one?
>>> E.g., read new files as they arrive in a directory.
>>>
>>> Thanks again.
>>>
>>> On Sun, Jul 30, 2017 at 12:53 AM, Fabian Hueske <fhueske@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Flink calls reachedEnd() before each call to nextRecord() and closes the
>>>> InputFormat as soon as reachedEnd() returns true.
>>>> So reachedEnd() should not return true until nextRecord() has been called
>>>> and the first (and, here, only) record has been emitted.
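To make the contract concrete, here is a simplified, Flink-free model of the loop a source task runs for each split: `open()`, then `nextRecord()` while `reachedEnd()` is false, then `close()`. `MiniInputFormat`, `OneRecordPerSplit`, and `Driver` are hypothetical names; the real runtime also handles cancellation, object reuse, and split assignment. The `main` method shows why a `reachedEnd()` that always returns `true` produces no output.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in mirroring the relevant InputFormat methods (not Flink's interface). */
interface MiniInputFormat<T, S> {
    void open(S split);
    boolean reachedEnd();
    T nextRecord(T reuse);
    void close();
}

/** Emits exactly one record (the whole "file") per split. */
class OneRecordPerSplit implements MiniInputFormat<String, String> {
    private String current;
    private boolean emitted;

    public void open(String split) {
        current = split;
        emitted = false;   // reset for every new split
    }

    public boolean reachedEnd() {
        return emitted;    // false until nextRecord() has run once for this split
    }

    public String nextRecord(String reuse) {
        emitted = true;    // the single record for this split is now out
        return "content of " + current;
    }

    public void close() {
        current = null;
    }
}

class Driver {
    /** Models the open / while (!reachedEnd) nextRecord / close loop. */
    static <T, S> List<T> readSplit(MiniInputFormat<T, S> format, S split) {
        List<T> out = new ArrayList<>();
        format.open(split);
        while (!format.reachedEnd()) {
            T record = format.nextRecord(null); // simplified: Flink passes a reuse object
            if (record != null) {               // null records are skipped
                out.add(record);
            }
        }
        format.close();
        return out;
    }

    public static void main(String[] args) {
        OneRecordPerSplit format = new OneRecordPerSplit();
        for (String split : new String[] {"a.pdf", "b.pdf"}) {
            System.out.println(readSplit(format, split)); // prints [content of a.pdf], then [content of b.pdf]
        }

        // Same format, but reachedEnd() always returns true, as in the original code.
        OneRecordPerSplit buggy = new OneRecordPerSplit() {
            @Override
            public boolean reachedEnd() {
                return true;
            }
        };
        System.out.println(readSplit(buggy, "a.pdf")); // prints []
    }
}
```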
>>>>
>>>> You might also want to build your PDFFileInputFormat on FileInputFormat
>>>> and set unsplittable to true.
>>>> FileInputFormat comes with a lot of built-in functionality, such as
>>>> InputSplit generation.
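A sketch of that suggestion, assuming the Flink 1.3-era `FileInputFormat` API (its protected `unsplittable` flag and `stream` field). `WholeFileInputFormat` is a hypothetical name. It emits each file as one UTF-8 `String`, mirroring the original code, but that is lossy for binary PDFs; a real PDF reader would more likely keep the bytes and pass them to a PDF library.

```java
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical whole-file reader: one split per file, one record per split. */
class WholeFileInputFormat extends FileInputFormat<String> {

    private boolean done;

    public WholeFileInputFormat(Path path) {
        super(path);
        // Never split a file, so FileInputFormat creates one split per file.
        this.unsplittable = true;
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split); // FileInputFormat opens `this.stream` on the split's file
        done = false;      // new file: its single record has not been emitted yet
    }

    @Override
    public boolean reachedEnd() {
        return done;       // true only after nextRecord() has emitted the file
    }

    @Override
    public String nextRecord(String reuse) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[8192];
        int n;
        // Read until EOF so the whole file is returned regardless of split length.
        while ((n = stream.read(chunk)) != -1) {
            buffer.write(chunk, 0, n);
        }
        done = true;
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

It can then be used like any other `FileInputFormat`, for example `env.createInput(new WholeFileInputFormat(new Path(dir)))` in a batch job, or passed to `readFile()` in a streaming job.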
>>>>
>>>> Cheers, Fabian
>>>>
>>>> 2017-07-30 3:41 GMT+02:00 Mohit Anchlia <mohitanchlia@gmail.com>:
>>>>
>>>>> Hi,
>>>>>
>>>>> I created a custom input format. The idea behind it is to read all
>>>>> binary files from a directory and use each file as its own split. Each
>>>>> split is read as one whole record. When I run it in Flink I don't get any
>>>>> error, but I am not seeing any output from .print(). Am I missing something?
>>>>>
>>>>> ----
>>>>>
>>>>> public class PDFFileInputFormat extends RichInputFormat<StringValue, InputSplit> {
>>>>>
>>>>>     private static final Logger logger = LoggerFactory.getLogger(PDFFileInputFormat.class.getName());
>>>>>
>>>>>     PDFFileInputSplit current = null;
>>>>>
>>>>>     public static void main(String... args) throws Exception {
>>>>>         PDFFileInputFormat pdfReader = new PDFFileInputFormat("c:\\proj\\test");
>>>>>         InputSplit[] splits = pdfReader.createInputSplits(1);
>>>>>         pdfReader.open(splits[0]);
>>>>>         pdfReader.nextRecord(null);
>>>>>
>>>>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>
>>>>>         env.fromElements(1, 2, 3)
>>>>>             // returns the squared i
>>>>>             .print();
>>>>>
>>>>>         PDFFileInputFormat format = new PDFFileInputFormat("c:\\proj\\test");
>>>>>         InputFormatSourceFunction<StringValue> reader = new InputFormatSourceFunction<>(format,
>>>>>             TypeInformation.of(StringValue.class));
>>>>>         env.createInput(format, TypeInformation.of(StringValue.class)).print();
>>>>>     }
>>>>>
>>>>>     String path = null;
>>>>>
>>>>>     public PDFFileInputFormat(String path) {
>>>>>         this.path = path;
>>>>>     }
>>>>>
>>>>>     public void configure(Configuration parameters) {
>>>>>         // TODO Auto-generated method stub
>>>>>     }
>>>>>
>>>>>     public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
>>>>>         // TODO Auto-generated method stub
>>>>>         return cachedStatistics;
>>>>>     }
>>>>>
>>>>>     public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
>>>>>         final List<PDFFileInputSplit> splits = new ArrayList<PDFFileInputSplit>();
>>>>>         Files.list(Paths.get(path)).forEach(f -> {
>>>>>             PDFFileInputSplit split = new PDFFileInputSplit(splits.size(), f);
>>>>>             splits.add(split);
>>>>>         });
>>>>>         PDFFileInputSplit[] inputSplitArray = new PDFFileInputSplit[splits.size()];
>>>>>         return splits.toArray(inputSplitArray);
>>>>>     }
>>>>>
>>>>>     public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
>>>>>         logger.info("Assigner");
>>>>>         // TODO Auto-generated method stub
>>>>>         return new DefaultInputSplitAssigner(inputSplits);
>>>>>     }
>>>>>
>>>>>     public void open(InputSplit split) throws IOException {
>>>>>         this.current = (PDFFileInputSplit) split;
>>>>>     }
>>>>>
>>>>>     public boolean reachedEnd() throws IOException {
>>>>>         // TODO Auto-generated method stub
>>>>>         return true;
>>>>>     }
>>>>>
>>>>>     public StringValue nextRecord(StringValue reuse) throws IOException {
>>>>>         String content = new String(Files.readAllBytes(this.current.getFile()));
>>>>>         logger.info("Content " + content);
>>>>>         return new StringValue(content);
>>>>>     }
>>>>>
>>>>>     public void close() throws IOException {
>>>>>         // TODO Auto-generated method stub
>>>>>     }
>>>>> }
>>>>>
>>>>> ---
>>>>>
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Mohit
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
