flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
Date Tue, 10 Nov 2015 17:13:11 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14998926#comment-14998926 ]

ASF GitHub Bot commented on FLINK-2692:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1266#discussion_r44434469
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
    @@ -18,32 +18,97 @@
     
     package org.apache.flink.api.java.io;
     
    +import com.google.common.base.Preconditions;
    +import com.google.common.primitives.Ints;
    +import org.apache.flink.api.common.io.GenericCsvInputFormat;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.types.parser.FieldParser;
     
    -import org.apache.flink.api.common.typeutils.CompositeType;
    -import org.apache.flink.api.java.tuple.Tuple;
    +import java.io.IOException;
     import org.apache.flink.core.fs.Path;
     import org.apache.flink.util.StringUtils;
     
    -public class CsvInputFormat<OUT> extends CommonCsvInputFormat<OUT> {
    +public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
     
     	private static final long serialVersionUID = 1L;
    +
    +	public static final String DEFAULT_LINE_DELIMITER = "\n";
    +
    +	public static final String DEFAULT_FIELD_DELIMITER = ",";
    +
    +	protected transient Object[] parsedValues;
     	
    -	public CsvInputFormat(Path filePath, CompositeType<OUT> typeInformation) {
    -		super(filePath, typeInformation);
    +	protected CsvInputFormat(Path filePath) {
    +		super(filePath);
     	}
    -	
    -	public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, CompositeType<OUT> typeInformation) {
    -		super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
    +
    +	@Override
    +	public void open(FileInputSplit split) throws IOException {
    +		super.open(split);
    +
    +		@SuppressWarnings("unchecked")
    +		FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers();
    +
    +		//throw exception if no field parsers are available
    +		if (fieldParsers.length == 0) {
    +			throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input");
    +		}
    +
    +		// create the value holders
    +		this.parsedValues = new Object[fieldParsers.length];
    +		for (int i = 0; i < fieldParsers.length; i++) {
    +			this.parsedValues[i] = fieldParsers[i].createValue();
    +		}
    +
    +		// left to right evaluation makes access [0] okay
    +		// this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default
    +		if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
    +			this.lineDelimiterIsLinebreak = true;
    +		}
    +
    +		this.commentCount = 0;
    +		this.invalidLineCount = 0;
     	}
     
     	@Override
    -	protected OUT createTuple(OUT reuse) {
    -		Tuple result = (Tuple) reuse;
    -		for (int i = 0; i < parsedValues.length; i++) {
    -			result.setField(parsedValues[i], i);
    +	public OUT nextRecord(OUT record) throws IOException {
    +		OUT returnRecord = null;
    +		do {
    +			returnRecord = super.nextRecord(record);
    +		} while (returnRecord == null && !reachedEnd());
    +
    +		return returnRecord;
    +	}
    +
    +	public Class<?>[] getFieldTypes() {
    +		return super.getGenericFieldTypes();
    +	}
    +
    +	protected static boolean[] createDefaultMask(int size) {
    +		boolean[] includedMask = new boolean[size];
    +		for (int x=0; x<includedMask.length; x++) {
    +			includedMask[x] = true;
    +		}
    +		return includedMask;
    +	}
    +
    +	protected static boolean[] toBooleanMask(int[] sourceFieldIndices) {
    --- End diff --
    
    This method might give the impression that fields can be read in any order; however, they are parsed in the order of their position in the line. This might lead to unexpected behavior if a user specifies field indices out of order, e.g., `int[] {3,1,7,5}`.
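The ordering concern can be illustrated with a small standalone sketch. The body of `toBooleanMask` is not shown in the diff, so the version below is a hypothetical re-implementation, and `parseOrder` and `requireAscending` are hypothetical helpers, not Flink API. Once the indices are collapsed into a boolean mask, the requested order `{3,1,7,5}` is lost, and a parser that walks the line left to right yields fields 1, 3, 5, 7. Rejecting non-ascending indices is one possible remedy, assumed here rather than taken from the PR.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class BooleanMaskDemo {

    // Hypothetical helper in the spirit of toBooleanMask: sets mask[i] = true
    // for every requested source field i. The mask records WHICH fields are
    // read, but not the ORDER in which the caller listed them.
    static boolean[] toBooleanMask(int[] sourceFieldIndices) {
        int max = -1;
        for (int idx : sourceFieldIndices) {
            if (idx < 0) {
                throw new IllegalArgumentException("Negative field index: " + idx);
            }
            max = Math.max(max, idx);
        }
        boolean[] mask = new boolean[max + 1];
        for (int idx : sourceFieldIndices) {
            mask[idx] = true;
        }
        return mask;
    }

    // Order in which a parser that walks the line left to right
    // would emit the included fields.
    static int[] parseOrder(boolean[] mask) {
        return IntStream.range(0, mask.length).filter(i -> mask[i]).toArray();
    }

    // Hypothetical guard: reject indices that are not strictly increasing,
    // so callers cannot expect the fields back in the order they listed.
    static void requireAscending(int[] indices) {
        for (int i = 1; i < indices.length; i++) {
            if (indices[i] <= indices[i - 1]) {
                throw new IllegalArgumentException(
                        "Field indices must be strictly increasing: " + Arrays.toString(indices));
            }
        }
    }

    public static void main(String[] args) {
        int[] requested = {3, 1, 7, 5};
        int[] actual = parseOrder(toBooleanMask(requested));
        // prints "[3, 1, 7, 5] -> [1, 3, 5, 7]"
        System.out.println(Arrays.toString(requested) + " -> " + Arrays.toString(actual));
    }
}
```

An alternative to rejecting the input would be to keep the original index array alongside the mask and reorder the parsed values afterwards; the sketch only shows the stricter option.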


> Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat 
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-2692
>                 URL: https://issues.apache.org/jira/browse/FLINK-2692
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Till Rohrmann
>            Assignee: Chesnay Schepler
>            Priority: Minor
>
> The {{CsvInputFormat}} currently allows returning values as either a {{Tuple}} or a {{Pojo}} type. As a consequence, the processing logic, which has to work for both types, is overly complex. For example, the {{CsvInputFormat}} contains fields which are only used when a Pojo is returned. Moreover, the POJO field information is constructed by calling setter methods which must be called in a specific order; otherwise they fail. E.g., one first has to call {{setFieldTypes}} before calling {{setOrderOfPOJOFields}}, otherwise the number of fields might differ. Furthermore, some of the methods can only be called if the return type is a {{Pojo}} type, because they expect a {{PojoTypeInfo}} to be present.
> I think the {{CsvInputFormat}} should be refactored to make the code easier to maintain. I propose to split it up into a {{PojoTypeCsvInputFormat}} and a {{TupleTypeCsvInputFormat}}, which take all the required information via their constructors instead of using the {{setFields}} and {{setOrderOfPOJOFields}} approach.
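A rough, Flink-free sketch of the proposed constructor-based split follows. `CsvFormatSketch`, `TupleCsvFormatSketch` and `PojoCsvFormatSketch` are hypothetical stand-ins for the proposed classes, not Flink's API. Parsing is reduced to comma splitting with `String` and `Integer` fields, and a plain `Object[]` stands in for a Flink `Tuple`. The point is that each variant receives its field types (and, for POJOs, the field names) up front and validates them in the constructor, so there is no setter call order to get wrong.

```java
import java.lang.reflect.Field;
import java.util.Arrays;

// Hypothetical base class: all configuration arrives through the constructor.
abstract class CsvFormatSketch<OUT> {
    protected final Class<?>[] fieldTypes;

    protected CsvFormatSketch(Class<?>[] fieldTypes) {
        if (fieldTypes == null || fieldTypes.length == 0) {
            throw new IllegalArgumentException("At least one field type is required");
        }
        this.fieldTypes = fieldTypes.clone();
    }

    // Parses one comma-separated line into typed values, then lets the subclass build the record.
    public OUT parseLine(String line) {
        String[] tokens = line.split(",", -1);
        if (tokens.length != fieldTypes.length) {
            throw new IllegalArgumentException("Expected " + fieldTypes.length + " fields: " + line);
        }
        Object[] values = new Object[tokens.length];
        for (int i = 0; i < tokens.length; i++) {
            values[i] = fieldTypes[i] == Integer.class ? Integer.valueOf(tokens[i].trim()) : tokens[i];
        }
        return createRecord(values);
    }

    protected abstract OUT createRecord(Object[] values);
}

// Tuple-like variant: needs nothing beyond the field types.
class TupleCsvFormatSketch extends CsvFormatSketch<Object[]> {
    TupleCsvFormatSketch(Class<?>[] fieldTypes) {
        super(fieldTypes);
    }

    @Override
    protected Object[] createRecord(Object[] values) {
        return values.clone();
    }
}

// POJO variant: field names are a constructor argument and are checked
// against the field types immediately, instead of via ordered setter calls.
class PojoCsvFormatSketch<T> extends CsvFormatSketch<T> {
    private final Class<T> pojoClass;
    private final Field[] targetFields;

    PojoCsvFormatSketch(Class<T> pojoClass, Class<?>[] fieldTypes, String[] fieldNames) {
        super(fieldTypes);
        if (fieldNames.length != fieldTypes.length) {
            throw new IllegalArgumentException("Number of field names and field types differ");
        }
        this.pojoClass = pojoClass;
        this.targetFields = new Field[fieldNames.length];
        for (int i = 0; i < fieldNames.length; i++) {
            try {
                targetFields[i] = pojoClass.getField(fieldNames[i]);
            } catch (NoSuchFieldException e) {
                throw new IllegalArgumentException("No public field " + fieldNames[i], e);
            }
            if (!targetFields[i].getType().equals(fieldTypes[i])) {
                throw new IllegalArgumentException("Type mismatch for field " + fieldNames[i]);
            }
        }
    }

    @Override
    protected T createRecord(Object[] values) {
        try {
            T pojo = pojoClass.getDeclaredConstructor().newInstance();
            for (int i = 0; i < values.length; i++) {
                targetFields[i].set(pojo, values[i]);
            }
            return pojo;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}

public class CsvSplitSketch {
    public static class Person {
        public String name;
        public Integer age;
    }

    public static void main(String[] args) {
        Class<?>[] types = {String.class, Integer.class};
        Object[] row = new TupleCsvFormatSketch(types).parseLine("alice,30");
        System.out.println(Arrays.toString(row)); // prints "[alice, 30]"

        Person p = new PojoCsvFormatSketch<>(Person.class, types, new String[] {"name", "age"})
                .parseLine("bob,42");
        System.out.println(p.name + " " + p.age); // prints "bob 42"
    }
}
```

Because a mismatch between field names and field types is rejected when the format is constructed, the failure mode described in the issue (calling {{setOrderOfPOJOFields}} before {{setFieldTypes}}) cannot occur in this design.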



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
