apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ilooner <...@git.apache.org>
Subject [GitHub] incubator-apex-malhar pull request: APEXMALHAR-1897 added managed ...
Date Sun, 20 Mar 2016 04:07:36 GMT
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/145#discussion_r56759001
  
    --- Diff: library/src/main/java/com/datatorrent/lib/state/managed/Bucket.java ---
    @@ -0,0 +1,534 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.state.managed;
    +
    +import java.io.IOException;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.TreeSet;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +
    +import com.datatorrent.lib.fileaccess.FileAccess;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * A bucket that groups events.
    + */
    +public interface Bucket extends ManagedStateComponent
    +{
  /**
   * Returns the identifier of this bucket.
   *
   * @return bucket id
   */
  long getBucketId();
    +
  /**
   * Returns the amount of memory this bucket currently occupies.
   *
   * @return size of bucket in memory, in bytes.
   */
  long getSizeInBytes();
    +
  /**
   * Get value of a key, consulting the requested source(s).
   *
   * @param key        key.
   * @param timeBucket time bucket of the key if known; -1 otherwise. When known, the lookup
   *                   can be restricted to that time bucket's data file.
   * @param source     source to read from: in-memory state, data-file readers, or both.
   * @return value of the key, or {@code null} if the key cannot be found.
   */
  Slice get(Slice key, long timeBucket, ReadSource source);
    +
  /**
   * Set value of a key in the given time bucket.
   *
   * @param key        key.
   * @param timeBucket time bucket of the key.
   * @param value      value of the key.
   */
  void put(Slice key, long timeBucket, Slice value);
    +
  /**
   * Triggers the bucket to checkpoint. Returns the non checkpointed data so far.
   *
   * @param windowId window id of the checkpoint.
   * @return non checkpointed data.
   */
  Map<Slice, BucketedValue> checkpoint(long windowId);
    +
  /**
   * Triggers the bucket to commit data till the provided window id.
   *
   * @param windowId window id till which data is committed.
   */
  void committed(long windowId);
    +
  /**
   * Triggers bucket to free memory which is already persisted in bucket data files.
   *
   * @return amount of memory freed in bytes.
   * @throws IOException if an error occurs while accessing the bucket data files.
   */
  long freeMemory() throws IOException;
    +
  /**
   * Allows the bucket to process/cache data which is recovered (from window files) after failure.
   *
   * @param windowId      largest recovery window.
   * @param recoveredData recovered data.
   */
  void recoveredData(long windowId, Map<Slice, Bucket.BucketedValue> recoveredData);
    +
    +  enum ReadSource
    +  {
    +    MEMORY,      //state in memory in key/value form
    +    READERS,     //these are streams in which the key will be searched and serialized.
    +    ALL          //both the above states.
    +  }
    +
    +  class BucketedValue
    +  {
    +    private long timeBucket;
    +    private Slice value;
    +
    +    protected BucketedValue()
    +    {
    +    }
    +
    +    protected BucketedValue(long timeBucket, Slice value)
    +    {
    +      this.timeBucket = timeBucket;
    +      this.value = value;
    +    }
    +
    +    protected long getTimeBucket()
    +    {
    +      return timeBucket;
    +    }
    +
    +    protected void setTimeBucket(long timeBucket)
    +    {
    +      this.timeBucket = timeBucket;
    +    }
    +
    +    public Slice getValue()
    +    {
    +      return value;
    +    }
    +
    +    public void setValue(Slice value)
    +    {
    +      this.value = value;
    +    }
    +
    +    @Override
    +    public boolean equals(Object o)
    +    {
    +      if (this == o) {
    +        return true;
    +      }
    +      if (!(o instanceof BucketedValue)) {
    +        return false;
    +      }
    +
    +      BucketedValue that = (BucketedValue)o;
    +
    +      return timeBucket == that.timeBucket && value.equals(that.value);
    +
    +    }
    +
    +    @Override
    +    public int hashCode()
    +    {
    +      return Objects.hash(timeBucket, value);
    +    }
    +  }
    +
    +  /**
    +   * Default bucket.<br/>
    +   * Not thread-safe.
    +   */
    +  class DefaultBucket implements Bucket
    +  {
    +    private final long bucketId;
    +
    +    //Key -> Ordered values
    +    private Map<Slice, BucketedValue> flash = Maps.newHashMap();
    +
    +    //Data persisted in write ahead logs. window -> bucket
    +    private final transient TreeMap<Long, Map<Slice, BucketedValue>> checkpointedData
= Maps.newTreeMap();
    +
    +    //Data persisted in bucket data files
    +    private final transient Map<Slice, BucketedValue> committedData = Maps.newHashMap();
    +
    +    //Data recovered
    +    private final transient TreeMap<Long, Map<Slice, BucketedValue>> recoveredData
= Maps.newTreeMap();
    +
    +    //Data serialized/deserialized from bucket data files
    +    private final transient Map<Slice, BucketedValue> readCache = Maps.newConcurrentMap();
    +
    +    //TimeBucket -> FileReaders
    +    private final transient Map<Long, FileAccess.FileReader> readers = Maps.newTreeMap();
    +
    +    protected transient ManagedStateContext managedStateContext;
    +
    +    private AtomicLong sizeInBytes = new AtomicLong(0);
    +
    +    private final transient Slice dummyGetKey = new Slice(null, 0, 0);
    +
    +    private transient TreeSet<BucketsFileSystem.ImmutableTimeBucketMeta> cachedBucketMetas;
    +
    /**
     * No-arg constructor required by kryo deserialization; assigns an invalid bucket id.
     */
    private DefaultBucket()
    {
      //for kryo
      bucketId = -1;
    }
    +
    /**
     * Creates a bucket with the given id.
     *
     * @param bucketId bucket identifier.
     */
    protected DefaultBucket(long bucketId)
    {
      this.bucketId = bucketId;
    }
    +
    +    @Override
    +    public void setup(@NotNull ManagedStateContext managedStateContext)
    +    {
    +      this.managedStateContext = Preconditions.checkNotNull(managedStateContext, "managed
state context");
    +    }
    +
    /**
     * {@inheritDoc}
     */
    @Override
    public long getBucketId()
    {
      return bucketId;
    }
    +
    +    @Override
    +    public long getSizeInBytes()
    +    {
    +      return sizeInBytes.longValue();
    +    }
    +
    +    private Slice getFromMemory(Slice key)
    +    {
    +      //search the cache for key
    +      BucketedValue bucketedValue = flash.get(key);
    +      if (bucketedValue != null) {
    +        return bucketedValue.getValue();
    +      }
    +
    +      for (Long window : checkpointedData.descendingKeySet()) {
    +        //traverse the checkpointed data in reverse order
    +        bucketedValue = checkpointedData.get(window).get(key);
    +        if (bucketedValue != null) {
    +          return bucketedValue.getValue();
    +        }
    +      }
    +
    +      for (Long window : recoveredData.descendingKeySet()) {
    +        //traverse the reccovered data in reverse order
    +        bucketedValue = recoveredData.get(window).get(key);
    +        if (bucketedValue != null) {
    +          return bucketedValue.getValue();
    +        }
    +      }
    +
    +      bucketedValue = committedData.get(key);
    +      if (bucketedValue != null) {
    +        return bucketedValue.getValue();
    +      }
    +
    +      bucketedValue = readCache.get(key);
    +      if (bucketedValue != null) {
    +        return bucketedValue.getValue();
    +      }
    +      return null;
    +    }
    +
    +    private Slice getFromReaders(Slice key, long timeBucket)
    +    {
    +      if (timeBucket != -1) {
    +        Slice valSlice = getKeyFromTimeBucketReader(key, timeBucket);
    +        if (valSlice != null) {
    +          BucketedValue bucketedValue = new BucketedValue(timeBucket, valSlice);
    +          readCache.put(key, bucketedValue);
    --- End diff --
    
    If the user first does a get for key "A" in time bucket 1, that value is put in
the read cache. If the user then does a get for key "A" in time bucket 2, the time bucket
1 value is retrieved from the read cache and returned. So the value for an older time bucket
is returned.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message