flink-dev mailing list archives

From "Ryan Hobbs (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-7737) On HCFS systems, FSDataOutputStream does not issue hsync only hflush which leads to data loss
Date Thu, 28 Sep 2017 19:11:01 GMT
Ryan Hobbs created FLINK-7737:
---------------------------------

             Summary: On HCFS systems, FSDataOutputStream does not issue hsync only hflush which leads to data loss
                 Key: FLINK-7737
                 URL: https://issues.apache.org/jira/browse/FLINK-7737
             Project: Flink
          Issue Type: Bug
          Components: Streaming Connectors
    Affects Versions: 1.3.2
         Environment: Dev
            Reporter: Ryan Hobbs


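During several tests that simulated failure conditions, we observed that on HCFS (Hadoop Compatible File System) implementations, where the stream is a plain FSDataOutputStream, Flink issues hflush() but never hsync(), which results in data loss. hflush() only guarantees that the client's buffered data has been pushed out so that new readers can see it; unlike hsync(), it does not ask the file system to persist that data to disk, so it can still be lost when a node crashes or loses power.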
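The hflush()/hsync() distinction mirrors a familiar local-file one: flushing a user-space buffer versus forcing the data to the storage device. The sketch below is only that local-filesystem analogy, not Hadoop or Flink code; `DurableWriteDemo`, `write` and `roundTrip` are hypothetical names. `BufferedOutputStream.flush()` plays the role of hflush() and `FileDescriptor.sync()` plays the role of hsync().

{code:java}
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Local-filesystem analogy only (hypothetical class, not Hadoop/Flink code):
//   BufferedOutputStream.flush() ~ hflush(): data leaves the writer's own buffer
//   FileDescriptor.sync()        ~ hsync():  data is forced to the storage device
class DurableWriteDemo {

    /** Writes data, flushes the user-space buffer, and optionally forces it to disk. */
    static void write(Path file, byte[] data, boolean sync) throws IOException {
        try (FileOutputStream fos = new FileOutputStream(file.toFile());
             BufferedOutputStream out = new BufferedOutputStream(fos)) {
            out.write(data);
            out.flush();            // buffer -> OS page cache; may be lost if the OS crashes or power fails
            if (sync) {
                fos.getFD().sync(); // returns once the OS reports the data was written to the device
            }
        }
    }

    /** Writes text to a temp file, reads it back, and deletes the file. */
    static String roundTrip(String text, boolean sync) {
        try {
            Path tmp = Files.createTempFile("flink-7737-demo", ".txt");
            try {
                write(tmp, text.getBytes(StandardCharsets.UTF_8), sync);
                return new String(Files.readAllBytes(tmp), StandardCharsets.UTF_8);
            } finally {
                Files.delete(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.print(roundTrip("record-1\n", true));
    }
}
{code}

Both paths read back the same bytes while the machine stays up; the difference only shows after an OS crash or power loss, which is why it surfaces in failure-injection tests rather than in normal runs.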

In *StreamWriterBase.java*, the method below always invokes the reflected hflush-or-sync method, but calls hsync() only when the output stream is an *HdfsDataOutputStream*; any other *FSDataOutputStream* is never synced. Is this by design?

{code}
protected void hflushOrSync(FSDataOutputStream os) throws IOException {
    try {
        // At this point the refHflushOrSync cannot be null,
        // since register method would have thrown if it was.
        this.refHflushOrSync.invoke(os);
        if (os instanceof HdfsDataOutputStream) {
            ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
        }
    } catch (InvocationTargetException e) {
        String msg = "Error while trying to hflushOrSync!";
        LOG.error(msg + " " + e.getCause());
        Throwable cause = e.getCause();
        if (cause != null && cause instanceof IOException) {
            throw (IOException) cause;
        }
        throw new RuntimeException(msg, e);
    } catch (Exception e) {
        String msg = "Error while trying to hflushOrSync!";
        LOG.error(msg + " " + e);
        throw new RuntimeException(msg, e);
    }
}
{code}

Could a potential fix be to perform a sync even on streams of type *FSDataOutputStream*?

{code}
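if (os instanceof HdfsDataOutputStream) {
    ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
} else {
    // os is declared as FSDataOutputStream, so "instanceof FSDataOutputStream"
    // would always be true here; a plain else covers every other stream.
    os.hsync();
}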
{code}
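To make the behavioral difference concrete, here is a minimal model of the current dispatch and the proposed fix. It assumes nothing about Hadoop internals: `ModelStream`, `ModelHdfsStream` and `FlushDispatch` are hypothetical stand-ins for FSDataOutputStream, HdfsDataOutputStream and hflushOrSync() that only record which durability calls are made.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for FSDataOutputStream: records durability calls only.
class ModelStream {
    final List<String> calls = new ArrayList<>();
    void hflush() { calls.add("hflush"); }
    void hsync()  { calls.add("hsync"); }
}

// Hypothetical stand-in for HdfsDataOutputStream.
class ModelHdfsStream extends ModelStream {
    // Models hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)).
    void hsyncUpdateLength() { calls.add("hsync(UPDATE_LENGTH)"); }
}

class FlushDispatch {
    /** Models the current logic: only the HDFS subtype is ever synced. */
    static void current(ModelStream os) {
        os.hflush();
        if (os instanceof ModelHdfsStream) {
            ((ModelHdfsStream) os).hsyncUpdateLength();
        }
    }

    /** Models the proposed fix: every other stream falls back to plain hsync(). */
    static void proposed(ModelStream os) {
        os.hflush();
        if (os instanceof ModelHdfsStream) {
            ((ModelHdfsStream) os).hsyncUpdateLength();
        } else {
            os.hsync();
        }
    }

    public static void main(String[] args) {
        ModelStream a = new ModelStream();
        current(a);
        ModelStream b = new ModelStream();
        proposed(b);
        System.out.println(a.calls + " vs " + b.calls); // prints "[hflush] vs [hflush, hsync]"
    }
}
{code}

The HDFS path is unchanged by the proposal; only non-HDFS streams gain the extra hsync() call.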



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
