From: Fabian Hueske
Date: Sun, 30 Jul 2017 09:53:21 +0200
Subject: Re: Customer inputformat
To: Mohit Anchlia
Cc: user
Delivered-To: mailing list user@flink.apache.org

Hi,

Flink calls the reachedEnd() method before it calls nextRecord() and closes
the InputFormat (IF) when reachedEnd() returns true. So, reachedEnd() should
not return true until nextRecord() has been called and the first and last
record has been emitted. In your code it always returns true, so nextRecord()
is never called and nothing is printed.

You might also want to build your PDFFileInputFormat on FileInputFormat and
set unsplittable to true. FileInputFormat comes with lots of built-in
functionality such as InputSplit generation.

Cheers, Fabian

2017-07-30 3:41 GMT+02:00 Mohit Anchlia <mohitanchlia@gmail.com>:

> Hi,
>
> I created a custom input format. The idea behind it is to read all binary
> files from a directory and use each file as its own split. Each split is
> read as one whole record. When I run it in Flink I don't get any error, but
> I am not seeing any output from .print. Am I missing something?
>
> ----
>
> public class PDFFileInputFormat extends RichInputFormat<StringValue, InputSplit> {
>
>     private static final Logger logger = LoggerFactory.getLogger(PDFFileInputFormat.class.getName());
>
>     PDFFileInputSplit current = null;
>
>     public static void main(String... args) throws Exception {
>         PDFFileInputFormat pdfReader = new PDFFileInputFormat("c:\\proj\\test");
>         InputSplit[] splits = pdfReader.createInputSplits(1);
>         pdfReader.open(splits[0]);
>         pdfReader.nextRecord(null);
>
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         env.fromElements(1, 2, 3)
>             // returns the squared i
>             .print();
>
>         PDFFileInputFormat format = new PDFFileInputFormat("c:\\proj\\test");
>         InputFormatSourceFunction<StringValue> reader = new InputFormatSourceFunction<>(format,
>             TypeInformation.of(StringValue.class));
>         env.createInput(format, TypeInformation.of(StringValue.class)).print();
>     }
>
>     String path = null;
>
>     public PDFFileInputFormat(String path) {
>         this.path = path;
>     }
>
>     public void configure(Configuration parameters) {
>         // TODO Auto-generated method stub
>     }
>
>     public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
>         // TODO Auto-generated method stub
>         return cachedStatistics;
>     }
>
>     public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
>         final List<PDFFileInputSplit> splits = new ArrayList<PDFFileInputSplit>();
>         Files.list(Paths.get(path)).forEach(f -> {
>             PDFFileInputSplit split = new PDFFileInputSplit(splits.size(), f);
>             splits.add(split);
>         });
>         PDFFileInputSplit[] inputSplitArray = new PDFFileInputSplit[splits.size()];
>         return splits.toArray(inputSplitArray);
>     }
>
>     public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
>         logger.info("Assigner");
>         // TODO Auto-generated method stub
>         return new DefaultInputSplitAssigner(inputSplits);
>     }
>
>     public void open(InputSplit split) throws IOException {
>         this.current = (PDFFileInputSplit) split;
>     }
>
>     public boolean reachedEnd() throws IOException {
>         // TODO Auto-generated method stub
>         return true;
>     }
>
>     public StringValue nextRecord(StringValue reuse) throws IOException {
>         String content = new String(Files.readAllBytes(this.current.getFile()));
>         logger.info("Content " + content);
>         return new StringValue(content);
>     }
>
>     public void close() throws IOException {
>         // TODO Auto-generated method stub
>     }
> }
>
> ---
>
> Thanks,
> Mohit
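
The calling order described in the reply (reachedEnd() is checked first, then
nextRecord() is called) can be illustrated without Flink on the classpath. The
following is only a minimal sketch under stated assumptions: RecordSource,
AlwaysAtEnd, SingleRecord and drain are hypothetical names made up for this
illustration and are not Flink types, and the loop is a simplification of how
a format is consumed, not Flink's actual reader code.

```java
import java.util.ArrayList;
import java.util.List;

public class ReachedEndContractSketch {

    /** Hypothetical stand-in for the two methods discussed in the thread; not a Flink type. */
    interface RecordSource<T> {
        boolean reachedEnd();
        T nextRecord(T reuse);
    }

    /** Mirrors the quoted format: reachedEnd() is always true, so nextRecord() is never reached. */
    static class AlwaysAtEnd implements RecordSource<String> {
        private final String content;
        AlwaysAtEnd(String content) { this.content = content; }
        @Override public boolean reachedEnd() { return true; }
        @Override public String nextRecord(String reuse) { return content; }
    }

    /** One record per split: reachedEnd() turns true only after nextRecord() has handed it out. */
    static class SingleRecord implements RecordSource<String> {
        private final String content;
        private boolean emitted = false;
        SingleRecord(String content) { this.content = content; }
        @Override public boolean reachedEnd() { return emitted; }
        @Override public String nextRecord(String reuse) {
            emitted = true;
            return content;
        }
    }

    /** Simplified read loop (an assumption): check reachedEnd() first, then call nextRecord(). */
    static <T> List<T> drain(RecordSource<T> source) {
        List<T> out = new ArrayList<>();
        while (!source.reachedEnd()) {
            T record = source.nextRecord(null);
            if (record != null) {
                out.add(record);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drain(new AlwaysAtEnd("pdf bytes")).size());   // prints 0
        System.out.println(drain(new SingleRecord("pdf bytes")).size());  // prints 1
    }
}
```

In a real format that is reused across several splits, the emitted flag would
presumably have to be reset in open() so that each file yields its one record.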