flink-issues mailing list archives

From StephanEwen <...@git.apache.org>
Subject [GitHub] flink pull request #3143: [FLINK-5530] fix race condition in AbstractRocksDB...
Date Wed, 18 Jan 2017 11:25:35 GMT
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3143#discussion_r96612430
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---
    @@ -242,6 +245,132 @@ public void testValueState() throws Exception {
     		backend.dispose();
     	}
     
    +	/**
    +	 * Tests {@link ValueState#value()} and {@link KvState#getSerializedValue(byte[])}
    +	 * accessing the state concurrently. They should not get in the way of each
    +	 * other.
    +	 */
    +	@Test
    +	@SuppressWarnings("unchecked")
    +	public void testValueStateRace() throws Exception {
    +		final AbstractKeyedStateBackend<Integer> backend =
    +			createKeyedBackend(IntSerializer.INSTANCE);
    +		final Integer namespace = Integer.valueOf(1);
    +
    +		final ValueStateDescriptor<String> kvId =
    +			new ValueStateDescriptor<>("id", String.class);
    +		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
    +
    +		final TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
    +		final TypeSerializer<Integer> namespaceSerializer =
    +			IntSerializer.INSTANCE;
    +		final TypeSerializer<String> valueSerializer = kvId.getSerializer();
    +
    +		final ValueState<String> state = backend
    +			.getPartitionedState(namespace, IntSerializer.INSTANCE, kvId);
    +
    +		@SuppressWarnings("unchecked")
    +		final KvState<Integer> kvState = (KvState<Integer>) state;
    +
    +		/**
    +		 * 1) Test that ValueState#value() before and after
    +		 * KvState#getSerializedValue(byte[]) return the same value.
    +		 */
    +
    +		// set some key and namespace
    +		final int key1 = 1;
    +		backend.setCurrentKey(key1);
    +		kvState.setCurrentNamespace(2);
    +		state.update("2");
    +		assertEquals("2", state.value());
    +
    +		// query another key and namespace
    +		assertNull(getSerializedValue(kvState, 3, keySerializer,
    +			namespace, IntSerializer.INSTANCE,
    +			valueSerializer));
    +
    +		// the state should not have changed!
    +		assertEquals("2", state.value());
    +
    +		// re-set values
    +		kvState.setCurrentNamespace(namespace);
    +
    +		/**
    +		 * 2) Test two threads concurrently using ValueState#value() and
    +		 * KvState#getSerializedValue(byte[]).
    +		 */
    +
    +		// some modifications to the state
    +		final int key2 = 10;
    +		backend.setCurrentKey(key2);
    +		assertNull(state.value());
    +		assertNull(getSerializedValue(kvState, key2, keySerializer,
    +			namespace, namespaceSerializer, valueSerializer));
    +		state.update("1");
    +
    +		boolean getterSuccess;
    +		final Throwable[] throwables = {null, null};
    +
    +		final Thread getter = new Thread("State getter") {
    --- End diff --
    
    How about using `CheckedThread` to avoid the bookkeeping with `Throwable` arrays, etc.?
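
    To illustrate the suggestion, here is a minimal, self-contained sketch of the pattern. `SimpleCheckedThread` and `CheckedThreadExample` are hypothetical stand-ins written for this example, not Flink's actual `CheckedThread`; the `go()`/`sync()` method names are assumptions about how such a helper is typically shaped. The idea is that the thread records whatever its body throws, and the test thread rethrows it after joining, so no shared `Throwable[]` is needed.

```java
/**
 * Hypothetical stand-in for a "checked thread" test helper: it remembers
 * any Throwable raised by the thread body and rethrows it from sync().
 */
abstract class SimpleCheckedThread extends Thread {

    // Written by the worker thread, read by the caller after join().
    private volatile Throwable error;

    SimpleCheckedThread(String name) {
        super(name);
    }

    /** The thread body; unlike Runnable#run(), it may throw. */
    public abstract void go() throws Exception;

    @Override
    public final void run() {
        try {
            go();
        } catch (Throwable t) {
            error = t;
        }
    }

    /** Waits for the thread to finish and rethrows what go() threw, if anything. */
    public void sync() throws Exception {
        join();
        Throwable t = error;
        if (t instanceof Error) {
            throw (Error) t;
        }
        if (t instanceof Exception) {
            throw (Exception) t;
        }
        if (t != null) {
            throw new Exception(t);
        }
    }
}

class CheckedThreadExample {

    /** Returns true if a failure inside the worker surfaces in the calling thread. */
    static boolean failurePropagates() {
        SimpleCheckedThread getter = new SimpleCheckedThread("State getter") {
            @Override
            public void go() {
                throw new IllegalStateException("simulated failure in getter");
            }
        };
        getter.start();
        try {
            getter.sync();
            return false; // the failure was swallowed
        } catch (IllegalStateException expected) {
            return true;
        } catch (Exception other) {
            return false;
        }
    }

    /** Returns true if a worker that completes normally makes sync() return quietly. */
    static boolean successIsQuiet() {
        SimpleCheckedThread getter = new SimpleCheckedThread("State getter") {
            @Override
            public void go() {
                // nothing to do; stands in for a loop of state reads
            }
        };
        getter.start();
        try {
            getter.sync();
            return true;
        } catch (Exception unexpected) {
            return false;
        }
    }
}
```

    In a test like `testValueStateRace()`, each worker would then be started and followed by a single `sync()` call, and a failed assertion inside the worker would fail the test directly.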


