flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7057) move BLOB ref-counting from LibraryCacheManager to BlobCache
Date Mon, 03 Jul 2017 12:24:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072338#comment-16072338 ]

ASF GitHub Bot commented on FLINK-7057:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4238#discussion_r125217930
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
    @@ -107,146 +133,268 @@ public BlobCache(
     			this.numFetchRetries = 0;
     		}
     
    +		// Initializing the clean up task
    +		this.cleanupTimer = new Timer(true);
    +
    +		cleanupInterval = blobClientConfig.getLong(
    +			ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
    +			ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
    +		this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval);
    +
     		// Add shutdown hook to delete storage directory
     		shutdownHook = BlobUtils.addShutdownHook(this, LOG);
     	}
     
    +	@Override
    +	public void registerJob(JobID jobId) {
    +		synchronized (lockObject) {
    +			RefCount ref = jobRefCounters.get(jobId);
    +			if (ref == null) {
    +				ref = new RefCount();
    +				jobRefCounters.put(jobId, ref);
    +			}
    +			++ref.references;
    +		}
    +	}
    +
    +	@Override
    +	public void releaseJob(JobID jobId) {
    +		synchronized (lockObject) {
    +			RefCount ref = jobRefCounters.get(jobId);
    +
    +			if (ref == null) {
    +				LOG.warn("improper use of releaseJob() without a matching number of registerJob() calls");
    +				return;
    +			}
    +
    +			--ref.references;
    +			if (ref.references == 0) {
    +				ref.keepUntil = System.currentTimeMillis() + cleanupInterval;
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Returns local copy of the (job-unrelated) file for the BLOB with the given key.
    +	 * <p>
    +	 * The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in
    +	 * the cache, the method will try to download it from this cache's BLOB server.
    +	 *
    +	 * @param key
    +	 * 		The key of the desired BLOB.
    +	 *
    +	 * @return file referring to the local storage location of the BLOB.
    +	 *
    +	 * @throws IOException
    +	 * 		Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
    +	 */
    +	@Override
    +	public File getFile(BlobKey key) throws IOException {
    +		return getFileInternal(null, key);
    +	}
    +
     	/**
    -	 * Returns the URL for the BLOB with the given key. The method will first attempt to serve
    -	 * the BLOB from its local cache. If the BLOB is not in the cache, the method will try to download it
    -	 * from this cache's BLOB server.
    +	 * Returns local copy of the file for the BLOB with the given key.
    +	 * <p>
    +	 * The method will first attempt to serve the BLOB from its local cache. If the BLOB is not in
    +	 * the cache, the method will try to download it from this cache's BLOB server.
     	 *
    -	 * @param requiredBlob The key of the desired BLOB.
    -	 * @return URL referring to the local storage location of the BLOB.
    -	 * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
    +	 * @param jobId
    +	 * 		ID of the job this blob belongs to
    +	 * @param key
    +	 * 		The key of the desired BLOB.
    +	 *
    +	 * @return file referring to the local storage location of the BLOB.
    +	 *
    +	 * @throws IOException
    +	 * 		Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
     	 */
    -	public URL getURL(final BlobKey requiredBlob) throws IOException {
    +	@Override
    +	public File getFile(@Nonnull JobID jobId, BlobKey key) throws IOException {
    --- End diff --
    
    I think the convention so far is that fields without an annotation are considered `@Nonnull`,
    and only fields annotated with `@Nullable` can be `null`. Otherwise, `key` should
    also be marked as `@Nonnull`.
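
The convention described in the comment above could look roughly like the following sketch. It is not code from the pull request: `BlobPaths`, `storagePath`, and the locally declared `Nullable` annotation (a dependency-free stand-in for `javax.annotation.Nullable`) are hypothetical names used only for illustration, and plain strings stand in for `JobID` and `BlobKey`.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Objects;

// Hypothetical stand-in for javax.annotation.Nullable, declared here
// only so that the sketch compiles without external dependencies.
@Documented
@Retention(RetentionPolicy.CLASS)
@Target({ElementType.PARAMETER, ElementType.FIELD, ElementType.METHOD})
@interface Nullable {}

// Hypothetical helper; not part of Flink.
final class BlobPaths {
    private BlobPaths() {}

    // jobId may legitimately be null (a job-unrelated BLOB), so it is
    // the only parameter that carries an annotation.
    // key has no annotation and is therefore treated as non-null.
    static String storagePath(@Nullable String jobId, String key) {
        // Fail fast if a caller violates the implicit non-null contract.
        Objects.requireNonNull(key, "key");
        String prefix = (jobId == null) ? "no_job" : "job_" + jobId;
        return prefix + "/blob_" + key;
    }
}
```

Under this convention, annotating only one of several non-null parameters with `@Nonnull` would suggest that the unannotated ones may be `null`, which is the inconsistency the comment points out.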


> move BLOB ref-counting from LibraryCacheManager to BlobCache
> ------------------------------------------------------------
>
>                 Key: FLINK-7057
>                 URL: https://issues.apache.org/jira/browse/FLINK-7057
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination, Network
>    Affects Versions: 1.4.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR files managed
> by it. Instead, we want the {{BlobCache}} to do that itself for all job-related BLOBs. Also,
> we do not want to operate on a per-{{BlobKey}} level but rather per job. Therefore, the cleanup
> process should be adapted, too.
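
A minimal sketch of per-job reference counting with deferred cleanup, as described in the issue text above, might look as follows. It is an illustration under stated assumptions and not the `BlobCache` implementation: `JobRefCounter` and its methods are hypothetical names, job IDs are plain strings, and the current time is passed in explicitly so the behaviour is easy to test.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical class; not part of Flink.
final class JobRefCounter {
    private static final class RefCount {
        int references = 0;
        long keepUntil = -1; // -1 means the job is still in use
    }

    private final Object lock = new Object();
    private final Map<String, RefCount> jobs = new HashMap<>();
    private final long cleanupIntervalMillis;

    JobRefCounter(long cleanupIntervalMillis) {
        this.cleanupIntervalMillis = cleanupIntervalMillis;
    }

    // Adds one reference for the job and cancels any pending cleanup.
    void register(String jobId) {
        synchronized (lock) {
            RefCount ref = jobs.computeIfAbsent(jobId, k -> new RefCount());
            ref.references++;
            ref.keepUntil = -1;
        }
    }

    // Drops one reference; returns false for an unmatched release.
    boolean release(String jobId, long now) {
        synchronized (lock) {
            RefCount ref = jobs.get(jobId);
            if (ref == null || ref.references == 0) {
                return false;
            }
            if (--ref.references == 0) {
                // Keep the job's files for one more interval before cleanup.
                ref.keepUntil = now + cleanupIntervalMillis;
            }
            return true;
        }
    }

    // Removes all unreferenced jobs whose grace period has expired.
    int cleanup(long now) {
        synchronized (lock) {
            int removed = 0;
            Iterator<Map.Entry<String, RefCount>> it = jobs.entrySet().iterator();
            while (it.hasNext()) {
                RefCount ref = it.next().getValue();
                if (ref.references == 0 && ref.keepUntil >= 0 && now >= ref.keepUntil) {
                    it.remove();
                    removed++;
                }
            }
            return removed;
        }
    }

    boolean isTracked(String jobId) {
        synchronized (lock) {
            return jobs.containsKey(jobId);
        }
    }
}
```

In this sketch a periodic task would call `cleanup(System.currentTimeMillis())`. Counting per job rather than per BLOB key means a single counter decides when all of a job's files become eligible for deletion.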



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
