apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ilooner <...@git.apache.org>
Subject [GitHub] incubator-apex-malhar pull request: APEXMALHAR-1897 added managed ...
Date Sun, 20 Mar 2016 03:28:41 GMT
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/145#discussion_r56758801
  
    --- Diff: library/src/main/java/com/datatorrent/lib/state/managed/Bucket.java ---
    @@ -0,0 +1,534 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.state.managed;
    +
    +import java.io.IOException;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.TreeSet;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +
    +import com.datatorrent.lib.fileaccess.FileAccess;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * A bucket that groups events.
    + */
    +public interface Bucket extends ManagedStateComponent
    +{
    +  /**
    +   * @return id of this bucket.
    +   */
    +  long getBucketId();
    +
    +  /**
    +   * @return size of this bucket in memory, in bytes.
    +   */
    +  long getSizeInBytes();
    +
    +  /**
    +   * Get value of a key.
    +   *
    +   * @param key        key.
    +   * @param timeBucket time bucket of the key if known; -1 otherwise.
    +   * @param source     source to read from: memory, persisted readers, or both.
    +   * @return value of the key.
    +   */
    +  Slice get(Slice key, long timeBucket, ReadSource source);
    +
    +  /**
    +   * Set value of a key.
    +   *
    +   * @param key        key.
    +   * @param timeBucket time bucket of the key.
    +   * @param value      value of the key.
    +   */
    +  void put(Slice key, long timeBucket, Slice value);
    +
    +  /**
    +   * Triggers the bucket to checkpoint. Returns the non-checkpointed data so far.
    +   *
    +   * @param windowId window id of the checkpoint.
    +   * @return non-checkpointed data.
    +   */
    +  Map<Slice, BucketedValue> checkpoint(long windowId);
    +
    +  /**
    +   * Triggers the bucket to commit data till provided window id.
    +   *
    +   * @param windowId window id up to which data is committed.
    +   */
    +  void committed(long windowId);
    +
    +  /**
    +   * Triggers bucket to free memory which is already persisted in bucket data files.
    +   *
    +   * @return amount of memory freed in bytes.
    +   * @throws IOException on failure to access the persisted data.
    +   */
    +  long freeMemory() throws IOException;
    +
    +  /**
    +   * Allows the bucket to process/cache data which is recovered (from window files) after failure.
    +   *
    +   * @param windowId largest recovery window.
    +   * @param recoveredData recovered data.
    +   */
    +  void recoveredData(long windowId, Map<Slice, Bucket.BucketedValue> recoveredData);
    +
    +  /**
    +   * Where a read ({@link #get}) looks for the key.
    +   */
    +  enum ReadSource
    +  {
    +    MEMORY,      //only the in-memory state, in key/value form.
    +    READERS,     //only the persisted streams, in which the key is searched and de-serialized.
    +    ALL          //both of the above sources.
    +  }
    +
    +  /**
    +   * Pairs a value with the time bucket it belongs to.
    +   */
    +  class BucketedValue
    +  {
    +    private long timeBucket;
    +    //null until assigned when the no-arg constructor is used
    +    private Slice value;
    +
    +    protected BucketedValue()
    +    {
    +    }
    +
    +    protected BucketedValue(long timeBucket, Slice value)
    +    {
    +      this.timeBucket = timeBucket;
    +      this.value = value;
    +    }
    +
    +    /**
    +     * @return time bucket of the value.
    +     */
    +    protected long getTimeBucket()
    +    {
    +      return timeBucket;
    +    }
    +
    +    protected void setTimeBucket(long timeBucket)
    +    {
    +      this.timeBucket = timeBucket;
    +    }
    +
    +    /**
    +     * @return the value.
    +     */
    +    public Slice getValue()
    +    {
    +      return value;
    +    }
    +
    +    public void setValue(Slice value)
    +    {
    +      this.value = value;
    +    }
    +
    +    @Override
    +    public boolean equals(Object o)
    +    {
    +      if (this == o) {
    +        return true;
    +      }
    +      if (!(o instanceof BucketedValue)) {
    +        return false;
    +      }
    +
    +      BucketedValue that = (BucketedValue)o;
    +
    +      //Objects.equals is null-safe: value can be null (no-arg constructor), and the previous
    +      //value.equals(that.value) threw NPE in that case even though hashCode() handles null
    +      //via Objects.hash, breaking the equals/hashCode consistency contract.
    +      return timeBucket == that.timeBucket && Objects.equals(value, that.value);
    +    }
    +
    +    @Override
    +    public int hashCode()
    +    {
    +      return Objects.hash(timeBucket, value);
    +    }
    +  }
    +
    +  /**
    +   * Default bucket.<br/>
    +   * Not thread-safe.
    +   */
    +  class DefaultBucket implements Bucket
    +  {
    +    private final long bucketId;
    +
    +    //Key -> Ordered values
    +    private Map<Slice, BucketedValue> flash = Maps.newHashMap();
    +
    +    //Data persisted in write ahead logs. window -> bucket
    +    private final transient TreeMap<Long, Map<Slice, BucketedValue>> checkpointedData
= Maps.newTreeMap();
    +
    +    //Data persisted in bucket data files
    +    private final transient Map<Slice, BucketedValue> committedData = Maps.newHashMap();
    +
    +    //Data recovered
    +    private final transient TreeMap<Long, Map<Slice, BucketedValue>> recoveredData
= Maps.newTreeMap();
    --- End diff --
    
    can recovered data be treated the same as checkpointed data?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message