apex-commits mailing list archives

From hsy...@apache.org
Subject [1/7] incubator-apex-malhar git commit: Rename HDS to HDHT.
Date Mon, 30 Nov 2015 21:28:26 GMT
Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 e24c14c3e -> 333a70733


Rename HDS to HDHT.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/4e47d236
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/4e47d236
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/4e47d236

Branch: refs/heads/devel-3
Commit: 4e47d236cbd8a1ca3c6dd96235cd588a7eb1d2e6
Parents: 217f8db
Author: thomas <thomas@datatorrent.com>
Authored: Mon Dec 8 15:22:57 2014 -0800
Committer: Chandni Singh <csingh@apache.org>
Committed: Mon Nov 23 21:37:01 2015 -0800

----------------------------------------------------------------------
 AbstractSinglePortHDSWriter.java | 194 ++++++++++++++++++++++++++++++++++
 HDHTFileAccess.java              | 122 +++++++++++++++++++++
 HDHTFileAccessFSImpl.java        | 125 ++++++++++++++++++++++
 tfile/DTFileReader.java          | 110 +++++++++++++++++++
 tfile/TFileImpl.java             | 176 ++++++++++++++++++++++++++++++
 tfile/TFileReader.java           | 110 +++++++++++++++++++
 tfile/TFileWriter.java           |  55 ++++++++++
 7 files changed, 892 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/AbstractSinglePortHDSWriter.java
----------------------------------------------------------------------
diff --git a/AbstractSinglePortHDSWriter.java b/AbstractSinglePortHDSWriter.java
new file mode 100644
index 0000000..04fa602
--- /dev/null
+++ b/AbstractSinglePortHDSWriter.java
@@ -0,0 +1,194 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.hdht;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import javax.validation.constraints.Min;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.common.util.Slice;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.Lists;
+
+/**
+ * Operator that receives data on its input port and writes it to the data store.
+ * Implements partitioning and maps the partition key to the store bucket.
+ * The derived class supplies the codec for partitioning and key-value serialization.
+ * @param <EVENT>
+ */
+public abstract class AbstractSinglePortHDSWriter<EVENT> extends HDHTWriter implements Partitioner<AbstractSinglePortHDSWriter<EVENT>>
+{
+  public interface HDSCodec<EVENT> extends StreamCodec<EVENT>
+  {
+    byte[] getKeyBytes(EVENT event);
+    byte[] getValueBytes(EVENT event);
+    EVENT fromKeyValue(Slice key, byte[] value);
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractSinglePortHDSWriter.class);
+
+  protected int partitionMask;
+
+  protected Set<Integer> partitions;
+
+  protected transient HDSCodec<EVENT> codec;
+
+  @Min(1)
+  private int partitionCount = 1;
+
+  public final transient DefaultInputPort<EVENT> input = new DefaultInputPort<EVENT>()
+  {
+    @Override
+    public void process(EVENT event)
+    {
+      try {
+        processEvent(event);
+      } catch (IOException e) {
+        throw new RuntimeException("Error processing " + event, e);
+      }
+    }
+
+    @Override
+    public StreamCodec<EVENT> getStreamCodec()
+    {
+      return getCodec();
+    }
+  };
+
+  public void setPartitionCount(int partitionCount)
+  {
+    this.partitionCount = partitionCount;
+  }
+
+  public int getPartitionCount()
+  {
+    return partitionCount;
+  }
+
+  /**
+   * Storage bucket for the given event. Only one partition can write to a storage bucket and by default it is
+   * identified by the partition id.
+   *
+   * @param event
+   * @return The bucket key.
+   */
+  protected long getBucketKey(EVENT event)
+  {
+    return (codec.getPartition(event) & partitionMask);
+  }
+
+  protected void processEvent(EVENT event) throws IOException
+  {
+    byte[] key = codec.getKeyBytes(event);
+    byte[] value = codec.getValueBytes(event);
+    super.put(getBucketKey(event), new Slice(key), value);
+  }
+
+  abstract protected HDSCodec<EVENT> getCodec();
+
+  @Override
+  public void setup(OperatorContext arg0)
+  {
+    LOG.debug("Store {} with partitions {}", super.getFileStore(), new PartitionKeys(this.partitionMask, this.partitions));
+    super.setup(arg0);
+    try {
+      this.codec = getCodec();
+      // inject the operator reference, if such a field exists
+      // TODO: replace with broader solution
+      Class<?> cls = this.codec.getClass();
+      while (cls != null) {
+        for (Field field : cls.getDeclaredFields()) {
+          if (field.getType().isAssignableFrom(this.getClass())) {
+            field.setAccessible(true);
+            field.set(this.codec, this);
+          }
+        }
+        cls = cls.getSuperclass();
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create codec", e);
+    }
+  }
+
+  @Override
+  public Collection<Partition<AbstractSinglePortHDSWriter<EVENT>>> definePartitions(Collection<Partition<AbstractSinglePortHDSWriter<EVENT>>> partitions, int incrementalCapacity)
+  {
+    boolean isInitialPartition = partitions.iterator().next().getStats() == null;
+
+    if (!isInitialPartition) {
+      // support for dynamic partitioning requires lineage tracking
+      LOG.warn("Dynamic partitioning not implemented");
+      return partitions;
+    }
+
+    int totalCount;
+
+    // Get the size of the partition for parallel partitioning
+    if (incrementalCapacity != 0) {
+      totalCount = incrementalCapacity;
+    }
+    // Do normal partitioning
+    else {
+      totalCount = partitionCount;
+    }
+
+    Kryo lKryo = new Kryo();
+    Collection<Partition<AbstractSinglePortHDSWriter<EVENT>>> newPartitions = Lists.newArrayListWithExpectedSize(totalCount);
+    for (int i = 0; i < totalCount; i++) {
+      // Kryo.copy fails as it attempts to clone transient fields (input port)
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      Output output = new Output(bos);
+      lKryo.writeObject(output, this);
+      output.close();
+      Input lInput = new Input(bos.toByteArray());
+      @SuppressWarnings("unchecked")
+      AbstractSinglePortHDSWriter<EVENT> oper = lKryo.readObject(lInput, this.getClass());
+      newPartitions.add(new DefaultPartition<AbstractSinglePortHDSWriter<EVENT>>(oper));
+    }
+
+    // assign the partition keys
+    DefaultPartition.assignPartitionKeys(newPartitions, input);
+
+    for (Partition<AbstractSinglePortHDSWriter<EVENT>> p : newPartitions) {
+      PartitionKeys pks = p.getPartitionKeys().get(input);
+      p.getPartitionedInstance().partitionMask = pks.mask;
+      p.getPartitionedInstance().partitions = pks.partitions;
+    }
+
+    return newPartitions;
+  }
+
+  @Override
+  public void partitioned(Map<Integer, Partition<AbstractSinglePortHDSWriter<EVENT>>> arg0)
+  {
+  }
+
+}
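
The operator above leaves getCodec() abstract, and its javadoc notes that the derived class supplies the codec for partitioning and key-value serialization. Below is a minimal, hypothetical sketch (not part of this commit) of such a codec for String events of the form "key=value"; the class name and partitioning choice are illustrative only, and the inherited StreamCodec methods are written against the assumed signatures of that interface in this API version.

package com.datatorrent.contrib.hdht;

import java.util.Arrays;

import com.datatorrent.common.util.Slice;

// Hypothetical codec for String events of the form "key=value"; a concrete operator
// would return an instance of this class from getCodec().
public class StringEventCodec implements AbstractSinglePortHDSWriter.HDSCodec<String>
{
  @Override
  public byte[] getKeyBytes(String event)
  {
    // assumes every event contains a '=' separator
    return event.split("=", 2)[0].getBytes();
  }

  @Override
  public byte[] getValueBytes(String event)
  {
    return event.split("=", 2)[1].getBytes();
  }

  @Override
  public String fromKeyValue(Slice key, byte[] value)
  {
    return new String(key.buffer, key.offset, key.length) + "=" + new String(value);
  }

  // Methods below come from StreamCodec; their signatures are assumed for this API version.
  @Override
  public Object fromByteArray(Slice fragment)
  {
    return new String(fragment.buffer, fragment.offset, fragment.length);
  }

  @Override
  public Slice toByteArray(String event)
  {
    byte[] bytes = event.getBytes();
    return new Slice(bytes, 0, bytes.length);
  }

  @Override
  public int getPartition(String event)
  {
    // partition on the key so all updates for a key route to the same store bucket
    // (getBucketKey() masks this value with partitionMask)
    return Arrays.hashCode(getKeyBytes(event));
  }
}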

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/HDHTFileAccess.java
----------------------------------------------------------------------
diff --git a/HDHTFileAccess.java b/HDHTFileAccess.java
new file mode 100644
index 0000000..fc3d56f
--- /dev/null
+++ b/HDHTFileAccess.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.hdht;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.TreeMap;
+
+import com.datatorrent.common.util.Slice;
+
+/**
+ * Abstraction for file system and format interaction.
+ */
+public interface HDHTFileAccess extends Closeable
+{
+  void init();
+
+  DataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException;
+  DataInputStream getInputStream(long bucketKey, String fileName) throws IOException;
+
+  /**
+   * Atomic file rename.
+   * @param bucketKey
+   * @param oldName
+   * @param newName
+   * @throws IOException
+   */
+  void rename(long bucketKey, String oldName, String newName) throws IOException;
+  void delete(long bucketKey, String fileName) throws IOException;
+
+  long getFileSize(long bucketKey, String s) throws IOException;
+
+  /**
+   * HDHT Data File Format Reader
+   */
+  interface HDSFileReader extends Closeable
+  {
+    /**
+     * Read the entire contents of the underlying file into a TreeMap structure
+     * @param data
+     * @throws IOException
+     */
+    //Move to
+    // void readFully(TreeMap<Slice, Slice> data) throws IOException;
+    void readFully(TreeMap<Slice, byte[]> data) throws IOException;
+
+    /**
+     * Repositions the pointer to the beginning of the underlying file.
+     * @throws IOException
+     */
+    void reset() throws IOException;
+
+    /**
+     * Searches for a matching key, and positions the pointer before the start of the key.
+     * @param key Byte array representing the key
+     * @throws IOException
+     * @return true if a given key is found
+     */
+    boolean seek(Slice key) throws IOException;
+
+    /**
+     * Reads next available key/value pair starting from the current pointer position
+     * into Slice objects and advances pointer to next key.  If pointer is at the end
+     * of the file, false is returned, and Slice objects remains unmodified.
+     *
+     * @param key Empty slice object
+     * @param value Empty slice object
+     * @return True if key/value were successfully read, false otherwise
+     * @throws IOException
+     */
+    boolean next(Slice key, Slice value) throws IOException;
+
+  }
+
+  /**
+   * HDHT Data File Format Writer
+   */
+  interface HDSFileWriter extends Closeable {
+    /**
+     * Appends key/value pair to the underlying file.
+     * @param key
+     * @param value
+     * @throws IOException
+     */
+    void append(byte[] key, byte[] value) throws IOException;
+
+    /**
+     * Returns number of bytes written to the underlying stream.
+     * @return The bytes written.
+     * @throws IOException
+     */
+    long getBytesWritten() throws IOException;
+  }
+
+  /**
+   * Obtain a reader for the given data file. Since existing file formats may depend on the file system directly
+   * (rather than working just on an InputStream), construction of the reader is part of the file system abstraction itself.
+   */
+  public HDSFileReader getReader(long bucketKey, String fileName) throws IOException;
+
+  /**
+   * Obtain a writer for the given data file. Since existing file formats may depend on the file system directly
+   * (rather than working just on an OutputStream), construction of the writer is part of the file system abstraction itself.
+   */
+  public HDSFileWriter getWriter(long bucketKey, String fileName) throws IOException;
+
+}
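
As a usage illustration for the reader contract defined above, here is a short sketch (not part of this commit) that drains a bucket file through the cursor-style reset()/next() API. The fileAccess, bucketKey and fileName inputs are hypothetical.

package com.datatorrent.contrib.hdht;

import java.io.IOException;

import com.datatorrent.common.util.Slice;

public class HDSFileReaderSketch
{
  // Prints every key in a bucket file together with its value size.
  static void dumpFile(HDHTFileAccess fileAccess, long bucketKey, String fileName) throws IOException
  {
    HDHTFileAccess.HDSFileReader reader = fileAccess.getReader(bucketKey, fileName);
    try {
      Slice key = new Slice(new byte[0], 0, 0);
      Slice value = new Slice(new byte[0], 0, 0);
      reader.reset();
      while (reader.next(key, value)) {
        // next() repoints the slices at the reader's buffers; copy the bytes
        // if they need to outlive this iteration
        System.out.println(new String(key.buffer, key.offset, key.length)
            + " -> " + value.length + " value bytes");
      }
    } finally {
      reader.close();
    }
  }
}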

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/HDHTFileAccessFSImpl.java
----------------------------------------------------------------------
diff --git a/HDHTFileAccessFSImpl.java b/HDHTFileAccessFSImpl.java
new file mode 100644
index 0000000..ad9aa05
--- /dev/null
+++ b/HDHTFileAccessFSImpl.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.hdht;
+
+import java.io.IOException;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.common.util.DTThrowable;
+
+/**
+ * Hadoop file system backed store.
+ */
+abstract public class HDHTFileAccessFSImpl implements HDHTFileAccess
+{
+  @NotNull
+  private String basePath;
+  protected transient FileSystem fs;
+
+  public HDHTFileAccessFSImpl()
+  {
+  }
+
+  public String getBasePath()
+  {
+    return basePath;
+  }
+
+  public void setBasePath(String path)
+  {
+    this.basePath = path;
+  }
+
+  protected Path getFilePath(long bucketKey, String fileName) {
+    return new Path(getBucketPath(bucketKey), fileName);
+  }
+
+  protected Path getBucketPath(long bucketKey)
+  {
+    return new Path(basePath, Long.toString(bucketKey));
+  }
+
+  @Override
+  public long getFileSize(long bucketKey, String fileName) throws IOException {
+    return fs.getFileStatus(getFilePath(bucketKey, fileName)).getLen();
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    fs.close();
+  }
+
+  @Override
+  public void init()
+  {
+    if (fs == null) {
+      Path dataFilePath = new Path(basePath);
+      try {
+        fs = FileSystem.newInstance(dataFilePath.toUri(), new Configuration());
+      } catch (IOException e) {
+        DTThrowable.rethrow(e);
+      }
+    }
+  }
+
+  @Override
+  public void delete(long bucketKey, String fileName) throws IOException
+  {
+    fs.delete(getFilePath(bucketKey, fileName), true);
+  }
+
+  @Override
+  public FSDataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException
+  {
+    Path path = getFilePath(bucketKey, fileName);
+    return fs.create(path, true);
+  }
+
+  @Override
+  public FSDataInputStream getInputStream(long bucketKey, String fileName) throws IOException
+  {
+    return fs.open(getFilePath(bucketKey, fileName));
+  }
+
+  @Override
+  public void rename(long bucketKey, String fromName, String toName) throws IOException
+  {
+    FileContext fc = FileContext.getFileContext(fs.getUri());
+    Path bucketPath = getBucketPath(bucketKey);
+    // file context requires absolute path
+    if (!bucketPath.isAbsolute()) {
+      bucketPath = new Path(fs.getWorkingDirectory(), bucketPath);
+    }
+    fc.rename(new Path(bucketPath, fromName), new Path(bucketPath, toName), Rename.OVERWRITE);
+  }
+
+  @Override
+  public String toString()
+  {
+    return this.getClass().getSimpleName() + "[basePath=" + basePath + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/tfile/DTFileReader.java
----------------------------------------------------------------------
diff --git a/tfile/DTFileReader.java b/tfile/DTFileReader.java
new file mode 100644
index 0000000..fefadaf
--- /dev/null
+++ b/tfile/DTFileReader.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datatorrent.contrib.hdht.tfile;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.file.tfile.DTFile;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner.Entry;
+import org.apache.hadoop.io.file.tfile.TFile;
+
+import com.datatorrent.common.util.Slice;
+import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader;
+
+/**
+ * {@link DTFile} wrapper for HDSFileReader
+ * <br>
+ * {@link DTFile} has the exact same format as {@link TFile} with a much faster {@link Reader} implementation.
+ * <br>
+ * DTFileReader is also fully compatible with any file generated by {@link TFileWriter}, so there is no corresponding "DTFileWriter".
+ *
+ *
+ */
+public class DTFileReader implements HDSFileReader
+{
+  private final Reader reader;
+  private final Scanner scanner;
+  private final FSDataInputStream fsdis;
+
+  public DTFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException
+  {
+    this.fsdis = fsdis;
+    reader = new Reader(fsdis, fileLength, conf);
+    scanner = reader.createScanner();
+  }
+
+  /**
+   * Unlike the TFile.Reader.close method, this will also close the wrapped InputStream.
+   * @see java.io.Closeable#close()
+   */
+  @Override
+  public void close() throws IOException
+  {
+    scanner.close();
+    reader.close();
+    fsdis.close();
+  }
+
+  @Override
+  public void readFully(TreeMap<Slice, byte[]> data) throws IOException
+  {
+    scanner.rewind();
+    for (; !scanner.atEnd(); scanner.advance()) {
+      Entry en = scanner.entry();
+      Slice key = new Slice(en.getBlockBuffer(), en.getKeyOffset(), en.getKeyLength());
+      byte[] value = Arrays.copyOfRange(en.getBlockBuffer(), en.getValueOffset(), en.getValueOffset() + en.getValueLength());
+      data.put(key, value);
+    }
+
+  }
+
+  @Override
+  public void reset() throws IOException
+  {
+    scanner.rewind();
+  }
+
+  @Override
+  public boolean seek(Slice key) throws IOException
+  {
+    return scanner.seekTo(key.buffer, key.offset, key.length);
+  }
+
+  @Override
+  public boolean next(Slice key, Slice value) throws IOException
+  {
+    if (scanner.atEnd()) return false;
+    Entry en = scanner.entry();
+
+    key.buffer = en.getBlockBuffer();
+    key.offset = en.getKeyOffset();
+    key.length = en.getKeyLength();
+
+    value.buffer = en.getBlockBuffer();
+    value.offset = en.getValueOffset();
+    value.length = en.getValueLength();
+
+    scanner.advance();
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/tfile/TFileImpl.java
----------------------------------------------------------------------
diff --git a/tfile/TFileImpl.java b/tfile/TFileImpl.java
new file mode 100644
index 0000000..714a5b1
--- /dev/null
+++ b/tfile/TFileImpl.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.hdht.tfile;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.file.tfile.DTFile;
+import org.apache.hadoop.io.file.tfile.TFile;
+import org.apache.hadoop.io.file.tfile.TFile.Reader;
+import org.apache.hadoop.io.file.tfile.TFile.Writer;
+
+import com.datatorrent.contrib.hdht.HDHTFileAccessFSImpl;
+
+/**
+ * A TFile wrapper implementing the HDHTFileAccess API
+ * <ul>
+ * <li>{@link TFileImpl.DefaultTFileImpl} returns the default TFile {@link Reader} and {@link Writer} for IO operations</li>
+ * <li>{@link TFileImpl.DTFileImpl} returns the DTFile {@link org.apache.hadoop.io.file.tfile.DTFile.Reader} (which is faster than the default TFile reader) and {@link Writer} for IO operations</li>
+ * </ul>
+ *
+ */
+public abstract class TFileImpl extends HDHTFileAccessFSImpl
+{
+  private int minBlockSize = 64 * 1024;
+
+  private String compressName = TFile.COMPRESSION_NONE;
+  
+  private String comparator = "memcmp";
+  
+  private int chunkSize = 1024 * 1024;
+  
+  private int inputBufferSize = 256 * 1024;
+  
+  private int outputBufferSize = 256 * 1024;
+
+  
+  private void setupConfig(Configuration conf)
+  {
+    conf.set("tfile.io.chunk.size", String.valueOf(chunkSize));
+    conf.set("tfile.fs.input.buffer.size", String.valueOf(inputBufferSize));
+    conf.set("tfile.fs.output.buffer.size", String.valueOf(outputBufferSize));
+  }
+
+
+  @Override
+  public HDSFileWriter getWriter(long bucketKey, String fileName) throws IOException
+  {
+    FSDataOutputStream fsdos = getOutputStream(bucketKey, fileName);
+    setupConfig(fs.getConf());
+    return new TFileWriter(fsdos, minBlockSize, compressName, comparator, fs.getConf());
+  }
+  
+  public int getMinBlockSize()
+  {
+    return minBlockSize;
+  }
+
+
+  public void setMinBlockSize(int minBlockSize)
+  {
+    this.minBlockSize = minBlockSize;
+  }
+
+
+  public String getCompressName()
+  {
+    return compressName;
+  }
+
+
+  public void setCompressName(String compressName)
+  {
+    this.compressName = compressName;
+  }
+
+
+  public String getComparator()
+  {
+    return comparator;
+  }
+
+
+  public void setComparator(String comparator)
+  {
+    this.comparator = comparator;
+  }
+
+
+  public int getChunkSize()
+  {
+    return chunkSize;
+  }
+
+
+  public void setChunkSize(int chunkSize)
+  {
+    this.chunkSize = chunkSize;
+  }
+
+
+  public int getInputBufferSize()
+  {
+    return inputBufferSize;
+  }
+
+
+  public void setInputBufferSize(int inputBufferSize)
+  {
+    this.inputBufferSize = inputBufferSize;
+  }
+
+
+  public int getOutputBufferSize()
+  {
+    return outputBufferSize;
+  }
+
+
+  public void setOutputBufferSize(int outputBufferSize)
+  {
+    this.outputBufferSize = outputBufferSize;
+  }
+  
+  /**
+   * Return {@link TFile} {@link Reader}
+   *
+   */
+  public static class DefaultTFileImpl extends TFileImpl{
+    
+    @Override
+    public HDSFileReader getReader(long bucketKey, String fileName) throws IOException
+    {
+      FSDataInputStream fsdis =  getInputStream(bucketKey, fileName);
+      long fileLength = getFileSize(bucketKey, fileName);
+      super.setupConfig(fs.getConf());
+      return new TFileReader(fsdis, fileLength, fs.getConf());
+    }
+    
+  }
+  
+  
+  /**
+   * Return {@link DTFile} {@link org.apache.hadoop.io.file.tfile.DTFile.Reader}
+   *
+   */
+  public static class DTFileImpl extends TFileImpl {
+    
+    @Override
+    public HDSFileReader getReader(long bucketKey, String fileName) throws IOException
+    {
+      FSDataInputStream fsdis =  getInputStream(bucketKey, fileName);
+      long fileLength = getFileSize(bucketKey, fileName);
+      super.setupConfig(fs.getConf());
+      return new DTFileReader(fsdis, fileLength, fs.getConf());
+    }
+    
+  }
+
+
+}
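
A small configuration sketch, again not part of this commit, showing how the DTFile-backed variant above might be instantiated. The values restate the defaults declared in TFileImpl; how the resulting store is attached to an HDHT operator is outside this file.

package com.datatorrent.contrib.hdht.tfile;

import org.apache.hadoop.io.file.tfile.TFile;

import com.datatorrent.contrib.hdht.HDHTFileAccessFSImpl;

public class TFileStoreSketch
{
  // Builds a DTFile-backed store rooted at the given path.
  static HDHTFileAccessFSImpl createStore(String basePath)
  {
    TFileImpl.DTFileImpl store = new TFileImpl.DTFileImpl();
    store.setBasePath(basePath);                    // bucket directories are created under this path
    store.setMinBlockSize(64 * 1024);               // defaults restated for visibility
    store.setCompressName(TFile.COMPRESSION_NONE);
    store.setComparator("memcmp");                  // raw byte ordering
    return store;
  }
}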

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/tfile/TFileReader.java
----------------------------------------------------------------------
diff --git a/tfile/TFileReader.java b/tfile/TFileReader.java
new file mode 100644
index 0000000..d20408c
--- /dev/null
+++ b/tfile/TFileReader.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.hdht.tfile;
+
+import java.io.IOException;
+import java.util.TreeMap;
+
+import com.datatorrent.common.util.Slice;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.file.tfile.TFile.Reader;
+import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner;
+import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner.Entry;
+
+import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader;
+
+public class TFileReader implements HDSFileReader
+{
+
+  private final Reader reader;
+  private final Scanner scanner;
+  private final FSDataInputStream fsdis;
+
+  public TFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException
+  {
+    this.fsdis = fsdis;
+    reader = new Reader(fsdis, fileLength, conf);
+    scanner = reader.createScanner();
+  }
+
+  /**
+   * Unlike the TFile.Reader.close method, this will also close the wrapped InputStream.
+   * @see java.io.Closeable#close()
+   */
+  @Override
+  public void close() throws IOException
+  {
+    scanner.close();
+    reader.close();
+    fsdis.close();
+  }
+
+  @Override
+  public void readFully(TreeMap<Slice, byte[]> data) throws IOException
+  {
+    scanner.rewind();
+    for (; !scanner.atEnd(); scanner.advance()) {
+      Entry en = scanner.entry();
+      int klen = en.getKeyLength();
+      int vlen = en.getValueLength();
+      byte[] key = new byte[klen];
+      byte[] value = new byte[vlen];
+      en.getKey(key);
+      en.getValue(value);
+      data.put(new Slice(key, 0, key.length), value);
+    }
+
+  }
+
+  @Override
+  public void reset() throws IOException
+  {
+    scanner.rewind();
+  }
+
+  @Override
+  public boolean seek(Slice key) throws IOException
+  {
+    return scanner.seekTo(key.buffer, key.offset, key.length);
+  }
+
+  @Override
+  public boolean next(Slice key, Slice value) throws IOException
+  {
+    if (scanner.atEnd()) return false;
+    Entry en = scanner.entry();
+    byte[] rkey = new byte[en.getKeyLength()];
+    byte[] rval = new byte[en.getValueLength()];
+    en.getKey(rkey);
+    en.getValue(rval);
+
+    key.buffer = rkey;
+    key.offset = 0;
+    key.length = en.getKeyLength();
+
+    value.buffer = rval;
+    value.offset = 0;
+    value.length = en.getValueLength();
+
+    scanner.advance();
+    return true;
+  }
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/4e47d236/tfile/TFileWriter.java
----------------------------------------------------------------------
diff --git a/tfile/TFileWriter.java b/tfile/TFileWriter.java
new file mode 100644
index 0000000..b6fd90d
--- /dev/null
+++ b/tfile/TFileWriter.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.hdht.tfile;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.file.tfile.TFile.Writer;
+
+import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileWriter;
+
+public final class TFileWriter implements HDSFileWriter
+{
+  private Writer writer;
+  
+  private FSDataOutputStream fsdos;
+  
+  public TFileWriter(FSDataOutputStream stream, int minBlockSize, String compressName, String comparator, Configuration conf) throws IOException
+  {
+    this.fsdos = stream;
+    writer = new Writer(stream, minBlockSize, compressName, comparator, conf);
+    
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    writer.close();
+    fsdos.close();
+  }
+
+  @Override
+  public void append(byte[] key, byte[] value) throws IOException
+  {
+    writer.append(key, value);
+  }
+
+  @Override
+  public long getBytesWritten() throws IOException { return fsdos.getPos(); }
+
+}
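
Finally, a hypothetical write-side sketch (not from this commit) using the HDSFileWriter obtained from a TFileImpl store. With the default "memcmp" comparator, keys must be appended in ascending byte order.

package com.datatorrent.contrib.hdht.tfile;

import java.io.IOException;

import com.datatorrent.contrib.hdht.HDHTFileAccess;

public class TFileWriterSketch
{
  // Appends two entries in ascending key order and reports the stream position so far.
  static void writeSample(HDHTFileAccess fileAccess, long bucketKey, String fileName) throws IOException
  {
    HDHTFileAccess.HDSFileWriter writer = fileAccess.getWriter(bucketKey, fileName);
    try {
      writer.append("0001".getBytes(), "first".getBytes());
      writer.append("0002".getBytes(), "second".getBytes());
      // getBytesWritten() returns the underlying stream position, so data still
      // buffered by TFile may not be counted until it is flushed
      System.out.println("bytes written so far: " + writer.getBytesWritten());
    } finally {
      writer.close();
    }
  }
}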

