flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xts...@apache.org
Subject [flink] 02/07: [hotfix][core] Check the cpu and heap memory to be positive when building ResourceSpec
Date Mon, 05 Jul 2021 03:04:12 GMT
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bc5f36d9fb063a5af1264ae58bbea9f7c0d54f3b
Author: Yangze Guo <karmagyz@gmail.com>
AuthorDate: Fri Jul 2 15:15:50 2021 +0800

    [hotfix][core] Check the cpu and heap memory to be positive when building ResourceSpec
---
 .../org/apache/flink/api/common/operators/ResourceSpec.java   | 11 ++++++++++-
 .../org/apache/flink/runtime/dispatcher/DispatcherTest.java   |  2 +-
 2 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
index f66f2a3..6b24b66 100755
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.MemorySize;
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.math.BigDecimal;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -70,7 +71,13 @@ public final class ResourceSpec implements Serializable {
     public static final ResourceSpec DEFAULT = UNKNOWN;
 
     /** A ResourceSpec that indicates zero amount of resources. */
-    public static final ResourceSpec ZERO = ResourceSpec.newBuilder(0.0, 0).build();
+    public static final ResourceSpec ZERO =
+            new ResourceSpec(
+                    new CPUResource(0.0),
+                    MemorySize.ZERO,
+                    MemorySize.ZERO,
+                    MemorySize.ZERO,
+                    Collections.emptyMap());
 
     /** How many cpu cores are needed. Can be null only if it is unknown. */
     @Nullable private final CPUResource cpuCores;
@@ -397,6 +404,8 @@ public final class ResourceSpec implements Serializable {
         }
 
         public ResourceSpec build() {
+            checkArgument(cpuCores.getValue().compareTo(BigDecimal.ZERO) > 0);
+            checkArgument(taskHeapMemory.compareTo(MemorySize.ZERO) > 0);
             return new ResourceSpec(
                     cpuCores, taskHeapMemory, taskOffHeapMemory, managedMemory, extendedResources);
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 4cc63e0..3b7b210 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -269,7 +269,7 @@ public class DispatcherTest extends TestLogger {
      */
     @Test
     public void testJobSubmissionWithPartialResourceConfigured() throws Exception {
-        ResourceSpec resourceSpec = ResourceSpec.newBuilder(2.0, 0).build();
+        ResourceSpec resourceSpec = ResourceSpec.newBuilder(2.0, 10).build();
 
         final JobVertex firstVertex = new JobVertex("firstVertex");
         firstVertex.setInvokableClass(NoOpInvokable.class);

Mime
View raw message