An InputFormat processes multiple InputSplits. open() is called for each InputSplit.
If you don't reset reached to false in open() you will only read a single (i.e., the first) InputSplit and skip all others.

I'd override open as follows:

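@Override
public void open(FileInputSplit fileSplit) throws IOException {
  super.open(fileSplit); // let FileInputFormat open the stream for this split
  reached = false;       // start every new split as "not finished"
}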
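To make the per-split lifecycle concrete, here is a minimal Flink-free sketch. The class names `SplitLifecycleDemo` and `OneRecordPerSplitReader` are hypothetical, and `readAll` only approximates how a single source instance drives one format object through several splits (open, then nextRecord until reachedEnd, then close). Leaving the flag set after the first split means every later split reports its end immediately.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SplitLifecycleDemo {

    // Hypothetical stand-in for a FileInputFormat subclass that emits one record per split.
    // It only mimics the open/reachedEnd/nextRecord/close contract; it is not Flink code.
    public static final class OneRecordPerSplitReader {
        private final boolean resetInOpen;
        private boolean reached = false;
        private String currentSplit;

        public OneRecordPerSplitReader(boolean resetInOpen) {
            this.resetInOpen = resetInOpen;
        }

        void open(String split) {
            currentSplit = split;
            if (resetInOpen) {
                reached = false; // the fix: each new split starts out "not finished"
            }
        }

        boolean reachedEnd() {
            return reached;
        }

        String nextRecord() {
            reached = true; // the whole split is consumed as a single record
            return "content of " + currentSplit;
        }

        void close() {
        }
    }

    // Approximates how one parallel source instance drives a format over its splits.
    public static List<String> readAll(OneRecordPerSplitReader reader, List<String> splits) {
        List<String> records = new ArrayList<>();
        for (String split : splits) {
            reader.open(split);
            while (!reader.reachedEnd()) {
                records.add(reader.nextRecord());
            }
            reader.close();
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> splits = Arrays.asList("a.pdf", "b.pdf", "c.pdf");
        System.out.println(readAll(new OneRecordPerSplitReader(false), splits).size()); // 1
        System.out.println(readAll(new OneRecordPerSplitReader(true), splits).size());  // 3
    }
}
```

The `resetInOpen` constructor flag exists only so the same class can show the buggy and the fixed behaviour side by side.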

Cheers, Fabian

2017-08-01 8:08 GMT+02:00 Mohit Anchlia <mohitanchlia@gmail.com>:
I didn't override open. I am using the open that is inherited from FileInputFormat. Am I supposed to override open specifically?

On Mon, Jul 31, 2017 at 9:49 PM, Fabian Hueske <fhueske@gmail.com> wrote:
Do you set reached to false in open()? 


On 01.08.2017 2:44 AM, "Mohit Anchlia" <mohitanchlia@gmail.com> wrote:
And here is the inputformat code:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.flink.api.common.io.FileInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PDFFileInputFormat extends FileInputFormat<String> {

  private static final long serialVersionUID = -4137283038479003711L;
  private static final Logger logger = LoggerFactory.getLogger(PDFFileInputFormat.class);

  // Set to true once nextRecord() has returned the content of the current file.
  private boolean reached = false;

  @Override
  public boolean reachedEnd() throws IOException {
    logger.info("called reached " + reached);
    return reached;
  }

  @Override
  public String nextRecord(String reuse) throws IOException {
    logger.info("This is where you parse PDF");
    // Reads the whole file of the current split via java.nio (works for local paths only).
    String content = new String(
        Files.readAllBytes(Paths.get(this.currentSplit.getPath().getPath())));
    logger.info("Content " + content);
    reached = true;
    return content;
  }
}

On Mon, Jul 31, 2017 at 5:09 PM, Mohit Anchlia <mohitanchlia@gmail.com> wrote:

I have a very simple program that just reads all the files in a path. However, Flink is not working as expected.

Every time I execute this job I only see Flink reading 2 files, even though there are more in that directory. On closer look it appears that it might be related to:

[flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 2 task slot(s).

My question is: isn't Flink supposed to iterate over the rest of the directory once those 2 slots become free again? I am assuming this problem is caused by there being only 2 slots.
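The relationship between slots and splits can be sketched with a small simulation. It assumes that each slot runs its own copy of the input format and keeps requesting splits from a shared pool until none are left, which is roughly how Flink hands out input splits on demand. `TwoSlotDemo` and its `Reader` are hypothetical stand-ins for PDFFileInputFormat, and the round-robin interleaving only serves to make the output deterministic. Under this model the slot count limits how many files are read at the same time, not how many are read in total, while a `reached` flag that is never reset yields exactly one file per slot.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class TwoSlotDemo {

    // Hypothetical reader mirroring a one-record-per-file format (not Flink code).
    static final class Reader {
        private final boolean resetInOpen;
        private boolean reached = false;
        private String split;

        Reader(boolean resetInOpen) {
            this.resetInOpen = resetInOpen;
        }

        void open(String s) {
            split = s;
            if (resetInOpen) {
                reached = false; // without this, later splits look already finished
            }
        }

        boolean reachedEnd() {
            return reached;
        }

        String nextRecord() {
            reached = true;
            return split;
        }
    }

    // Simulates `parallelism` instances, each with its own Reader, pulling splits
    // from a shared queue until it is empty (interleaved round-robin for determinism).
    public static List<String> run(int parallelism, List<String> files, boolean resetInOpen) {
        Deque<String> pending = new ArrayDeque<>(files);
        List<Reader> readers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            readers.add(new Reader(resetInOpen));
        }
        List<String> out = new ArrayList<>();
        while (!pending.isEmpty()) {
            for (Reader r : readers) {
                String next = pending.poll();
                if (next == null) {
                    break; // no splits left for this instance
                }
                r.open(next);
                while (!r.reachedEnd()) {
                    out.add(r.nextRecord());
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("1.pdf", "2.pdf", "3.pdf", "4.pdf", "5.pdf");
        System.out.println(run(2, files, false)); // [1.pdf, 2.pdf]
        System.out.println(run(2, files, true));  // [1.pdf, 2.pdf, 3.pdf, 4.pdf, 5.pdf]
    }
}
```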


Code ---

  PDFFileInputFormat format = new PDFFileInputFormat();
  format.setFilePath(args[0]);
  format.setNestedFileEnumeration(true);
  logger.info("Number of splits " + format.getNumSplits());

  // logger.info(Paths.get(".").toAbsolutePath().normalize().toString());

  // The produced type must match the format's output type (String, not StringValue).
  env.createInput(format, TypeInformation.of(String.class)).print();