apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chandnisingh <...@git.apache.org>
Subject [GitHub] incubator-apex-malhar pull request: APEXMALHAR-1897 added managed ...
Date Wed, 23 Mar 2016 00:38:58 GMT
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/145#discussion_r57094062
  
    --- Diff: library/src/main/java/com/datatorrent/lib/state/managed/ManagedTimeStateImpl.java
---
    @@ -51,63 +51,57 @@ public void put(long bucketId, long time, Slice key, Slice value)
         }
       }
     
    -  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
       @Override
       public Slice getSync(long bucketId, Slice key)
       {
    -    int bucketIdx = prepareBucket(bucketId);
    -    Bucket bucket = buckets[bucketIdx];
    -
    -    synchronized (bucket) {
    -      return bucket.get(key, -1, Bucket.ReadSource.ALL);
    -    }
    +    return getValueFromBucketSync(bucketId, -1, key);
       }
     
    -  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
       @Override
       public Slice getSync(long bucketId, long time, Slice key)
       {
    -    int bucketIdx = prepareBucket(bucketId);
         long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
         if (timeBucket == -1) {
           //time is expired so no point in looking further.
           return BucketedState.EXPIRED;
         }
    +    return getValueFromBucketSync(bucketId, timeBucket, key);
    +  }
     
    +  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
    +  private Slice getValueFromBucketSync(long bucketId, long timeBucket, Slice key)
    +  {
    +    int bucketIdx = prepareBucket(bucketId);
         Bucket bucket = buckets[bucketIdx];
         synchronized (bucket) {
           return bucket.get(key, timeBucket, Bucket.ReadSource.ALL);
         }
       }
     
    -  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
       @Override
       public Future<Slice> getAsync(long bucketId, Slice key)
       {
    -    int bucketIdx = prepareBucket(bucketId);
    -    Bucket bucket = buckets[bucketIdx];
    -    synchronized (bucket) {
    -      Slice cachedVal = buckets[bucketIdx].get(key, -1, Bucket.ReadSource.MEMORY);
    -      if (cachedVal != null) {
    -        return Futures.immediateFuture(cachedVal);
    -      }
    -      return readerService.submit(new KeyFetchTask(bucket, key, -1, throwable));
    -    }
    +    return getValueFromBucketAsync(bucketId, -1, key);
       }
     
    -  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
       @Override
       public Future<Slice> getAsync(long bucketId, long time, Slice key)
       {
    -    int bucketIdx = prepareBucket(bucketId);
    -    Bucket bucket = buckets[bucketIdx];
         long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
         if (timeBucket == -1) {
           //time is expired so no point in looking further.
           return Futures.immediateFuture(BucketedState.EXPIRED);
         }
    +    return getValueFromBucketAsync(bucketId, timeBucket, key);
    +  }
    +
    +  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
    +  private Future<Slice> getValueFromBucketAsync(long bucketId, long timeBucket,
Slice key)
    --- End diff --
    
    I moved it to the AbstractManagedStateImpl however can't make it static because they need
to access some non-static members and methods - ```buckets``` array and ```prepareBucket()```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message