apex-dev mailing list archives

From chandnisingh <...@git.apache.org>
Subject [GitHub] incubator-apex-malhar pull request: APEXMALHAR-1897 added managed ...
Date Mon, 21 Mar 2016 02:48:18 GMT
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/145#discussion_r56779057
  
    --- Diff: library/src/main/java/com/datatorrent/lib/state/managed/BucketsFileSystem.java ---
    @@ -0,0 +1,573 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.state.managed;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.TreeSet;
    +import java.util.concurrent.ConcurrentSkipListSet;
    +
    +import javax.annotation.Nullable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.hadoop.fs.LocatedFileStatus;
    +import org.apache.hadoop.fs.RemoteIterator;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Ordering;
    +import com.google.common.collect.Sets;
    +import com.google.common.collect.Table;
    +import com.google.common.collect.TreeBasedTable;
    +
    +import com.datatorrent.lib.fileaccess.FileAccess;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Persists bucket data on disk and maintains meta information about the buckets.
    + * <p/>
    + *
    + * Each bucket has a meta-data file whose format is:<br/>
    + * <ol>
    + * <li>total number of time-buckets (int)</li>
    + * <li>For each time bucket
    + * <ol>
    + * <li>time bucket key (long)</li>
    + * <li>size of data (sum of bytes) (long)</li>
    + * <li>last transferred window id (long)</li>
    + * <li>length of the first key in the time-bucket file (int)</li>
    + * <li>first key in the time-bucket file (byte[])</li>
    + * </ol>
    + * </li>
    + * </ol>
    + * <p/>
    + * Meta-data information is updated by {@link IncrementalCheckpointManager}. Any updates are restricted to the package.
    + */
    +public class BucketsFileSystem implements ManagedStateComponent
    +{
    +  public static final String META_FILE_NAME = "_META";
    +
    +  private final transient TreeBasedTable<Long, Long, MutableTimeBucketMeta> timeBucketsMeta = TreeBasedTable.create();
    +
    +  //Check-pointed set of all buckets this instance has written to.
    +  protected final Set<Long> bucketNamesOnFS = new ConcurrentSkipListSet<>();
    +
    +  protected transient ManagedStateContext managedStateContext;
    +
    +  @Override
    +  public void setup(@NotNull ManagedStateContext managedStateContext)
    +  {
    +    this.managedStateContext = Preconditions.checkNotNull(managedStateContext, "managed state context");
    +  }
    +
    +  protected FileAccess.FileWriter getWriter(long bucketId, String fileName) throws IOException
    +  {
    +    return managedStateContext.getFileAccess().getWriter(bucketId, fileName);
    +  }
    +
    +  protected FileAccess.FileReader getReader(long bucketId, String fileName) throws IOException
    +  {
    +    return managedStateContext.getFileAccess().getReader(bucketId, fileName);
    +  }
    +
    +  protected void rename(long bucketId, String fromName, String toName) throws IOException
    +  {
    +    managedStateContext.getFileAccess().rename(bucketId, fromName, toName);
    +  }
    +
    +  protected DataOutputStream getOutputStream(long bucketId, String fileName) throws IOException
    +  {
    +    return managedStateContext.getFileAccess().getOutputStream(bucketId, fileName);
    +  }
    +
    +  protected DataInputStream getInputStream(long bucketId, String fileName) throws IOException
    +  {
    +    return managedStateContext.getFileAccess().getInputStream(bucketId, fileName);
    +  }
    +
    +  protected boolean exists(long bucketId, String fileName) throws IOException
    +  {
    +    return managedStateContext.getFileAccess().exists(bucketId, fileName);
    +  }
    +
    +  protected RemoteIterator<LocatedFileStatus> listFiles(long bucketId) throws IOException
    +  {
    +    return managedStateContext.getFileAccess().listFiles(bucketId);
    +  }
    +
    +  protected void delete(long bucketId, String fileName) throws IOException
    +  {
    +    managedStateContext.getFileAccess().delete(bucketId, fileName);
    +  }
    +
    +  protected void deleteBucket(long bucketId) throws IOException
    +  {
    +    managedStateContext.getFileAccess().deleteBucket(bucketId);
    +  }
    +
    +  /**
    +   * Saves data to a bucket. The data consists of key/values of all time-buckets of a particular bucket.
    +   *
    +   * @param windowId        window id
    +   * @param bucketId        bucket id
    +   * @param data            data of all time-buckets
    +   * @throws IOException
    +   */
    +  protected void writeBucketData(long windowId, long bucketId, Map<Slice, Bucket.BucketedValue> data) throws IOException
    +  {
    +    Table<Long, Slice, Bucket.BucketedValue> timeBucketedKeys = TreeBasedTable.create(Ordering.<Long>natural(),
    +        managedStateContext.getKeyComparator());
    +
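    +    //Group the incoming key/values by time bucket: row = time bucket, column = key, ordered by the key comparator.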
    +    for (Map.Entry<Slice, Bucket.BucketedValue> entry : data.entrySet()) {
    +      long timeBucketId = entry.getValue().getTimeBucket();
    +      timeBucketedKeys.put(timeBucketId, entry.getKey(), entry.getValue());
    +    }
    +
    +    for (long timeBucket : timeBucketedKeys.rowKeySet()) {
    +      BucketsFileSystem.MutableTimeBucketMeta tbm = getOrCreateTimeBucketMeta(bucketId, timeBucket);
    +      addBucketName(bucketId);
    +
    +      long dataSize = 0;
    +      Slice firstKey = null;
    +
    +      FileAccess.FileWriter fileWriter;
    +      String tmpFileName = getTmpFileName();
    +      if (tbm.getLastTransferredWindowId() == -1) {
    +        //A new time bucket, so we append all the key/values to a new file
    +        fileWriter = getWriter(bucketId, tmpFileName);
    +
    +        for (Map.Entry<Slice, Bucket.BucketedValue> entry : timeBucketedKeys.row(timeBucket).entrySet()) {
    +          Slice key = entry.getKey();
    +          Slice value = entry.getValue().getValue();
    +
    +          dataSize += key.length;
    +          dataSize += value.length;
    +
    +          fileWriter.append(key.toByteArray(), value.toByteArray());
    +          if (firstKey == null) {
    +            firstKey = key;
    +          }
    +        }
    +      } else {
    +        //The time bucket already exists, so we read its file and re-write it with the new entries merged in
    +        TreeMap<Slice, Slice> fileData = new TreeMap<>(managedStateContext.getKeyComparator());
    +        FileAccess.FileReader fileReader = getReader(bucketId, getFileName(timeBucket));
    +        fileReader.readFully(fileData);
    +        fileReader.close();
    +
    +        for (Map.Entry<Slice, Bucket.BucketedValue> entry : timeBucketedKeys.row(timeBucket).entrySet()) {
    +          fileData.put(entry.getKey(), entry.getValue().getValue());
    +        }
    +
    +        fileWriter = getWriter(bucketId, tmpFileName);
    +        for (Map.Entry<Slice, Slice> entry : fileData.entrySet()) {
    +          Slice key = entry.getKey();
    +          Slice value = entry.getValue();
    +
    +          dataSize += key.length;
    +          dataSize += value.length;
    +
    +          fileWriter.append(key.toByteArray(), value.toByteArray());
    +          if (firstKey == null) {
    +            firstKey = key;
    +          }
    +        }
    +      }
    +      fileWriter.close();
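    +      //Write to a tmp file and rename into place so readers never observe a partially written time-bucket file.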
    +      rename(bucketId, tmpFileName, getFileName(timeBucket));
    +      tbm.updateTimeBucketMeta(windowId, dataSize, firstKey);
    +    }
    +
    +    updateBucketMetaFile(bucketId);
    +  }
    +
    +  /**
    +   * Retrieves the time bucket meta of a particular time-bucket. If the time bucket doesn't
    +   * exist, then a new one is created.
    +   *
    +   * @param bucketId     bucket id
    +   * @param timeBucketId time bucket id
    +   * @return time bucket meta of the time bucket
    +   * @throws IOException
    +   */
    +  @NotNull
    +  MutableTimeBucketMeta getOrCreateTimeBucketMeta(long bucketId, long timeBucketId) throws IOException
    +  {
    +    synchronized (timeBucketsMeta) {
    +      MutableTimeBucketMeta tbm = timeBucketMetaHelper(bucketId, timeBucketId);
    +      if (tbm == null) {
    +        tbm = new MutableTimeBucketMeta(bucketId, timeBucketId);
    +        timeBucketsMeta.put(bucketId, timeBucketId, tbm);
    +      }
    +      return tbm;
    +    }
    +  }
    +
    +  protected void addBucketName(long bucketId)
    +  {
    +    bucketNamesOnFS.add(bucketId);
    +  }
    +
    +  /**
    +   * Returns the time bucket meta of a particular time-bucket which is immutable.
    +   *
    +   * @param bucketId     bucket id
    +   * @param timeBucketId time bucket id
    +   * @return immutable time bucket meta
    +   * @throws IOException
    +   */
    +  @Nullable
    +  public ImmutableTimeBucketMeta getTimeBucketMeta(long bucketId, long timeBucketId) throws IOException
    +  {
    +    synchronized (timeBucketsMeta) {
    +      MutableTimeBucketMeta tbm = timeBucketMetaHelper(bucketId, timeBucketId);
    +      if (tbm != null) {
    +        return tbm.getImmutableTimeBucketMeta();
    +      }
    +      return null;
    +    }
    +  }
    +
    +  private MutableTimeBucketMeta timeBucketMetaHelper(long bucketId, long timeBucketId) throws IOException
    +  {
    +    MutableTimeBucketMeta tbm = timeBucketsMeta.get(bucketId, timeBucketId);
    +    if (tbm != null) {
    +      return tbm;
    +    }
    +    if (exists(bucketId, META_FILE_NAME)) {
    +      try (DataInputStream dis = getInputStream(bucketId, META_FILE_NAME)) {
    +        //Load meta info of all the time buckets of the bucket identified by bucketId.
    +        loadBucketMetaFile(bucketId, dis);
    +      }
    +    } else {
    +      return null;
    +    }
    +    return timeBucketsMeta.get(bucketId, timeBucketId);
    +  }
    +
    +  /**
    +   * Returns the meta information of all the time buckets in the bucket in descending order - latest to oldest.
    +   *
    +   * @param bucketId bucket id
    +   * @return all the time buckets in order - latest to oldest
    +   */
    +  public TreeSet<ImmutableTimeBucketMeta> getAllTimeBuckets(long bucketId) throws IOException
    +  {
    +    synchronized (timeBucketsMeta) {
    +      TreeSet<ImmutableTimeBucketMeta> immutableTimeBucketMetas = Sets.newTreeSet(
    +          Collections.<ImmutableTimeBucketMeta>reverseOrder());
    +
    +      if (timeBucketsMeta.containsRow(bucketId)) {
    +        for (Map.Entry<Long, MutableTimeBucketMeta> entry : timeBucketsMeta.row(bucketId).entrySet()) {
    +          immutableTimeBucketMetas.add(entry.getValue().getImmutableTimeBucketMeta());
    +        }
    +        return immutableTimeBucketMetas;
    +      }
    +      if (exists(bucketId, META_FILE_NAME)) {
    +        try (DataInputStream dis = getInputStream(bucketId, META_FILE_NAME)) {
    +          //Load meta info of all the time buckets of the bucket identified by bucket id
    +          loadBucketMetaFile(bucketId, dis);
    +          for (Map.Entry<Long, MutableTimeBucketMeta> entry : timeBucketsMeta.row(bucketId).entrySet()) {
    +            immutableTimeBucketMetas.add(entry.getValue().getImmutableTimeBucketMeta());
    +          }
    +          return immutableTimeBucketMetas;
    +        }
    +      }
    +      return immutableTimeBucketMetas;
    +    }
    +  }
    +
    +  /**
    +   * Loads the bucket meta-file.
    +   *
    +   * @param bucketId bucket id
    +   * @param dis      data input stream
    +   * @throws IOException
    +   */
    +  private void loadBucketMetaFile(long bucketId, DataInputStream dis) throws IOException
    +  {
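    +    //The read order below mirrors the meta-file format described in the class javadoc.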
    +    int numberOfEntries = dis.readInt();
    +
    +    for (int i = 0; i < numberOfEntries; i++) {
    +      long timeBucketId = dis.readLong();
    +      long dataSize = dis.readLong();
    +      long lastTransferredWindow = dis.readLong();
    +
    +      MutableTimeBucketMeta tbm = new MutableTimeBucketMeta(bucketId, timeBucketId);
    +
    +      int sizeOfFirstKey = dis.readInt();
    +      byte[] firstKeyBytes = new byte[sizeOfFirstKey];
    +      dis.readFully(firstKeyBytes, 0, firstKeyBytes.length);
    +      tbm.updateTimeBucketMeta(lastTransferredWindow, dataSize, new Slice(firstKeyBytes));
    +
    +      timeBucketsMeta.put(bucketId, timeBucketId, tbm);
    +    }
    +  }
    +
    +  /**
    +   * Saves the updated bucket meta on disk.
    +   *
    +   * @param bucketId bucket id
    +   * @throws IOException
    +   */
    +  void updateBucketMetaFile(long bucketId) throws IOException
    +  {
    +    Map<Long, MutableTimeBucketMeta> timeBuckets;
    +    synchronized (timeBucketsMeta) {
    +      timeBuckets = timeBucketsMeta.row(bucketId);
    +    }
    --- End diff ---
    
    Modification of a particular row is done by only one thread: the thread that transfers data from a window file to the data files. I will double check and add a comment.
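    
    For illustration, a minimal sketch (not part of this PR) of taking the snapshot defensively: copy the row while holding the lock, since row(...) returns a live view backed by the table, and then write the meta-file format described in the class javadoc. It uses Guava's ImmutableMap, and the MutableTimeBucketMeta getters below (getTimeBucketId(), getSizeInBytes(), getFirstKey()) are assumed names that may differ from the actual class:
    
        void updateBucketMetaFile(long bucketId) throws IOException
        {
          Map<Long, MutableTimeBucketMeta> timeBuckets;
          synchronized (timeBucketsMeta) {
            //Copy under the lock; iterating a live row view outside it could race with writers.
            //Requires com.google.common.collect.ImmutableMap.
            timeBuckets = ImmutableMap.copyOf(timeBucketsMeta.row(bucketId));
          }
          String tmpFileName = getTmpFileName();
          try (DataOutputStream dos = getOutputStream(bucketId, tmpFileName)) {
            dos.writeInt(timeBuckets.size());
            for (MutableTimeBucketMeta tbm : timeBuckets.values()) {
              //Field order matches loadBucketMetaFile and the class javadoc.
              dos.writeLong(tbm.getTimeBucketId());            //assumed getter
              dos.writeLong(tbm.getSizeInBytes());             //assumed getter
              dos.writeLong(tbm.getLastTransferredWindowId());
              dos.writeInt(tbm.getFirstKey().length);          //assumed getter; Slice exposes length
              dos.write(tbm.getFirstKey().toByteArray());
            }
          }
          rename(bucketId, tmpFileName, META_FILE_NAME);
        }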



