flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5530) race condition in AbstractRocksDBState#getSerializedValue
Date Fri, 20 Jan 2017 12:03:26 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15831612#comment-15831612 ]

ASF GitHub Bot commented on FLINK-5530:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3143#discussion_r97059850
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
---
    @@ -132,55 +132,91 @@ public void setCurrentNamespace(N namespace) {
     				namespaceSerializer);
     
     		int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
    -		writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
    -		return backend.db.get(columnFamily, keySerializationStream.toByteArray());
    +
    +		// we cannot reuse the keySerializationStream member since this method
    +		// is called concurrently to the other ones and it may thus contain garbage
    +		ByteArrayOutputStreamWithPos tmpKeySerializationStream =
    +			new ByteArrayOutputStreamWithPos(128);
    +		DataOutputViewStreamWrapper tmpKeySerializationDateDataOutputView =
    +			new DataOutputViewStreamWrapper(tmpKeySerializationStream);
    +
    +		writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1,
    +			tmpKeySerializationStream, tmpKeySerializationDateDataOutputView);
    +
    +		return backend.db.get(columnFamily, tmpKeySerializationStream.toByteArray());
     
     	}
     
     	protected void writeCurrentKeyWithGroupAndNamespace() throws IOException {
    -		writeKeyWithGroupAndNamespace(backend.getCurrentKeyGroupIndex(), backend.getCurrentKey(),
currentNamespace);
    +		writeKeyWithGroupAndNamespace(
    +			backend.getCurrentKeyGroupIndex(),
    +			backend.getCurrentKey(),
    +			currentNamespace,
    +			this.keySerializationStream,
    +			this.keySerializationDateDataOutputView);
     	}
     
    -	protected void writeKeyWithGroupAndNamespace(int keyGroup, K key, N namespace) throws
IOException {
    +	protected void writeKeyWithGroupAndNamespace(
    +			int keyGroup, K key, N namespace,
    +			final ByteArrayOutputStreamWithPos keySerializationStream,
    +			final DataOutputView keySerializationDateDataOutputView) throws IOException {
    +
     		keySerializationStream.reset();
    -		writeKeyGroup(keyGroup);
    -		writeKey(key);
    -		writeNameSpace(namespace);
    +		writeKeyGroup(keyGroup, keySerializationDateDataOutputView);
    +		writeKey(key, keySerializationStream, keySerializationDateDataOutputView);
    +		writeNameSpace(namespace, keySerializationStream, keySerializationDateDataOutputView);
     	}
     
    -	private void writeKeyGroup(int keyGroup) throws IOException {
    +	private void writeKeyGroup(
    +			int keyGroup, final DataOutputView keySerializationDateDataOutputView)
    +			throws IOException {
    +
     		for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) {
     			keySerializationDateDataOutputView.writeByte(keyGroup >>> (i << 3));
     		}
     	}
     
    -	private void writeKey(K key) throws IOException {
    +	private void writeKey(
    +			K key, final ByteArrayOutputStreamWithPos keySerializationStream,
    +			final DataOutputView keySerializationDateDataOutputView) throws IOException {
    +
     		//write key
     		int beforeWrite = keySerializationStream.getPosition();
     		backend.getKeySerializer().serialize(key, keySerializationDateDataOutputView);
     
     		if (ambiguousKeyPossible) {
     			//write size of key
    -			writeLengthFrom(beforeWrite);
    +			writeLengthFrom(beforeWrite, keySerializationStream,
    +				keySerializationDateDataOutputView);
     		}
     	}
     
    -	private void writeNameSpace(N namespace) throws IOException {
    +	private void writeNameSpace(
    +			N namespace, final ByteArrayOutputStreamWithPos keySerializationStream,
    +			final DataOutputView keySerializationDateDataOutputView) throws IOException {
    +
     		int beforeWrite = keySerializationStream.getPosition();
     		namespaceSerializer.serialize(namespace, keySerializationDateDataOutputView);
     
     		if (ambiguousKeyPossible) {
     			//write length of namespace
    -			writeLengthFrom(beforeWrite);
    +			writeLengthFrom(beforeWrite, keySerializationStream,
    +				keySerializationDateDataOutputView);
     		}
     	}
     
    -	private void writeLengthFrom(int fromPosition) throws IOException {
    +	private static void writeLengthFrom(
    +			int fromPosition, final ByteArrayOutputStreamWithPos keySerializationStream,
    +			final DataOutputView keySerializationDateDataOutputView) throws IOException {
    +
     		int length = keySerializationStream.getPosition() - fromPosition;
    -		writeVariableIntBytes(length);
    +		writeVariableIntBytes(length, keySerializationDateDataOutputView);
     	}
     
    -	private void writeVariableIntBytes(int value) throws IOException {
    +	private static void writeVariableIntBytes(
    +			int value, final DataOutputView keySerializationDateDataOutputView)
    --- End diff --
    
    Why are the output view arguments declared `final` here and in the other methods?
    Usually we only mark method arguments as `final` if that is required, e.g. for
    anonymous classes.


> race condition in AbstractRocksDBState#getSerializedValue
> ---------------------------------------------------------
>
>                 Key: FLINK-5530
>                 URL: https://issues.apache.org/jira/browse/FLINK-5530
>             Project: Flink
>          Issue Type: Bug
>          Components: Queryable State
>    Affects Versions: 1.2.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>            Priority: Blocker
>
> AbstractRocksDBState#getSerializedValue() uses the same key serialisation stream as the
> ordinary state access methods but is called in parallel during state queries, thus violating
> the assumption that only one thread accesses it.
> This may lead to either wrong results in queries or corrupt data while queries are executed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message