flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7057) move BLOB ref-counting from LibraryCacheManager to BlobCache
Date Tue, 04 Jul 2017 09:37:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16073394#comment-16073394 ]

ASF GitHub Bot commented on FLINK-7057:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4238#discussion_r125430786
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
---
    @@ -477,97 +406,96 @@ else if (contentAddressable == CONTENT_ADDRESSABLE) {
     			clientSocket.close();
     		}
     		finally {
    -			if (fos != null) {
    -				try {
    -					fos.close();
    -				} catch (Throwable t) {
    -					LOG.warn("Cannot close stream to BLOB staging file", t);
    -				}
    -			}
     			if (incomingFile != null) {
    -				if (!incomingFile.delete()) {
    +				if (!incomingFile.delete() && incomingFile.exists()) {
     					LOG.warn("Cannot delete BLOB server staging file " + incomingFile.getAbsolutePath());
     				}
     			}
     		}
     	}
     
     	/**
    -	 * Handles an incoming DELETE request from a BLOB client.
    -	 * 
    -	 * @param inputStream The input stream to read the request from.
    -	 * @param outputStream The output stream to write the response to.
    -	 * @throws java.io.IOException Thrown if an I/O error occurs while reading the request data from the input stream.
    +	 * Reads a full file from <tt>inputStream</tt> into <tt>incomingFile</tt>, returning its checksum.
    +	 *
    +	 * @param inputStream
    +	 * 		stream to read from
    +	 * @param incomingFile
    +	 * 		file to write to
    +	 * @param buf
    +	 * 		An auxiliary buffer for data serialization/deserialization
    +	 *
    +	 * @return the received file's content hash as a BLOB key
    +	 *
    +	 * @throws IOException
    +	 * 		thrown if an I/O error occurs while reading/writing data from/to the respective streams
     	 */
    -	private void delete(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
    +	private static BlobKey readFileFully(
    +			final InputStream inputStream, final File incomingFile, final byte[] buf)
    +			throws IOException {
    +		MessageDigest md = BlobUtils.createMessageDigest();
    +		FileOutputStream fos = new FileOutputStream(incomingFile);
     
     		try {
    -			int type = inputStream.read();
    -			if (type < 0) {
    -				throw new EOFException("Premature end of DELETE request");
    -			}
    -
    -			if (type == CONTENT_ADDRESSABLE) {
    -				BlobKey key = BlobKey.readFromInputStream(inputStream);
    -				File blobFile = blobServer.getStorageLocation(key);
    -
    -				writeLock.lock();
    -
    -				try {
    -					// we should make the local and remote file deletion atomic, otherwise we might risk not
    -					// removing the remote file in case of a concurrent put operation
    -					if (blobFile.exists() && !blobFile.delete()) {
    -						throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
    -					}
    -
    -					blobStore.delete(key);
    -				} finally {
    -					writeLock.unlock();
    +			while (true) {
    +				final int bytesExpected = readLength(inputStream);
    +				if (bytesExpected == -1) {
    +					// done
    +					break;
    +				}
    +				if (bytesExpected > BUFFER_SIZE) {
    +					throw new IOException(
    +						"Unexpected number of incoming bytes: " + bytesExpected);
     				}
    -			}
    -			else if (type == NAME_ADDRESSABLE) {
    -				byte[] jidBytes = new byte[JobID.SIZE];
    -				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
    -				JobID jobID = JobID.fromByteArray(jidBytes);
     
    -				String key = readKey(buf, inputStream);
    +				readFully(inputStream, buf, 0, bytesExpected, "buffer");
    +				fos.write(buf, 0, bytesExpected);
     
    -				File blobFile = this.blobServer.getStorageLocation(jobID, key);
    +				md.update(buf, 0, bytesExpected);
    +			}
    +			return new BlobKey(md.digest());
    +		} finally {
    --- End diff --
    
    ...and throw an exception if closing the stream fails? (The current and previous take on this was to swallow the exception and log the error message.) How grave would you consider the failure of closing an output stream to a temporary file that was completely written just fine?
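
One way to sidestep the swallow-vs-throw dilemma entirely is Java 7's try-with-resources, which always closes the stream and either propagates a `close()` failure or attaches it as a suppressed exception when the body already threw. Below is a minimal sketch under that assumption; the `copy` helper and the class name are hypothetical illustrations, not part of the Flink code under review:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class CopyExample {

    // Hypothetical helper illustrating try-with-resources: close() is
    // always invoked when the block exits. An IOException thrown by
    // close() propagates to the caller, or is recorded as a suppressed
    // exception if the body itself already threw; nothing is silently
    // swallowed, and no explicit finally block is needed.
    static byte[] copy(InputStream in) throws IOException {
        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return out.toByteArray();
        }
    }
}
```

With a real `FileOutputStream` in place of the in-memory stream, a failing `close()` would surface as an `IOException` instead of being logged and dropped.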


> move BLOB ref-counting from LibraryCacheManager to BlobCache
> ------------------------------------------------------------
>
>                 Key: FLINK-7057
>                 URL: https://issues.apache.org/jira/browse/FLINK-7057
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination, Network
>    Affects Versions: 1.4.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR files managed
> by it. Instead, we want the {{BlobCache}} to do that itself for all job-related BLOBs. Also,
> we do not want to operate on a per-{{BlobKey}} level but rather per job. Therefore, the
> cleanup process should be adapted, too.
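
The per-job ref-counting described in the issue could look roughly like the following. This is a sketch under assumptions, not Flink's actual `BlobCache` API: the class name `JobBlobRefCounter` and the use of plain `String` job IDs instead of `JobID` are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: reference counts are kept per job rather than
// per BlobKey, so releasing the last reference can clean up all of a
// job's BLOBs at once.
public class JobBlobRefCounter {

    private final Map<String, Integer> refCounts = new HashMap<>();

    /** Registers one more user of the given job's BLOBs. */
    public synchronized void register(String jobId) {
        refCounts.merge(jobId, 1, Integer::sum);
    }

    /**
     * Releases one user of the given job's BLOBs. When the count drops
     * to zero, this is the point where a cache would delete every BLOB
     * belonging to the job.
     */
    public synchronized void release(String jobId) {
        Integer count = refCounts.get(jobId);
        if (count == null) {
            throw new IllegalStateException("Unknown job: " + jobId);
        }
        if (count == 1) {
            refCounts.remove(jobId);
            // cleanup hook: delete all BLOBs stored for jobId here
        } else {
            refCounts.put(jobId, count - 1);
        }
    }

    /** Current reference count for the job (0 if unknown). */
    public synchronized int refCount(String jobId) {
        return refCounts.getOrDefault(jobId, 0);
    }
}
```

Keying the map by job rather than by `BlobKey` matches the cleanup granularity the issue asks for: one release path that can remove all of a job's BLOBs together.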



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
