flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-987) Extend TypeSerializers and -Comparators to work directly on Memory Segments
Date Tue, 22 Jul 2014 13:27:39 GMT

    [ https://issues.apache.org/jira/browse/FLINK-987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14070242#comment-14070242 ]

ASF GitHub Bot commented on FLINK-987:
--------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/77#discussion_r15226911
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/AbstractPagedOutputView.java ---
    @@ -20,397 +20,233 @@
     package org.apache.flink.runtime.memorymanager;
     
     import java.io.IOException;
    -import java.io.UTFDataFormatException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import com.google.common.base.Preconditions;
     
    -import org.apache.flink.core.memory.DataInputView;
    -import org.apache.flink.core.memory.DataOutputView;
     import org.apache.flink.core.memory.MemorySegment;
     
     
     /**
    - * The base class for all output views that are backed by multiple memory pages. This base class contains all
    - * encoding methods to write data to a page and detect page boundary crossing. The concrete sub classes must
    - * implement the methods to collect the current page and provide the next memory page once the boundary is crossed.
    - * <p>
    - * The paging assumes that all memory segments are of the same size.
    + * The base class for all output views that are backed by multiple memory pages which are not always kept in memory
    + * but which are supposed to be spilled after being written to. The concrete sub classes must
    + * implement the methods to collect the current page and provide the next memory page once the boundary is crossed or
    + * once the underlying buffers are unlocked by a call to {@link #unlock()}.
      */
    -public abstract class AbstractPagedOutputView implements DataOutputView {
    -	
    -	private MemorySegment currentSegment;			// the current memory segment to write to
    -	
    -	protected final int segmentSize;				// the size of the memory segments
    -	
    -	protected final int headerLength;				// the number of bytes to skip at the beginning of each segment
    -	
    -	private int positionInSegment;					// the offset in the current segment
    -	
    -	private byte[] utfBuffer;						// the reusable array for UTF encodings
    -	
    -	
    +public abstract class AbstractPagedOutputView extends AbstractMemorySegmentOutputView {
    +
    +	// Can we just return the current segment when we get the next one or do we have to keep
    +	// them because {@link lock} was called. We need a counter because of nested locking.
    +	protected int lockCount;
    +
    +	// The segments that are currently locked. When {@link lock} is called we lock the current
    +	// segment and all the segments that we "advance" to. Only when {@link unlock} is called can we return
    +	// all of the segments but the most recent one.
    +	private List<MemorySegment> lockedSegments;
    +
    +	// Index of current segment in list of locked segments. When we are not locked this is set to 0, because
    +	// then we always only have one segment.
    +	private int currentSegmentIndex;
    +
     	// --------------------------------------------------------------------------------------------
     	//                                    Constructors
     	// --------------------------------------------------------------------------------------------
    -	
    +
     	/**
     	 * Creates a new output view that writes initially to the given initial segment. All segments in the
     	 * view have to be of the given {@code segmentSize}. A header of length {@code headerLength} is left
     	 * at the beginning of each segment.
    -	 * 
    +	 *
     	 * @param initialSegment The segment that the view starts writing to.
     	 * @param segmentSize The size of the memory segments.
     	 * @param headerLength The number of bytes to skip at the beginning of each segment for the header.
     	 */
     	protected AbstractPagedOutputView(MemorySegment initialSegment, int segmentSize, int headerLength) {
    +		super(initialSegment, segmentSize, headerLength);
     		if (initialSegment == null) {
     			throw new NullPointerException("Initial Segment may not be null");
     		}
    -		this.segmentSize = segmentSize;
    -		this.headerLength = headerLength;
     		this.currentSegment = initialSegment;
     		this.positionInSegment = headerLength;
    +
    +		lockedSegments = new ArrayList<MemorySegment>();
    +
    +		// at the beginning we only have one segment
    +		currentSegmentIndex = 0;
    +		lockCount = 0;
     	}
    -	
    +
     	/**
     	 * @param segmentSize The size of the memory segments.
     	 * @param headerLength The number of bytes to skip at the beginning of each segment for the header.
     	 */
    -	protected AbstractPagedOutputView(int segmentSize, int headerLength)
    -	{
    -		this.segmentSize = segmentSize;
    -		this.headerLength = headerLength;
    +	protected AbstractPagedOutputView(int segmentSize, int headerLength) {
    +		super(segmentSize, headerLength);
    +
    +		lockedSegments = new ArrayList<MemorySegment>();
    +		currentSegmentIndex = 0;
    +		lockCount = 0;
     	}
    -	
    +
     
     	// --------------------------------------------------------------------------------------------
     	//                                  Page Management
     	// --------------------------------------------------------------------------------------------
    -	
    +
     	/**
    -	 * 
    +	 *
     	 * This method must return a segment. If no more segments are available, it must throw an
     	 * {@link java.io.EOFException}.
    -	 * 
    -	 * @param current The current memory segment
    -	 * @param positionInCurrent The position in the segment, one after the last valid byte.
    -	 * @return The next memory segment. 
    -	 * 
    -	 * @throws IOException
    +	 *
    +	 * @return The next memory segment.
    +	 *
    +	 * @throws java.io.IOException
    +	 */
    +
    +	protected abstract MemorySegment requestSegment() throws IOException;
    +
    +	/**
    +	 *
    +	 * This method returns a memory segment that has been filled to where it came from. Child classes
    +	 * are responsible for handling the segments.
    +	 *
    +	 * @param segment The  memory segment
    +	 * @param bytesWritten The position in the segment, one after the last valid byte.
    +	 *
    +	 * @throws java.io.IOException
     	 */
    -	protected abstract MemorySegment nextSegment(MemorySegment current, int positionInCurrent) throws IOException;
    -	
    -	
    +	protected abstract void returnSegment(MemorySegment segment, int bytesWritten) throws IOException;
    +
    +
     	/**
     	 * Gets the segment that the view currently writes to.
    -	 * 
    +	 *
     	 * @return The segment the view currently writes to.
     	 */
     	public MemorySegment getCurrentSegment() {
     		return this.currentSegment;
     	}
    -	
    +
     	/**
     	 * Gets the current write position (the position where the next bytes will be written)
     	 * in the current memory segment.
    -	 * 
    +	 *
     	 * @return The current write offset in the current memory segment.
     	 */
     	public int getCurrentPositionInSegment() {
     		return this.positionInSegment;
     	}
    -	
    +
     	/**
     	 * Gets the size of the segments used by this view.
    -	 * 
    +	 *
     	 * @return The memory segment size.
     	 */
     	public int getSegmentSize() {
     		return this.segmentSize;
     	}
    -	
    +
     	/**
     	 * Moves the output view to the next page. This method invokes internally the
    -	 * {@link #nextSegment(MemorySegment, int)} method to give the current memory segment to the concrete subclass' 
    -	 * implementation and obtain the next segment to write to. Writing will continue inside the new segment
    -	 * after the header.
    -	 * 
    -	 * @throws IOException Thrown, if the current segment could not be processed or a new segment could not
    -	 *                     be obtained. 
    +	 * {@link #requestSegment()} and {@link #returnSegment(org.apache.flink.core.memory.MemorySegment, int)} methods
    +	 * to give the current memory segment to the concrete subclass' implementation and obtain the next segment to
    +	 * write to. Writing will continue inside the new segment after the header.
    +	 *
    +	 * @throws java.io.IOException Thrown, if the current segment could not be processed or a new segment could not
    +	 *                     be obtained.
     	 */
    -	protected void advance() throws IOException {
    -		this.currentSegment = nextSegment(this.currentSegment, this.positionInSegment);
    -		this.positionInSegment = this.headerLength;
    -	}
    -	
    -	/**
    -	 * Sets the internal state to the given memory segment and the given position within the segment. 
    -	 * 
    -	 * @param seg The memory segment to write the next bytes to.
    -	 * @param position The position to start writing the next bytes to.
    -	 */
    -	protected void seekOutput(MemorySegment seg, int position) {
    -		this.currentSegment = seg;
    -		this.positionInSegment = position;
    -	}
    -	
    -	/**
    -	 * Clears the internal state. Any successive write calls will fail until either {@link #advance()} or
    -	 * {@link #seekOutput(MemorySegment, int)} is called. 
    -	 * 
    -	 * @see #advance()
    -	 * @see #seekOutput(MemorySegment, int)
    -	 */
    -	protected void clear() {
    -		this.currentSegment = null;
    -		this.positionInSegment = this.headerLength;
    -	}
    -	
    -	// --------------------------------------------------------------------------------------------
    -	//                               Data Output Specific methods
    -	// --------------------------------------------------------------------------------------------
    -	
    -	@Override
    -	public void write(int b) throws IOException {
    -		writeByte(b);
    -	}
    -
    -	@Override
    -	public void write(byte[] b) throws IOException {
    -		write(b, 0, b.length);
    -	}
    -
     	@Override
    -	public void write(byte[] b, int off, int len) throws IOException {
    -		int remaining = this.segmentSize - this.positionInSegment;
    -		if (remaining >= len) {
    -			this.currentSegment.put(this.positionInSegment, b, off, len);
    -			this.positionInSegment += len;
    -		}
    -		else {
    -			if (remaining == 0) {
    -				advance();
    -				remaining = this.segmentSize - this.positionInSegment;
    -			}
    -			while (true) {
    -				int toPut = Math.min(remaining, len);
    -				this.currentSegment.put(this.positionInSegment, b, off, toPut);
    -				off += toPut;
    -				len -= toPut;
    -				
    -				if (len > 0) {
    -					this.positionInSegment = this.segmentSize;
    -					advance();
    -					remaining = this.segmentSize - this.positionInSegment;	
    -				}
    -				else {
    -					this.positionInSegment += toPut;
    -					break;
    -				}
    +	protected void advance() throws IOException {
    +		if (lockCount > 0) {
    +			if (currentSegmentIndex < lockedSegments.size() - 1) {
    +				currentSegmentIndex++;
    +				currentSegment = lockedSegments.get(currentSegmentIndex);
    +			} else {
    +				currentSegmentIndex++;
    +				currentSegment = requestSegment();
    +				lockedSegments.add(currentSegment);
     			}
    +		} else {
    +			returnSegment(currentSegment, positionInSegment);
    +			currentSegment = requestSegment();
    +			currentSegmentIndex = 0;
     		}
    +		positionInSegment = headerLength;
     	}
     
    -	@Override
    -	public void writeBoolean(boolean v) throws IOException {
    -		writeByte(v ? 1 : 0);
    -	}
    -
    -	@Override
    -	public void writeByte(int v) throws IOException {
    -		if (this.positionInSegment < this.segmentSize) {
    -			this.currentSegment.put(this.positionInSegment++, (byte) v);
    -		}
    -		else {
    -			advance();
    -			writeByte(v);
    +	public void lock() {
    +		lockCount++;
    +		if (lockCount == 1) {
    +			// we are the first to lock, so start the "locked segments" list
    +			Preconditions.checkState(lockedSegments.isEmpty(), "List of locked segments must be empty.");
    +			Preconditions.checkState(currentSegmentIndex == 0, "Before locking currentSegmentIndex must always be 0.");
    +			lockedSegments.add(currentSegment);
     		}
     	}
     
    -	@Override
    -	public void writeShort(int v) throws IOException {
    -		if (this.positionInSegment < this.segmentSize - 1) {
    -			this.currentSegment.putShort(this.positionInSegment, (short) v);
    -			this.positionInSegment += 2;
    -		}
    -		else if (this.positionInSegment == this.segmentSize) {
    -			advance();
    -			writeShort(v);
    -		}
    -		else {
    -			writeByte(v >> 8);
    -			writeByte(v);
    -		}
    -	}
    -
    -	@Override
    -	public void writeChar(int v) throws IOException {
    -		if (this.positionInSegment < this.segmentSize - 1) {
    -			this.currentSegment.putChar(this.positionInSegment, (char) v);
    -			this.positionInSegment += 2;
    -		}
    -		else if (this.positionInSegment == this.segmentSize) {
    -			advance();
    -			writeChar(v);
    -		}
    -		else {
    -			writeByte(v >> 8);
    -			writeByte(v);
    -		}
    -	}
    -
    -	@Override
    -	public void writeInt(int v) throws IOException {
    -		if (this.positionInSegment < this.segmentSize - 3) {
    -			this.currentSegment.putIntBigEndian(this.positionInSegment, v);
    -			this.positionInSegment += 4;
    -		}
    -		else if (this.positionInSegment == this.segmentSize) {
    -			advance();
    -			writeInt(v);
    -		}
    -		else {
    -			writeByte(v >> 24);
    -			writeByte(v >> 16);
    -			writeByte(v >>  8);
    -			writeByte(v);
    -		}
    -	}
    +	// here positions are relative to first locked segment
    +	public long tell() {
    +		Preconditions.checkState(lockCount > 0, "Target buffer must be locked.");
     
    -	@Override
    -	public void writeLong(long v) throws IOException {
    -		if (this.positionInSegment < this.segmentSize - 7) {
    -			this.currentSegment.putLongBigEndian(this.positionInSegment, v);
    -			this.positionInSegment += 8;
    -		}
    -		else if (this.positionInSegment == this.segmentSize) {
    -			advance();
    -			writeLong(v);
    -		}
    -		else {
    -			writeByte((int) (v >> 56));
    -			writeByte((int) (v >> 48));
    -			writeByte((int) (v >> 40));
    -			writeByte((int) (v >> 32));
    -			writeByte((int) (v >> 24));
    -			writeByte((int) (v >> 16));
    -			writeByte((int) (v >>  8));
    -			writeByte((int) v);
    -		}
    -	}
    -
    -	@Override
    -	public void writeFloat(float v) throws IOException {
    -		writeInt(Float.floatToRawIntBits(v));
    -	}
    -
    -	@Override
    -	public void writeDouble(double v) throws IOException {
    -		writeLong(Double.doubleToRawLongBits(v));
    -	}
    -
    -	@Override
    -	public void writeBytes(String s) throws IOException {
    -		for (int i = 0; i < s.length(); i++) {
    -			writeByte(s.charAt(i));
    -		}
    +		return currentSegmentIndex * segmentSize + positionInSegment;
     	}
     
    -	@Override
    -	public void writeChars(String s) throws IOException {
    -		for (int i = 0; i < s.length(); i++) {
    -			writeChar(s.charAt(i));
    -		}
    -	}
    +	public void seek(long position) throws IOException {
    +		Preconditions.checkState(lockCount > 0, "Target buffer must be locked.");
    +		Preconditions.checkArgument(position >= 0, "position must be positive");
     
    -	@Override
    -	public void writeUTF(String str) throws IOException {
    -		int strlen = str.length();
    -		int utflen = 0;
    -		int c, count = 0;
    -
    -		/* use charAt instead of copying String to char array */
    -		for (int i = 0; i < strlen; i++) {
    -			c = str.charAt(i);
    -			if ((c >= 0x0001) && (c <= 0x007F)) {
    -				utflen++;
    -			} else if (c > 0x07FF) {
    -				utflen += 3;
    -			} else {
    -				utflen += 2;
    +		// Check whether we have to seek into new segments or whether we are simply seeking
    +		// back into our locked segments
    +		int positionSegmentIndex = (int) (position / segmentSize);
    +		int positionInLastSegment = (int) (position % segmentSize);
    +		if (positionSegmentIndex > currentSegmentIndex) {
    +			int segmentsToAdvance = positionSegmentIndex - currentSegmentIndex;
    +			for (int i = 0; i < segmentsToAdvance; i++) {
    +				// fill all the segments that we seek over (might be random data though)
    +				advance();
     			}
    +		} else {
    +			currentSegmentIndex = positionSegmentIndex;
    +			currentSegment = lockedSegments.get(currentSegmentIndex);
     		}
    +		positionInSegment = positionInLastSegment;
    +	}
     
    -		if (utflen > 65535) {
    -			throw new UTFDataFormatException("encoded string too long: " + utflen + " memory");
    +	public void unlock() throws IOException {
    +		if (lockCount <= 0) {
    +			throw new RuntimeException("Unlock call without previous lock.");
    --- End diff --
    
    I'm not sure. You're probably right, though.
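The locking logic in the diff is easiest to follow with a concrete use case such as back-patching a length prefix: lock the view, remember the position with `tell()`, write a placeholder, write the payload (possibly across a page boundary), `seek()` back to patch the placeholder, then `seek()` to the end and unlock. The following is a minimal, self-contained sketch of that bookkeeping, not Flink code: `byte[]` pages stand in for `MemorySegment`, a `Deque` stands in for the page supply behind `requestSegment()`, the `returnedPages` list stands in for `returnSegment()`, and the class name `PagedWriterSketch` is hypothetical. The diff is cut off inside `unlock()`, so the `unlock()` below is an assumption based on the field comment ("return all of the segments but the most recent one").

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the lock/tell/seek bookkeeping in the diff above.
// byte[] pages stand in for MemorySegment; "returning" a page just records it.
class PagedWriterSketch {
    final int segmentSize, headerLength;
    final Deque<byte[]> freePages = new ArrayDeque<>();     // stands in for requestSegment()
    final List<byte[]> returnedPages = new ArrayList<>();   // stands in for returnSegment()
    final List<byte[]> lockedSegments = new ArrayList<>();
    byte[] currentSegment;
    int currentSegmentIndex = 0, positionInSegment, lockCount = 0;

    PagedWriterSketch(int segmentSize, int headerLength, int numPages) {
        this.segmentSize = segmentSize;
        this.headerLength = headerLength;
        for (int i = 0; i < numPages; i++) freePages.add(new byte[segmentSize]);
        currentSegment = requestSegment();
        positionInSegment = headerLength;
    }

    byte[] requestSegment() {
        byte[] page = freePages.poll();
        if (page == null) throw new IllegalStateException("no free pages left");
        return page;
    }

    // Same branching as advance() in the diff.
    void advance() {
        if (lockCount > 0) {
            currentSegmentIndex++;
            if (currentSegmentIndex < lockedSegments.size()) {
                currentSegment = lockedSegments.get(currentSegmentIndex); // revisit a locked page
            } else {
                currentSegment = requestSegment();                         // extend the locked run
                lockedSegments.add(currentSegment);
            }
        } else {
            returnedPages.add(currentSegment);  // not locked: hand the full page back immediately
            currentSegment = requestSegment();
            currentSegmentIndex = 0;
        }
        positionInSegment = headerLength;
    }

    void writeByte(int b) {
        if (positionInSegment == segmentSize) advance();
        currentSegment[positionInSegment++] = (byte) b;
    }

    // Big-endian, byte by byte, so a value may straddle two pages.
    void writeInt(int v) {
        for (int shift = 24; shift >= 0; shift -= 8) writeByte(v >>> shift);
    }

    void lock() {
        if (++lockCount == 1) lockedSegments.add(currentSegment);  // start the locked run
    }

    // Position relative to the first locked segment (header bytes included), as in the diff.
    long tell() {
        if (lockCount == 0) throw new IllegalStateException("must be locked");
        return (long) currentSegmentIndex * segmentSize + positionInSegment;
    }

    void seek(long position) {
        if (lockCount == 0) throw new IllegalStateException("must be locked");
        int targetIndex = (int) (position / segmentSize);
        if (targetIndex > currentSegmentIndex) {
            while (currentSegmentIndex < targetIndex) advance();   // may request new pages
        } else {
            currentSegmentIndex = targetIndex;
            currentSegment = lockedSegments.get(targetIndex);
        }
        positionInSegment = (int) (position % segmentSize);
    }

    // ASSUMPTION (the diff is truncated here): the outermost unlock returns every
    // locked page except the last one, and writing continues in that last page.
    void unlock() {
        if (lockCount <= 0) throw new IllegalStateException("unlock without lock");
        if (--lockCount > 0) return;                               // nested unlock: nothing to do
        if (currentSegmentIndex != lockedSegments.size() - 1)
            throw new IllegalStateException("seek to the end before the final unlock");
        returnedPages.addAll(lockedSegments.subList(0, lockedSegments.size() - 1));
        lockedSegments.clear();
        currentSegmentIndex = 0;
    }
}
```

A nested `lock()`/`unlock()` pair only changes `lockCount`, which is why the diff keeps a counter rather than a boolean. Note also that `tell()` and `seek()` count the header bytes of every page, so positions are offsets into the concatenated locked pages rather than into the payload alone.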


> Extend TypeSerializers and -Comparators to work directly on Memory Segments
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-987
>                 URL: https://issues.apache.org/jira/browse/FLINK-987
>             Project: Flink
>          Issue Type: Improvement
>          Components: Local Runtime
>    Affects Versions: 0.6-incubating
>            Reporter: Stephan Ewen
>            Assignee: Aljoscha Krettek
>             Fix For: 0.6-incubating
>
>
> As per discussion with [~till.rohrmann], [~uce], [~aljoscha], we suggest changing the way that the TypeSerializers/Comparators and DataInputViews/DataOutputViews work.
> The goal is to allow more flexibility in the construction of the binary representation of data types, and to allow partial deserialization of individual fields. Both are currently prohibited by the fact that the abstraction of the memory (into which the data goes) is a stream abstraction ({{DataInputView}}, {{DataOutputView}}).
> An idea is to offer a random-access, buffer-like view for construction and random-access deserialization, as well as various methods to copy elements in a binary fashion between such buffers and streams.
> A possible set of methods for the {{TypeSerializer}} could be:
> {code}
> long serialize(T record, TargetBuffer buffer);
> 	
> T deserialize(T reuse, SourceBuffer source);
> 	
> void ensureBufferSufficientlyFilled(SourceBuffer source);
> 	
> <X> X deserializeField(X reuse, int logicalPos, SourceBuffer buffer);
> 	
> int getOffsetForField(int logicalPos, int offset, SourceBuffer buffer);
> 	
> void copy(DataInputView in, TargetBuffer buffer);
> 	
> void copy(SourceBuffer buffer, DataOutputView out);
> 	
> void copy(DataInputView source, DataOutputView target);
> {code}
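The proposed `getOffsetForField` and `deserializeField` only pay off once the memory supports random access. As a sketch under stated assumptions, `java.nio.ByteBuffer` (whose absolute `getInt(int)`/`getLong(int)` reads do not move the position) stands in for `SourceBuffer`/`TargetBuffer`; the `Event` type, its byte layout, and `EventSerializerSketch` are hypothetical, and `deserializeField` is simplified (no `reuse` argument, explicit record offset). Because the variable-length name comes first, locating a later field means reading the name's length prefix from the buffer, which illustrates why an offset lookup may need the buffer as an argument.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical record type used only for this sketch.
final class Event {
    final int id;
    final long timestamp;
    final String name;

    Event(int id, long timestamp, String name) {
        this.id = id;
        this.timestamp = timestamp;
        this.name = name;
    }
}

// Hypothetical serializer; ByteBuffer stands in for the proposed Source/TargetBuffer.
// Layout per record: [int nameLength][UTF-8 name bytes][int id][long timestamp]
final class EventSerializerSketch {

    // Appends one record at the buffer's position and returns the number of bytes written.
    long serialize(Event e, ByteBuffer target) {
        int start = target.position();
        byte[] nameBytes = e.name.getBytes(StandardCharsets.UTF_8);
        target.putInt(nameBytes.length).put(nameBytes).putInt(e.id).putLong(e.timestamp);
        return target.position() - start;
    }

    // Absolute byte offset of field logicalPos (0 = name, 1 = id, 2 = timestamp)
    // for the record starting at `offset`. Only the name's length prefix is read.
    int getOffsetForField(int logicalPos, int offset, ByteBuffer buffer) {
        if (logicalPos == 0) {
            return offset;
        }
        int afterName = offset + 4 + buffer.getInt(offset);  // skip the variable-length name
        if (logicalPos == 1) {
            return afterName;
        }
        if (logicalPos == 2) {
            return afterName + 4;
        }
        throw new IllegalArgumentException("no field " + logicalPos);
    }

    // Decodes a single field with absolute reads; the buffer position is left untouched.
    Object deserializeField(int logicalPos, int offset, ByteBuffer buffer) {
        int at = getOffsetForField(logicalPos, offset, buffer);
        switch (logicalPos) {
            case 0: {
                int len = buffer.getInt(at);
                byte[] bytes = new byte[len];
                for (int i = 0; i < len; i++) {
                    bytes[i] = buffer.get(at + 4 + i);
                }
                return new String(bytes, StandardCharsets.UTF_8);
            }
            case 1:
                return buffer.getInt(at);
            default:  // logicalPos == 2; other values were rejected by getOffsetForField
                return buffer.getLong(at);
        }
    }
}
```

Reading the `timestamp` of a record touches one length prefix and eight bytes; the name is never decoded. With a pure stream view such as `DataInputView`, the reader would instead have to consume every preceding byte in order.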



--
This message was sent by Atlassian JIRA
(v6.2#6252)
