flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
Date Mon, 02 Oct 2017 13:43:02 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16188033#comment-16188033 ]

ASF GitHub Bot commented on FLINK-7068:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142137151
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
---
    @@ -64,39 +65,60 @@ public FileSystemBlobStore(FileSystem fileSystem, String storagePath) throws IOE
     	// - Put ------------------------------------------------------------------
     
     	@Override
    -	public void put(File localFile, JobID jobId, BlobKey blobKey) throws IOException {
    -		put(localFile, BlobUtils.getStorageLocationPath(basePath, jobId, blobKey));
    +	public boolean put(File localFile, JobID jobId, BlobKey blobKey) throws IOException {
    +		return put(localFile, BlobUtils.getStorageLocationPath(basePath, jobId, blobKey));
     	}
     
    -	private void put(File fromFile, String toBlobPath) throws IOException {
    +	private boolean put(File fromFile, String toBlobPath) throws IOException {
     		try (OutputStream os = fileSystem.create(new Path(toBlobPath), FileSystem.WriteMode.OVERWRITE)) {
     			LOG.debug("Copying from {} to {}.", fromFile, toBlobPath);
     			Files.copy(fromFile, os);
     		}
    +		return true;
     	}
     
     	// - Get ------------------------------------------------------------------
     
     	@Override
    -	public void get(JobID jobId, BlobKey blobKey, File localFile) throws IOException {
    -		get(BlobUtils.getStorageLocationPath(basePath, jobId, blobKey), localFile);
    +	public boolean get(JobID jobId, BlobKey blobKey, File localFile) throws IOException {
    +		return get(BlobUtils.getStorageLocationPath(basePath, jobId, blobKey), localFile, blobKey);
     	}
     
    -	private void get(String fromBlobPath, File toFile) throws IOException {
    +	private boolean get(String fromBlobPath, File toFile, BlobKey blobKey) throws IOException {
     		checkNotNull(fromBlobPath, "Blob path");
     		checkNotNull(toFile, "File");
    +		checkNotNull(blobKey, "Blob key");
     
     		if (!toFile.exists() && !toFile.createNewFile()) {
     			throw new IOException("Failed to create target file to copy to");
     		}
     
     		final Path fromPath = new Path(fromBlobPath);
    +		MessageDigest md = BlobUtils.createMessageDigest();
    +
    +		final int buffSize = 4096; // like IOUtils#BLOCKSIZE, for chunked file copying
     
     		boolean success = false;
     		try (InputStream is = fileSystem.open(fromPath);
     			FileOutputStream fos = new FileOutputStream(toFile)) {
     			LOG.debug("Copying from {} to {}.", fromBlobPath, toFile);
    -			IOUtils.copyBytes(is, fos); // closes the streams
    +
    +			// not using IOUtils.copyBytes(is, fos) here to be able to create a hash on-the-fly
    +			final byte[] buf = new byte[buffSize];
    +			int bytesRead = is.read(buf);
    +			while (bytesRead >= 0) {
    +				fos.write(buf, 0, bytesRead);
    +				md.update(buf, 0, bytesRead);
    +
    +				bytesRead = is.read(buf);
    +			}
    +
    +			// verify that file contents are correct
    +			final byte[] computedKey = md.digest();
    +			if (!Arrays.equals(computedKey, blobKey.getHash())) {
    +				throw new IOException("Detected data corruption during transfer");
    +			}
    --- End diff --
    
    Nice addition :-)
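The change above verifies a BLOB while copying it: every chunk read from the file system is written to the local file and fed into a `MessageDigest`, and the final digest is compared against the hash carried by the `BlobKey`. A minimal standalone sketch of the same idea in plain Java, assuming SHA-1 as the digest algorithm (the diff does not show what `BlobUtils.createMessageDigest()` returns) and a raw `byte[]` standing in for `BlobKey#getHash()`. The names `VerifyingCopy` and `copyAndVerify` are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// Hypothetical helper illustrating hash-while-copy verification.
public class VerifyingCopy {

    // Same 4 KiB chunk size as the diff uses.
    private static final int BUFFER_SIZE = 4096;

    /**
     * Copies all bytes from in to out, updating a digest with each chunk,
     * and throws if the final digest differs from expectedHash.
     */
    public static void copyAndVerify(InputStream in, OutputStream out, byte[] expectedHash)
            throws IOException, NoSuchAlgorithmException {
        // Assumption: SHA-1; swap in whatever algorithm the keys were created with.
        MessageDigest md = MessageDigest.getInstance("SHA-1");
        byte[] buf = new byte[BUFFER_SIZE];
        int bytesRead;
        // read() returns -1 at end of stream
        while ((bytesRead = in.read(buf)) != -1) {
            out.write(buf, 0, bytesRead);
            md.update(buf, 0, bytesRead);
        }
        if (!Arrays.equals(md.digest(), expectedHash)) {
            throw new IOException("Detected data corruption during transfer");
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = "hello blob".getBytes(StandardCharsets.UTF_8);
        byte[] goodHash = MessageDigest.getInstance("SHA-1").digest(data);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        copyAndVerify(new ByteArrayInputStream(data), out, goodHash);
        System.out.println("copied " + out.size() + " bytes, hash ok"); // prints "copied 10 bytes, hash ok"

        byte[] badHash = goodHash.clone();
        badHash[0] ^= 1; // flip one bit to simulate a mismatching key
        try {
            copyAndVerify(new ByteArrayInputStream(data), new ByteArrayOutputStream(), badHash);
        } catch (IOException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Hashing during the copy avoids a second pass over the file. The trade-off is that the (possibly corrupted) bytes have already been written to the target when the check fails, so the caller has to remove the partial file, presumably what the `success` flag in the diff is for.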


> change BlobService sub-classes for permanent and transient BLOBs
> ----------------------------------------------------------------
>
>                 Key: FLINK-7068
>                 URL: https://issues.apache.org/jira/browse/FLINK-7068
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination, Network
>    Affects Versions: 1.4.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should represent use cases for BLOBs that are stored permanently for a job's lifetime (HA and non-HA).
> A {{TransientBlobStore}} should represent BLOB offloading for logs, RPC, etc., which does not even have to be backed by files.
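The description separates BLOBs by lifetime rather than by storage medium. A rough sketch of that split, with entirely hypothetical interface and class names (these are not Flink's actual types) and plain strings standing in for `JobID` and `BlobKey`: permanent BLOBs are scoped to a job and released only when that job is cleaned up, while transient BLOBs carry no job scope and need not be backed by files. Both implementations are in-memory for brevity; a real permanent store would write to local disk or HA storage.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: none of these names exist in Flink.
public class BlobStoreSplitSketch {

    /** Permanent BLOBs: scoped to a job, kept until that job is cleaned up. */
    public interface PermanentStoreSketch {
        void put(String jobId, String key, byte[] data);
        Optional<byte[]> get(String jobId, String key);
        void cleanupJob(String jobId); // called once the job terminates
    }

    /** Transient BLOBs: short-lived payloads (logs, RPC), no job scope needed. */
    public interface TransientStoreSketch {
        String put(byte[] data);
        Optional<byte[]> get(String key);
        void delete(String key);
    }

    /** Job-scoped store; in-memory here only to keep the sketch short. */
    public static final class InMemoryPermanentStore implements PermanentStoreSketch {
        private final Map<String, Map<String, byte[]>> blobsByJob = new ConcurrentHashMap<>();

        @Override
        public void put(String jobId, String key, byte[] data) {
            blobsByJob.computeIfAbsent(jobId, j -> new ConcurrentHashMap<>()).put(key, data.clone());
        }

        @Override
        public Optional<byte[]> get(String jobId, String key) {
            return Optional.ofNullable(blobsByJob.get(jobId))
                    .map(m -> m.get(key)) // empty if the job or key is unknown
                    .map(byte[]::clone);
        }

        @Override
        public void cleanupJob(String jobId) {
            blobsByJob.remove(jobId); // drops every BLOB of this job at once
        }
    }

    /** Transient store that is not backed by files at all. */
    public static final class InMemoryTransientStore implements TransientStoreSketch {
        private final Map<String, byte[]> blobs = new ConcurrentHashMap<>();

        @Override
        public String put(byte[] data) {
            String key = UUID.randomUUID().toString(); // random key, no job involved
            blobs.put(key, data.clone());
            return key;
        }

        @Override
        public Optional<byte[]> get(String key) {
            return Optional.ofNullable(blobs.get(key)).map(byte[]::clone);
        }

        @Override
        public void delete(String key) {
            blobs.remove(key);
        }
    }
}
```

Keeping the two lifetimes behind separate interfaces lets each side choose its own backing and cleanup policy without one store having to serve both.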



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
