flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-10157) Allow `null` user values in map state with TTL
Date Wed, 19 Sep 2018 08:51:04 GMT

    [ https://issues.apache.org/jira/browse/FLINK-10157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620314#comment-16620314 ]

ASF GitHub Bot commented on FLINK-10157:
----------------------------------------

StefanRRichter commented on a change in pull request #6707: [FLINK-10157] [State TTL] Allow `null` user values in map state with TTL
URL: https://github.com/apache/flink/pull/6707#discussion_r218484596
 
 

 ##########
 File path: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
 ##########
 @@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Serializer wrapper to add support for null value serialization.
+ *
+ * <p>If the target serializer does not support null values of its type,
+ * you can use this class to wrap this serializer.
+ * This is a generic treatment of null value serialization
+ * which comes with the cost of an additional byte in the final serialized value.
+ * The {@code NullableSerializer} intercepts the null value case
+ * and prepends the target serialized value with a boolean flag marking whether it is null or not.
+ * <pre> {@code
+ * TypeSerializer<T> originalSerializer = ...;
+ * TypeSerializer<T> serializerWithNullValueSupport = NullableSerializer.wrap(originalSerializer);
+ * // or
+ * TypeSerializer<T> serializerWithNullValueSupport = NullableSerializer.wrapIfNullIsNotSupported(originalSerializer);
+ * }</pre>
+ *
+ * @param <T> type to serialize
+ */
+public class NullableSerializer<T> extends TypeSerializer<T> {
+	private static final long serialVersionUID = 3335569358214720033L;
+
+	private final TypeSerializer<T> originalSerializer;
+
+	private NullableSerializer(TypeSerializer<T> originalSerializer) {
+		Preconditions.checkNotNull(originalSerializer, "The original serializer cannot be null");
+		this.originalSerializer = originalSerializer;
+	}
+
+	/**
+	 * This method tries to serialize a null value with the {@code originalSerializer}
+	 * and wraps it in case of {@link NullPointerException}, otherwise it returns the {@code originalSerializer}.
+	 */
+	public static <T> TypeSerializer<T> wrapIfNullIsNotSupported(TypeSerializer<T> originalSerializer) {
+		return checkIfNullSupported(originalSerializer) ? originalSerializer : wrap(originalSerializer);
+	}
+
+	private static <T> boolean checkIfNullSupported(TypeSerializer<T> originalSerializer) {
+		try {
+			originalSerializer.serialize(null, new DataOutputSerializer(1));
+			Preconditions.checkArgument(originalSerializer.copy(null) == null);
+		} catch (NullPointerException | IOException e) {
 
 Review comment:
   I wonder if we should be broader and catch `IOException` and `RuntimeException`, e.g. some people might wrongly throw `IllegalArgumentException` in their serializer.
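The wrapping idea from the class Javadoc and the broader catch proposed in the review can be sketched without Flink dependencies. This is a simplified sketch under stated assumptions: `SimpleSerializer`, `StringSerializer`, `NullFlagSerializer`, `supportsNull`, `wrapIfNullUnsupported` and `roundTrip` are hypothetical stand-ins, not Flink's `TypeSerializer` API, and Flink's actual `NullableSerializer` may differ in details. The probe catches `IOException | RuntimeException`, which also covers the `IllegalArgumentException` thrown by `Preconditions.checkArgument` when `copy(null)` does not return `null`; the narrower `NullPointerException | IOException` catch in the diff would let that exception escape.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class NullableSerializerSketch {

    /** Hypothetical minimal serializer contract; NOT Flink's TypeSerializer. */
    interface SimpleSerializer<T> {
        void serialize(T value, DataOutputStream out) throws IOException;

        T deserialize(DataInputStream in) throws IOException;

        T copy(T value);
    }

    /** Like many real serializers, this one cannot handle null. */
    static final class StringSerializer implements SimpleSerializer<String> {
        public void serialize(String value, DataOutputStream out) throws IOException {
            out.writeUTF(value); // throws NullPointerException when value is null
        }

        public String deserialize(DataInputStream in) throws IOException {
            return in.readUTF();
        }

        public String copy(String value) {
            return value; // Strings are immutable, so no deep copy is needed
        }
    }

    /** Writes a boolean "is null" flag before the value, costing one extra byte. */
    static final class NullFlagSerializer<T> implements SimpleSerializer<T> {
        private final SimpleSerializer<T> inner;

        NullFlagSerializer(SimpleSerializer<T> inner) {
            this.inner = inner;
        }

        public void serialize(T value, DataOutputStream out) throws IOException {
            out.writeBoolean(value == null);
            if (value != null) {
                inner.serialize(value, out);
            }
        }

        public T deserialize(DataInputStream in) throws IOException {
            return in.readBoolean() ? null : inner.deserialize(in);
        }

        public T copy(T value) {
            return value == null ? null : inner.copy(value);
        }
    }

    /** Probes null support; any IOException or RuntimeException counts as "unsupported". */
    static <T> boolean supportsNull(SimpleSerializer<T> serializer) {
        try {
            serializer.serialize(null, new DataOutputStream(new ByteArrayOutputStream()));
            return serializer.copy(null) == null;
        } catch (IOException | RuntimeException e) {
            return false;
        }
    }

    /** Wraps only when the probe fails, so null-capable serializers stay unwrapped. */
    static <T> SimpleSerializer<T> wrapIfNullUnsupported(SimpleSerializer<T> serializer) {
        return supportsNull(serializer) ? serializer : new NullFlagSerializer<T>(serializer);
    }

    /** Serializes a value to bytes and reads it back. */
    static <T> T roundTrip(SimpleSerializer<T> serializer, T value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        serializer.serialize(value, new DataOutputStream(bytes));
        return serializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    }

    public static void main(String[] args) throws IOException {
        SimpleSerializer<String> raw = new StringSerializer();
        SimpleSerializer<String> safe = wrapIfNullUnsupported(raw);
        System.out.println(supportsNull(raw));                  // false
        System.out.println(safe instanceof NullFlagSerializer); // true
        System.out.println(roundTrip(safe, null));              // null
        System.out.println(roundTrip(safe, "ttl"));             // ttl
    }
}
```

Because the probe returns `true` for the wrapper itself, calling `wrapIfNullUnsupported` twice does not double-wrap a serializer.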



> Allow `null` user values in map state with TTL
> ----------------------------------------------
>
>                 Key: FLINK-10157
>                 URL: https://issues.apache.org/jira/browse/FLINK-10157
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>         Environment: Flink:1.6.0
> Scala:2.11
> JDK:1.8
>            Reporter: chengjie.wu
>            Assignee: Andrey Zagrebin
>            Priority: Minor
>              Labels: pull-request-available
>         Attachments: StateWithOutTtlTest.scala, StateWithTtlTest.scala
>
>
> Thanks for the StateTtl feature, this is exactly what I need now! But I found an issue.
> In previous versions, or when StateTtl is not enabled, MapState allows a `null` value, which means that after
> {code:java}
> mapState.put("key", null){code}
> , then
> {code:java}
> mapState.contains("key"){code}
> will return {color:#ff0000}*true*{color}, but when StateTtl is enabled,
> {code:java}
> mapState.contains("key"){code}
> will return {color:#ff0000}*false*{color} (*the key has not expired*).
>  So I think the field `userValue` in `org.apache.flink.runtime.state.ttl.TtlValue` should allow `null` values. A `null` user value does not necessarily mean that the TtlValue itself should be null.
>  
> {code:java}
> /**
>  * This class wraps user value of state with TTL.
>  *
>  * @param <T> Type of the user value of state with TTL
>  */
> class TtlValue<T> implements Serializable {
> 	private final T userValue;
> 	private final long lastAccessTimestamp;
>
> 	TtlValue(T userValue, long lastAccessTimestamp) {
> 		Preconditions.checkNotNull(userValue);
> 		this.userValue = userValue;
> 		this.lastAccessTimestamp = lastAccessTimestamp;
> 	}
>
> 	T getUserValue() {
> 		return userValue;
> 	}
>
> 	long getLastAccessTimestamp() {
> 		return lastAccessTimestamp;
> 	}
> }
> {code}
> Is my understanding correct?
> These are my test classes:
> [^StateWithTtlTest.scala] [^StateWithOutTtlTest.scala]
> ^Thanks!:)^
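The semantics the reporter expects can be made concrete with a minimal, dependency-free sketch. It assumes hypothetical `TtlMapSketch` and `TtlEntry` classes (not Flink's `TtlMapState` or `TtlValue`) in which the TTL wrapper is always non-null and only the user value inside it may be `null`, so presence is decided by the wrapper and `contains` stays `true` for a `null` value until the entry expires. For simplicity it takes an explicit `now` timestamp instead of a time provider and does not refresh the timestamp on reads.

```java
import java.util.HashMap;
import java.util.Map;

public class TtlMapSketch<K, V> {

    /** Hypothetical wrapper: the entry is never null, but its user value may be. */
    static final class TtlEntry<T> {
        final T userValue; // may be null
        final long lastAccessTimestamp;

        TtlEntry(T userValue, long lastAccessTimestamp) {
            this.userValue = userValue; // deliberately no null check here
            this.lastAccessTimestamp = lastAccessTimestamp;
        }
    }

    private final Map<K, TtlEntry<V>> entries = new HashMap<>();
    private final long ttlMillis;

    TtlMapSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    private boolean isExpired(TtlEntry<V> entry, long now) {
        return now - entry.lastAccessTimestamp >= ttlMillis;
    }

    /** Always stores a wrapper, even for a null value, so the key stays present. */
    void put(K key, V value, long now) {
        entries.put(key, new TtlEntry<>(value, now));
    }

    /** A key is present if it has an unexpired wrapper, whatever the user value is. */
    boolean contains(K key, long now) {
        TtlEntry<V> entry = entries.get(key);
        if (entry == null) {
            return false;
        }
        if (isExpired(entry, now)) {
            entries.remove(key); // lazily drop the expired entry
            return false;
        }
        return true;
    }

    /** Returns the user value (possibly null) of an unexpired entry, else null. */
    V get(K key, long now) {
        return contains(key, now) ? entries.get(key).userValue : null;
    }

    public static void main(String[] args) {
        TtlMapSketch<String, String> state = new TtlMapSketch<>(1_000L);
        state.put("key", null, 0L);
        System.out.println(state.contains("key", 500L));   // true: not expired, value is null
        System.out.println(state.get("key", 500L));        // null
        System.out.println(state.contains("key", 1_500L)); // false: expired
    }
}
```

With a wrapper constructor that rejects `null` (as in the quoted `TtlValue`), a `null` user value cannot be represented as a present entry, which is the mismatch described in the issue.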



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
