flink-dev mailing list archives

From "Flavio Pompermaier (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-2503) Inconsistencies in FileInputFormat hierarchy
Date Mon, 10 Aug 2015 10:19:45 GMT
Flavio Pompermaier created FLINK-2503:
-----------------------------------------

             Summary: Inconsistencies in FileInputFormat hierarchy
                 Key: FLINK-2503
                 URL: https://issues.apache.org/jira/browse/FLINK-2503
             Project: Flink
          Issue Type: Bug
          Components: Core
    Affects Versions: master
            Reporter: Flavio Pompermaier
            Priority: Minor


From a thread in the user mailing list ("Invalid argument reading a file containing a Kryo
object").

I think there are some inconsistencies in the InputFormat hierarchy.
BinaryOutputFormat/TypeSerializerInputFormat should inherit the behaviour of
FileInputFormat (that is, respect unsplittable and enumerateNestedFiles), but they don't take
those flags into account.
Moreover, in TypeSerializerInputFormat there's a "// TODO: fix this shit" comment that should
probably be either removed or fixed :)
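
To make the first point concrete, here is a minimal, self-contained sketch of the kind of
inconsistency I mean. It does not use Flink classes: BaseFormat, BlockFormat and createSplits
are hypothetical stand-ins invented for illustration, and the split logic is deliberately
simplified. It only shows how a subclass that re-implements split creation can silently
ignore a flag that the base class honours.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitFlagSketch {

    /** Hypothetical stand-in for a file-based format that honours an "unsplittable" flag. */
    static class BaseFormat {
        protected final boolean unsplittable;

        BaseFormat(boolean unsplittable) {
            this.unsplittable = unsplittable;
        }

        /** Returns {start, length} pairs; a single split covers the file if unsplittable. */
        List<long[]> createSplits(long fileLength, long splitSize) {
            List<long[]> splits = new ArrayList<>();
            if (unsplittable) {
                splits.add(new long[] {0L, fileLength});
                return splits;
            }
            for (long start = 0; start < fileLength; start += splitSize) {
                splits.add(new long[] {start, Math.min(splitSize, fileLength - start)});
            }
            return splits;
        }
    }

    /** Hypothetical subclass that re-implements split creation and never checks the flag. */
    static class BlockFormat extends BaseFormat {

        BlockFormat(boolean unsplittable) {
            super(unsplittable);
        }

        @Override
        List<long[]> createSplits(long fileLength, long splitSize) {
            List<long[]> splits = new ArrayList<>();
            // The inherited flag is not consulted here, so the file is always split.
            for (long start = 0; start < fileLength; start += splitSize) {
                splits.add(new long[] {start, Math.min(splitSize, fileLength - start)});
            }
            return splits;
        }
    }

    public static void main(String[] args) {
        // Same flag, same file, different behaviour depending on the class.
        System.out.println(new BaseFormat(true).createSplits(100, 30).size());  // prints 1
        System.out.println(new BlockFormat(true).createSplits(100, 30).size()); // prints 4
    }
}
```

In this toy model the caller sets the flag and still gets four splits from the subclass,
which is the kind of surprise that led me to override so many methods in the code further down.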

Also, having to keep testForUnsplittable and decorateInputStream aligned is somewhat dangerous.
And maybe the visibility of getBlockIndexForPosition should be changed to protected?

My need was to implement a TypeSerializerInputFormat<RowBundle>, but to achieve that
I had to write a lot of overrides. Am I doing something wrong, or do these input formats need
to be improved? This is my InputFormat code (note: everything after the comment "Copied from
FileInputFormat (overriding TypeSerializerInputFormat)" is copied and pasted from
FileInputFormat, so my own code ends there):

public class RowBundleInputFormat extends TypeSerializerInputFormat<RowBundle> {

	private static final long serialVersionUID = 1L;
	private static final Logger LOG = LoggerFactory.getLogger(RowBundleInputFormat.class);

	/** The fraction that the last split may be larger than the others. */
	private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
	private boolean objectRead;

	public RowBundleInputFormat() {
		super(new GenericTypeInfo<>(RowBundle.class));
		unsplittable = true;
	}

	@Override
	protected FSDataInputStream decorateInputStream(FSDataInputStream inputStream,
			FileInputSplit fileSplit) throws Throwable {
		return inputStream;
	}

	@Override
	protected boolean testForUnsplittable(FileStatus pathFile) {
		return true;
	}

	@Override
	public void open(FileInputSplit split) throws IOException {
		super.open(split);
		objectRead = false;
	}

	@Override
	public boolean reachedEnd() throws IOException {
		return this.objectRead;
	}

	@Override
	public RowBundle nextRecord(RowBundle reuse) throws IOException {
		RowBundle yourObject = super.nextRecord(reuse);
		this.objectRead = true; // read only one object
		return yourObject;
	}

	// -------------------------------------------------------------------
	// Copied from FileInputFormat (overriding TypeSerializerInputFormat)
	// -------------------------------------------------------------------
	@Override
	public FileInputSplit[] createInputSplits(int minNumSplits)
			throws IOException {...}

	private long addNestedFiles(Path path, List<FileStatus> files, long length,
			boolean logExcludedFiles) throws IOException {...}

	private int getBlockIndexForPosition(BlockLocation[] blocks, long offset,
			long halfSplitSize, int startIndex) {...}

}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
