flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8802) Concurrent serialization without duplicating serializers in state server.
Date Wed, 14 Mar 2018 08:47:02 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398269#comment-16398269
] 

ASF GitHub Bot commented on FLINK-8802:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174384860
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
---
    @@ -40,6 +54,81 @@
      */
     public class KvStateRegistryTest extends TestLogger {
     
    +	@Test
    +	public void testKvStateEntry() throws InterruptedException {
    +		final int threads = 10;
    +
    +		final CountDownLatch latch1 = new CountDownLatch(threads);
    +		final CountDownLatch latch2 = new CountDownLatch(1);
    +
    +		final List<KvStateInfo<?, ?, ?>> infos = Collections.synchronizedList(new
ArrayList<>());
    +
    +		final JobID jobID = new JobID();
    +
    +		final JobVertexID jobVertexId = new JobVertexID();
    +		final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
    +		final String registrationName = "foobar";
    +
    +		final KvStateRegistry kvStateRegistry = new KvStateRegistry();
    +		final KvStateID stateID = kvStateRegistry.registerKvState(
    +				jobID,
    +				jobVertexId,
    +				keyGroupRange,
    +				registrationName,
    +				new DummyKvState()
    +		);
    +
    +		for (int i = 0; i < threads; i++) {
    +			new Thread(() -> {
    +				final KvStateEntry<?, ?, ?> kvState = kvStateRegistry.getKvState(stateID);
    +				final KvStateInfo<?, ?, ?> stateInfo = kvState.getInfoForCurrentThread();
    +				infos.add(stateInfo);
    +
    +				latch1.countDown();
    +				try {
    +					latch2.await();
    +				} catch (InterruptedException e) {
    +					Assert.fail(e.getMessage());
    +				}
    +
    +			}).start();
    +		}
    +
    +		latch1.await();
    +
    +		final KvStateEntry<?, ?, ?> kvState = kvStateRegistry.getKvState(stateID);
    +
    +		// verify that all the threads are done correctly.
    +		Assert.assertEquals(threads, infos.size());
    +		Assert.assertEquals(threads, kvState.getCacheSize());
    +
    +		latch2.countDown();
    +
    +		for (KvStateInfo<?, ?, ?> infoA: infos) {
    +			boolean found = false;
    +			for (KvStateInfo<?, ?, ?> infoB: infos) {
    +				if (infoA == infoB) {
    +					if (found) {
    +						Assert.fail("Already found");
    +					}
    +					found = true;
    +				} else {
    +					Assert.assertTrue(infoA != infoB && infoA.equals(infoB));
    --- End diff --
    
    `infoA != infoB` is redundant


> Concurrent serialization without duplicating serializers in state server.
> -------------------------------------------------------------------------
>
>                 Key: FLINK-8802
>                 URL: https://issues.apache.org/jira/browse/FLINK-8802
>             Project: Flink
>          Issue Type: Bug
>          Components: Queryable State
>    Affects Versions: 1.5.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> The `getSerializedValue()` may be called by multiple threads but serializers are not
duplicated, which may lead to exceptions thrown when a serializer is stateful.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message