An InputFormat processes multiple InputSplits. open() is called for each InputSplit.
If you don't reset reached to false in open(), you will only read a single (i.e., the first) InputSplit and skip all others.

I'd override open as follows:

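public void open(FileInputSplit fileSplit) throws IOException {
  super.open(fileSplit); // lets FileInputFormat set currentSplit for this split
  reached = false;       // reset the end-of-split flag for every new split
}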
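
To illustrate why the reset matters, here is a minimal sketch in plain Java. It is not Flink's actual runtime code: the class SplitLoopSketch, the nested OneRecordPerSplitFormat, and the readAll loop are hypothetical stand-ins that only mimic the open() / reachedEnd() / nextRecord() cycle, assuming one format instance is reused for every split assigned to a task.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SplitLoopSketch {

    /** Hypothetical stand-in for an input format that emits one record per split. */
    static class OneRecordPerSplitFormat {
        private final boolean resetInOpen;
        private boolean reached = false;
        private String currentSplit;

        OneRecordPerSplitFormat(boolean resetInOpen) {
            this.resetInOpen = resetInOpen;
        }

        void open(String split) {
            currentSplit = split;
            if (resetInOpen) {
                reached = false; // the fix: every split starts with a fresh flag
            }
        }

        boolean reachedEnd() {
            return reached;
        }

        String nextRecord() {
            reached = true; // the whole file is returned as a single record
            return "content of " + currentSplit;
        }
    }

    /** Simplified per-task loop: the same format instance handles every assigned split. */
    static List<String> readAll(OneRecordPerSplitFormat format, List<String> splits) {
        List<String> records = new ArrayList<>();
        for (String split : splits) {
            format.open(split);
            while (!format.reachedEnd()) {
                records.add(format.nextRecord());
            }
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> splits = Arrays.asList("a.pdf", "b.pdf", "c.pdf");
        // Without the reset, only the first split yields a record.
        System.out.println(readAll(new OneRecordPerSplitFormat(false), splits).size());
        // With the reset, every split yields a record.
        System.out.println(readAll(new OneRecordPerSplitFormat(true), splits).size());
    }
}
```

In this sketch, an instance that never resets the flag reads exactly one split. If two parallel instances behave that way (one per task slot), that would presumably explain why only 2 files show up.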

Cheers, Fabian

2017-08-01 8:08 GMT+02:00 Mohit Anchlia <>:
I didn't override open. I am using the open that is inherited from FileInputFormat. Am I supposed to override open explicitly?

On Mon, Jul 31, 2017 at 9:49 PM, Fabian Hueske <> wrote:
Do you set reached to false in open()? 

On 01.08.2017 at 2:44 AM, "Mohit Anchlia" <> wrote:
And here is the inputformat code:

public class PDFFileInputFormat extends FileInputFormat<String> {
 private static final long serialVersionUID = -4137283038479003711L;
 private static final Logger logger = LoggerFactory.getLogger(PDFFileInputFormat.class);
 private boolean reached = false;

 public boolean reachedEnd() throws IOException {
  logger.info("called reached " + reached);
  return reached;
 }

 public String nextRecord(String reuse) throws IOException {
  logger.info("This is where you parse PDF");
  String content = new String(
    Files.readAllBytes(Paths.get(this.currentSplit.getPath().getPath())));
  logger.info("Content " + content);
  reached = true;
  return content;
 }
}

On Mon, Jul 31, 2017 at 5:09 PM, Mohit Anchlia <> wrote:

I have a very simple program that just reads all the files in a path. However, Flink is not working as expected.

Every time I execute this job, I only see Flink reading 2 files, even though there are more in that directory. On closer inspection, it appears that this might be related to:

[] INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 2 task slot(s).

My question is: isn't Flink supposed to continue iterating over the directory once those 2 slots become free again? I am assuming the problem occurs because there are only 2 slots.

Code ---

  PDFFileInputFormat format = new PDFFileInputFormat();
  format.setNestedFileEnumeration(true);
  logger.info("Number of splits " + format.getNumSplits());

  // String.class matches the record type of FileInputFormat<String>
  env.createInput(format, TypeInformation.of(String.class)).print();