flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace
Date Thu, 22 Feb 2018 12:58:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372771#comment-16372771 ]

ASF GitHub Bot commented on FLINK-8679:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5518#discussion_r169946803
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -253,22 +257,61 @@ public RocksDBKeyedStateBackend(
     		this.restoredKvStateMetaInfos = new HashMap<>();
     		this.materializedSstFiles = new TreeMap<>();
     		this.backendUID = UUID.randomUUID();
    +		this.namespaceOutputStream = new ByteArrayOutputStreamWithPos(8);
     		LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);
     	}
     
     	@Override
    -	public <N> Stream<K> getKeys(String state, N namespace) {
    +	public <N> Stream<K> getKeys(String state, N namespace, TypeSerializer<N> namespaceSerializer) {
     		Tuple2<ColumnFamilyHandle, ?> columnInfo = kvStateInformation.get(state);
     		if (columnInfo == null) {
     			return Stream.empty();
     		}
     
    -		RocksIterator iterator = db.newIterator(columnInfo.f0);
    -		iterator.seekToFirst();
    +		RocksIterator iterator = null;
    +		try {
    +			iterator = db.newIterator(columnInfo.f0);
    +			iterator.seekToFirst();
    +
    +			boolean ambiguousKeyPossible = AbstractRocksDBState.AbstractRocksDBUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer);
    +			final byte[] nameSpaceBytes;
     
    -		Iterable<K> iterable = () -> new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
    -		Stream<K> targetStream = StreamSupport.stream(iterable.spliterator(), false);
    -		return targetStream.onClose(iterator::close);
    +			try {
    +				namespaceOutputStream.reset();
    +				AbstractRocksDBState.AbstractRocksDBUtils.writeNameSpace(
    +					namespace,
    +					namespaceSerializer,
    +					namespaceOutputStream,
    +					new DataOutputViewStreamWrapper(namespaceOutputStream),
    +					ambiguousKeyPossible);
    +				nameSpaceBytes = namespaceOutputStream.toByteArray();
    +			} catch (IOException ex) {
    +				throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
    +			}
    +
    +			final RocksIteratorWrapper<K> iteratorWrapper = new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes,
    +				ambiguousKeyPossible, nameSpaceBytes);
    +
    +			Stream<K> targetStream = StreamSupport.stream(((Iterable<K>)()->iteratorWrapper).spliterator(), false);
    +			return targetStream.onClose(() -> {
    +				try {
    +					iteratorWrapper.close();
    +				} catch (Exception ex) {
    +					LOG.warn("Release RocksIteratorWrapper failed.", ex);
    +				}
    +			});
    +		}  catch (Exception ex) {
    --- End diff --
    
    As mentioned in my previous comment, we can solve this without any `try-catch` here: just create the native iterator further down, where no more exceptions can happen, i.e.
    ```
    (...)
    		RocksIterator iterator = db.newIterator(columnInfo.f0);
    		iterator.seekToFirst();
    
    		final RocksIteratorWrapper<K> iteratorWrapper = new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes,
    			ambiguousKeyPossible, nameSpaceBytes);
    
    		Stream<K> targetStream = StreamSupport.stream(((Iterable<K>) () -> iteratorWrapper).spliterator(), false);
    		return targetStream.onClose(iteratorWrapper::close);
    ```
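
The ordering suggested above can be illustrated outside of Flink. The following is only a minimal sketch under stated assumptions: `FakeNativeIterator`, `serializeNamespace`, the `OPEN` counter and the class name are hypothetical stand-ins and not part of Flink or RocksDB. It shows the general idea: run every step that may throw first, acquire the closeable resource last, and tie its release to `Stream.onClose`, so no `try-catch` is needed to avoid a leak.

```java
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class LateAcquireSketch {

    /** Counts iterators that were opened but not yet closed (illustration only). */
    public static final AtomicInteger OPEN = new AtomicInteger();

    /** Hypothetical stand-in for a native iterator that must be closed explicitly. */
    public static final class FakeNativeIterator implements Iterator<String>, AutoCloseable {
        private final Iterator<String> delegate;

        public FakeNativeIterator(List<String> data) {
            this.delegate = data.iterator();
            OPEN.incrementAndGet();
        }

        @Override public boolean hasNext() { return delegate.hasNext(); }
        @Override public String next() { return delegate.next(); }
        @Override public void close() { OPEN.decrementAndGet(); }
    }

    /** Hypothetical serialization step; the only step in this sketch that can throw. */
    public static byte[] serializeNamespace(String namespace) {
        if (namespace == null) {
            throw new IllegalArgumentException("namespace must not be null");
        }
        return namespace.getBytes(StandardCharsets.UTF_8);
    }

    public static Stream<String> keys(List<String> data, String namespace) {
        // 1. Run everything that may throw before the resource exists.
        //    (The bytes are unused here; the diff passes them to the iterator wrapper.)
        byte[] namespaceBytes = serializeNamespace(namespace);

        // 2. Acquire the resource last; nothing below this line is expected to throw.
        FakeNativeIterator iterator = new FakeNativeIterator(data);

        // 3. Hand ownership to the stream: closing the stream closes the iterator.
        Iterable<String> iterable = () -> iterator;
        return StreamSupport.stream(iterable.spliterator(), false).onClose(iterator::close);
    }
}
```

Callers would still need to close the returned stream, for example with try-with-resources, for the close handler to run.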


> RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-8679
>                 URL: https://issues.apache.org/jira/browse/FLINK-8679
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> Currently, `RocksDBKeyedBackend.getKeys(stateName, namespace)` is odd: it doesn't use the namespace to filter data, whereas `HeapKeyedBackend.getKeys(stateName, namespace)` does. I think they should at least be consistent.
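
To make the reported inconsistency concrete, here is a toy model, assuming state is stored as one entry per (key, namespace) pair. The `Entry` class and both methods are hypothetical and do not reflect Flink's actual storage layout or API; the sketch only contrasts a key listing that ignores the namespace with one that filters by it.

```java
import java.util.List;
import java.util.stream.Stream;

public class NamespaceFilterSketch {

    /** Toy state entry: one value is stored per (key, namespace) pair. */
    public static final class Entry {
        final String key;
        final String namespace;

        public Entry(String key, String namespace) {
            this.key = key;
            this.namespace = namespace;
        }
    }

    /** Behaviour described in the report: the namespace argument is ignored. */
    public static Stream<String> getKeysUnfiltered(List<Entry> state, String namespace) {
        return state.stream().map(e -> e.key);
    }

    /** Expected behaviour: only keys that have an entry in the given namespace. */
    public static Stream<String> getKeys(List<Entry> state, String namespace) {
        return state.stream()
            .filter(e -> e.namespace.equals(namespace))
            .map(e -> e.key);
    }
}
```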



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
