flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dia...@apache.org
Subject [flink] branch master updated: [FLINK-24062][python] Fix the serialization of timer to avoid serializing timer data partially
Date Wed, 01 Sep 2021 03:45:45 GMT
This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ba14cc  [FLINK-24062][python] Fix the serialization of timer to avoid serializing timer data partially
4ba14cc is described below

commit 4ba14cc3ef786623885e3b92a2f949a83edab2aa
Author: Dian Fu <dianfu@apache.org>
AuthorDate: Tue Aug 31 13:15:34 2021 +0800

    [FLINK-24062][python] Fix the serialization of timer to avoid serializing timer data partially
    
    This closes #17065.
---
 .../pyflink/fn_execution/beam/beam_coder_impl_fast.pxd         |  2 +-
 .../pyflink/fn_execution/beam/beam_operations_fast.pyx         |  5 ++---
 flink-python/pyflink/fn_execution/beam/beam_operations_slow.py |  5 ++---
 flink-python/pyflink/fn_execution/beam/beam_stream_fast.pxd    |  2 +-
 flink-python/pyflink/fn_execution/beam/beam_stream_fast.pyx    |  7 +++----
 flink-python/pyflink/fn_execution/beam/beam_stream_slow.py     | 10 +++++++---
 .../pyflink/fn_execution/datastream/timerservice_impl.py       |  2 +-
 7 files changed, 17 insertions(+), 16 deletions(-)

diff --git a/flink-python/pyflink/fn_execution/beam/beam_coder_impl_fast.pxd b/flink-python/pyflink/fn_execution/beam/beam_coder_impl_fast.pxd
index 0cf78de..126191c 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_coder_impl_fast.pxd
+++ b/flink-python/pyflink/fn_execution/beam/beam_coder_impl_fast.pxd
@@ -38,4 +38,4 @@ cdef class FlinkFieldCoderBeamWrapper(StreamCoderImpl):
 
 cdef class FlinkLengthPrefixCoderBeamWrapper(StreamCoderImpl):
     cdef readonly LengthPrefixBaseCoderImpl _value_coder
-    cdef BeamTimeBasedOutputStream _output_stream
+    cdef readonly BeamTimeBasedOutputStream _output_stream
diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx b/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
index f7b535e..c7e5a09 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
+++ b/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
@@ -90,11 +90,10 @@ cdef class NetworkOutputProcessor(OutputProcessor):
     cpdef process_outputs(self, WindowedValue windowed_value, results):
         output_stream = self._consumer.output_stream
         self._value_coder_impl.encode_to_stream(results, output_stream, True)
-        output_stream.maybe_flush()
+        self._value_coder_impl._output_stream.maybe_flush()
 
     cpdef close(self):
-        if self._value_coder_impl._output_stream:
-            self._value_coder_impl._output_stream.close()
+        self._value_coder_impl._output_stream.close()
 
 cdef class IntermediateOutputProcessor(OutputProcessor):
 
diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py b/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
index c81c516..809a4b0 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
@@ -48,11 +48,10 @@ class NetworkOutputProcessor(OutputProcessor):
     def process_outputs(self, windowed_value: WindowedValue, results: Iterable[Any]):
         output_stream = self._consumer.output_stream
         self._value_coder_impl.encode_to_stream(results, output_stream, True)
-        output_stream.maybe_flush()
+        self._value_coder_impl._output_stream.maybe_flush()
 
     def close(self):
-        if self._value_coder_impl._output_stream:
-            self._value_coder_impl._output_stream.close()
+        self._value_coder_impl._output_stream.close()
 
 
 class IntermediateOutputProcessor(OutputProcessor):
diff --git a/flink-python/pyflink/fn_execution/beam/beam_stream_fast.pxd b/flink-python/pyflink/fn_execution/beam/beam_stream_fast.pxd
index 3afffb4..6171ae7 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_stream_fast.pxd
+++ b/flink-python/pyflink/fn_execution/beam/beam_stream_fast.pxd
@@ -36,7 +36,7 @@ cdef class BeamSizeBasedOutputStream(LengthPrefixOutputStream):
     cdef BOutputStream _output_stream
 
     cdef void reset_output_stream(self, BOutputStream output_stream)
-    cdef bint _maybe_flush(self)
+    cpdef bint maybe_flush(self)
 
 cdef class BeamTimeBasedOutputStream(BeamSizeBasedOutputStream):
     cdef bint _flush_event
diff --git a/flink-python/pyflink/fn_execution/beam/beam_stream_fast.pyx b/flink-python/pyflink/fn_execution/beam/beam_stream_fast.pyx
index c165f5e..583a0c2 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_stream_fast.pyx
+++ b/flink-python/pyflink/fn_execution/beam/beam_stream_fast.pyx
@@ -84,7 +84,6 @@ cdef class BeamSizeBasedOutputStream(LengthPrefixOutputStream):
             memcpy(self._output_data + self._output_pos, data, length)
         self._output_pos += length
         self._output_stream.pos = self._output_pos
-        self._maybe_flush()
 
     cpdef void flush(self):
         self._output_stream.flush()
@@ -96,7 +95,7 @@ cdef class BeamSizeBasedOutputStream(LengthPrefixOutputStream):
         self._output_pos = output_stream.pos
         self._output_buffer_size = output_stream.buffer_size
 
-    cdef bint _maybe_flush(self):
+    cpdef bint maybe_flush(self):
         if self._output_pos > 10_000_000:
             self.flush()
             return True
@@ -117,9 +116,9 @@ cdef class BeamTimeBasedOutputStream(BeamSizeBasedOutputStream):
             self._periodic_flusher.cancel()
             self._periodic_flusher = None
 
-    cdef bint _maybe_flush(self):
+    cpdef bint maybe_flush(self):
         if self._flush_event:
             self.flush()
             self._flush_event = False
-        elif BeamSizeBasedOutputStream._maybe_flush(self):
+        elif BeamSizeBasedOutputStream.maybe_flush(self):
             self._flush_event = False
diff --git a/flink-python/pyflink/fn_execution/beam/beam_stream_slow.py b/flink-python/pyflink/fn_execution/beam/beam_stream_slow.py
index 5688beb..376a861 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_stream_slow.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_stream_slow.py
@@ -47,9 +47,6 @@ class BeamTimeBasedOutputStream(create_OutputStream):
 
     def write(self, b: bytes):
         self._output_stream.write(b)
-        if self._flush_event:
-            self._output_stream.flush()
-            self._flush_event = False
 
     def reset_output_stream(self, output_stream: create_OutputStream):
         self._output_stream = output_stream
@@ -61,3 +58,10 @@ class BeamTimeBasedOutputStream(create_OutputStream):
         if self._periodic_flusher:
             self._periodic_flusher.cancel()
             self._periodic_flusher = None
+
+    def maybe_flush(self):
+        if self._flush_event:
+            self._output_stream.flush()
+            self._flush_event = False
+        else:
+            self._output_stream.maybe_flush()
diff --git a/flink-python/pyflink/fn_execution/datastream/timerservice_impl.py b/flink-python/pyflink/fn_execution/datastream/timerservice_impl.py
index 14c7018..46397af 100644
--- a/flink-python/pyflink/fn_execution/datastream/timerservice_impl.py
+++ b/flink-python/pyflink/fn_execution/datastream/timerservice_impl.py
@@ -141,7 +141,7 @@ class InternalTimerServiceImpl(InternalTimerService[N]):
             hold_timestamp=None,
             paneinfo=None)
         self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True)
-        self._output_stream.maybe_flush()
+        self._timer_coder_impl._key_coder_impl._value_coder._output_stream.maybe_flush()
 
 
 class TimerServiceImpl(TimerService):

Mime
View raw message