flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zentol <...@git.apache.org>
Subject [GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...
Date Wed, 04 Apr 2018 08:25:09 GMT
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179057166
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
    @@ -262,106 +216,120 @@ private static Thread createShutdownHook(final FileCache cache, final Logger log
     		);
     	}
     
    +	public void releaseJob(JobID jobId, ExecutionAttemptID executionId) {
    +		checkNotNull(jobId);
    +
    +		synchronized (lock) {
    +			Set<ExecutionAttemptID> jobRefCounter = jobRefHolders.get(jobId);
    +
    +			if (jobRefCounter == null || jobRefCounter.isEmpty()) {
    +				LOG.warn("improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId " + jobId);
    +				return;
    +			}
    +
    +			jobRefCounter.remove(executionId);
    +			if (jobRefCounter.isEmpty()) {
    +				executorService.schedule(new DeleteProcess(jobId), cleanupInterval, TimeUnit.SECONDS);
    +			}
    +		}
    +	}
    +
     	// ------------------------------------------------------------------------
     	//  background processes
     	// ------------------------------------------------------------------------
     
     	/**
    -	 * Asynchronous file copy process.
    +	 * Asynchronous file copy process from blob server.
     	 */
    -	private static class CopyProcess implements Callable<Path> {
    +	private static class CopyFromBlobProcess implements Callable<Path> {
     
    -		private final Path filePath;
    -		private final Path cachedPath;
    -		private boolean executable;
    +		private final PermanentBlobKey blobKey;
    +		private final Path target;
    +		private final boolean isDirectory;
    +		private final boolean isExecutable;
    +		private final JobID jobID;
    +		private final PermanentBlobService blobService;
     
    -		public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
    -			this.filePath = new Path(e.filePath);
    -			this.executable = e.isExecutable;
    -			this.cachedPath = cachedPath;
    +		CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, PermanentBlobService blobService, Path target) {
    +			try {
    +				this.isExecutable = e.isExecutable;
    +				this.isDirectory = e.isZipped;
    +				this.jobID = jobID;
    +				this.blobService = blobService;
    +				this.blobKey = InstantiationUtil.deserializeObject(e.blobKey, Thread.currentThread().getContextClassLoader());
    +				this.target = target;
    +			} catch (Exception ex) {
    +				throw new RuntimeException(ex);
    +			}
     		}
     
     		@Override
     		public Path call() throws IOException {
    -			// let exceptions propagate. we can retrieve them later from
    -			// the future and report them upon access to the result
    -			copy(filePath, cachedPath, this.executable);
    -			return cachedPath;
    +			final File file = blobService.getFile(jobID, blobKey);
    +
    +			if (isDirectory) {
    +				try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) {
    +					ZipEntry entry;
    +					while ((entry = zis.getNextEntry()) != null) {
    +						String fileName = entry.getName();
    +						Path newFile = new Path(target, fileName);
    +						if (entry.isDirectory()) {
    +							target.getFileSystem().mkdirs(newFile);
    +						} else {
    +							try (FSDataOutputStream fsDataOutputStream = target.getFileSystem()
    +									.create(newFile, FileSystem.WriteMode.NO_OVERWRITE)) {
    +								IOUtils.copyBytes(zis, fsDataOutputStream, false);
    +							}
    +							//noinspection ResultOfMethodCallIgnored
    +							new File(newFile.getPath()).setExecutable(isExecutable);
    +						}
    +						zis.closeEntry();
    +					}
    +				}
    +				Files.delete(file.toPath());
    +				return target;
    +			} else {
    +				//noinspection ResultOfMethodCallIgnored
    +				file.setExecutable(isExecutable);
    +				return Path.fromLocalFile(file);
    +			}
    +
     		}
     	}
     
     	/**
     	 * If no task is using this file after 5 seconds, clear it.
     	 */
    -	private static class DeleteProcess implements Runnable {
    -
    -		private final Object lock;
    -		private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
    +	private class DeleteProcess implements Runnable {
     
    -		private final String name;
     		private final JobID jobID;
     
    -		public DeleteProcess(Object lock, Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries,
    -								String name, JobID jobID) {
    -			this.lock = lock;
    -			this.entries = entries;
    -			this.name = name;
    +		DeleteProcess(JobID jobID) {
     			this.jobID = jobID;
     		}
     
     		@Override
     		public void run() {
     			try {
     				synchronized (lock) {
    -					Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobID);
     
    -					if (jobEntries != null) {
    -						Tuple4<Integer, File, Path, Future<Path>> entry = jobEntries.get(name);
    +					if (jobRefHolders.get(jobID).isEmpty()) {
    +						// abort the copy
    +						for (Future<Path> fileFuture : entries.get(jobID).values()) {
    +							fileFuture.cancel(true);
    +						}
     
    -						if (entry != null) {
    -							int count = entry.f0;
    -							if (count > 1) {
    -								// multiple references still
    -								entry.f0 = count - 1;
    -							}
    -							else {
    -								// we remove the last reference
    -								jobEntries.remove(name);
    -								if (jobEntries.isEmpty()) {
    -									entries.remove(jobID);
    -								}
    -
    -								// abort the copy
    -								entry.f3.cancel(true);
    -
    -								// remove the file
    -								File file = new File(entry.f2.toString());
    -								if (file.exists()) {
    -									if (file.isDirectory()) {
    -										FileUtils.deleteDirectory(file);
    -									}
    -									else if (!file.delete()) {
    -										LOG.error("Could not delete locally cached file " + file.getAbsolutePath());
    -									}
    -								}
    -
    -								// remove the job wide temp directory, if it is now empty
    -								File parent = entry.f1;
    -								if (parent.isDirectory()) {
    -									String[] children = parent.list();
    -									if (children == null || children.length == 0) {
    -										//noinspection ResultOfMethodCallIgnored
    -										parent.delete();
    -									}
    -								}
    -							}
    +						// remove the job wide temp directories
    +						for (File storageDirectory : storageDirectories) {
    +							File tempDir = new File(storageDirectory, jobID.toString());
    +							FileUtils.deleteDirectory(tempDir);
     						}
     					}
     				}
    -			}
    -			catch (IOException e) {
    +			} catch (IOException e) {
     				LOG.error("Could not delete file from local file cache.", e);
     			}
     		}
     	}
    +
    --- End diff --
    
    revert


---

Mime
View raw message