apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hsy...@apache.org
Subject [6/7] incubator-apex-malhar git commit: MLHR-1916 #resolve #comment Added back the FileAccess api and its implementations
Date Mon, 30 Nov 2015 21:28:31 GMT
MLHR-1916 #resolve #comment Added back the FileAccess api and its implementations


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/7d2f4749
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/7d2f4749
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/7d2f4749

Branch: refs/heads/devel-3
Commit: 7d2f47491498c6b1c550f70e626dd76ba1db393e
Parents: c787461
Author: MalharJenkins <jenkins@datatorrent.com>
Authored: Mon Nov 23 21:14:41 2015 -0800
Committer: Chandni Singh <csingh@apache.org>
Committed: Mon Nov 23 22:11:18 2015 -0800

----------------------------------------------------------------------
 HDHTFileAccess.java                             | 124 -------------
 HDHTFileAccessFSImpl.java                       | 127 -------------
 .../lib/fileaccess/DTFileReader.java            | 112 ++++++++++++
 .../datatorrent/lib/fileaccess/FileAccess.java  | 129 ++++++++++++++
 .../lib/fileaccess/FileAccessFSImpl.java        | 130 ++++++++++++++
 .../datatorrent/lib/fileaccess/TFileImpl.java   | 178 +++++++++++++++++++
 .../datatorrent/lib/fileaccess/TFileReader.java | 125 +++++++++++++
 .../datatorrent/lib/fileaccess/TFileWriter.java |  61 +++++++
 pom.xml                                         |   2 +-
 tfile/DTFileReader.java                         | 111 ------------
 tfile/TFileImpl.java                            | 177 ------------------
 tfile/TFileReader.java                          | 125 -------------
 tfile/TFileWriter.java                          |  60 -------
 13 files changed, 736 insertions(+), 725 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/HDHTFileAccess.java
----------------------------------------------------------------------
diff --git a/HDHTFileAccess.java b/HDHTFileAccess.java
deleted file mode 100644
index 266ba75..0000000
--- a/HDHTFileAccess.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.hdht;
-
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.TreeMap;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * Abstraction for file system and format interaction.
- *
- * @since 2.0.0
- */
-public interface HDHTFileAccess extends Closeable
-{
-  void init();
-
-  DataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException;
-  DataInputStream getInputStream(long bucketKey, String fileName) throws IOException;
-
-  /**
-   * Atomic file rename.
-   * @param bucketKey
-   * @param oldName
-   * @param newName
-   * @throws IOException
-   */
-  void rename(long bucketKey, String oldName, String newName) throws IOException;
-  void delete(long bucketKey, String fileName) throws IOException;
-
-  long getFileSize(long bucketKey, String s) throws IOException;
-
-  /**
-   * HDHT Data File Format Reader
-   */
-  interface HDSFileReader extends Closeable
-  {
-    /**
-     * Read the entire contents of the underlying file into a TreeMap structure
-     * @param data
-     * @throws IOException
-     */
-    //Move to
-    // void readFully(TreeMap<Slice, Slice> data) throws IOException;
-    void readFully(TreeMap<Slice, byte[]> data) throws IOException;
-
-    /**
-     * Repositions the pointer to the beginning of the underlying file.
-     * @throws IOException
-     */
-    void reset() throws IOException;
-
-    /**
-     * Searches for a matching key, and positions the pointer before the start of the key.
-     * @param key Byte array representing the key
-     * @throws IOException
-     * @return true if a given key is found
-     */
-    boolean seek(Slice key) throws IOException;
-
-    /**
-     * Reads next available key/value pair starting from the current pointer position
-     * into Slice objects and advances pointer to next key.  If pointer is at the end
-     * of the file, false is returned, and Slice objects remains unmodified.
-     *
-     * @param key Empty slice object
-     * @param value Empty slice object
-     * @return True if key/value were successfully read, false otherwise
-     * @throws IOException
-     */
-    boolean next(Slice key, Slice value) throws IOException;
-
-  }
-
-  /**
-   * HDHT Data File Format Writer
-   */
-  interface HDSFileWriter extends Closeable {
-    /**
-     * Appends key/value pair to the underlying file.
-     * @param key
-     * @param value
-     * @throws IOException
-     */
-    void append(byte[] key, byte[] value) throws IOException;
-
-    /**
-     * Returns number of bytes written to the underlying stream.
-     * @return The bytes written.
-     * @throws IOException
-     */
-    long getBytesWritten() throws IOException;
-  }
-
-  /**
-   * Obtain a reader for the given data file. Since existing file formats may depend on the file system directly (vs.
-   * work just based on InputStream), construction of the reader is part of the file system abstraction itself.
-   */
-  public HDSFileReader getReader(long bucketKey, String fileName) throws IOException;
-
-  /**
-   * Obtain a writer for the given data file. Since existing file formats may depend on the file system directly (vs.
-   * work just based on OutputStream), construction of the writer is part of the file system abstraction itself.
-   */
-  public HDSFileWriter getWriter(long bucketKey, String fileName) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/HDHTFileAccessFSImpl.java
----------------------------------------------------------------------
diff --git a/HDHTFileAccessFSImpl.java b/HDHTFileAccessFSImpl.java
deleted file mode 100644
index 13dd0ad..0000000
--- a/HDHTFileAccessFSImpl.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.hdht;
-
-import java.io.IOException;
-
-import javax.validation.constraints.NotNull;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Options.Rename;
-import org.apache.hadoop.fs.Path;
-
-import com.datatorrent.netlet.util.DTThrowable;
-
-/**
- * Hadoop file system backed store.
- *
- * @since 2.0.0
- */
-abstract public class HDHTFileAccessFSImpl implements HDHTFileAccess
-{
-  @NotNull
-  private String basePath;
-  protected transient FileSystem fs;
-
-  public HDHTFileAccessFSImpl()
-  {
-  }
-
-  public String getBasePath()
-  {
-    return basePath;
-  }
-
-  public void setBasePath(String path)
-  {
-    this.basePath = path;
-  }
-
-  protected Path getFilePath(long bucketKey, String fileName) {
-    return new Path(getBucketPath(bucketKey), fileName);
-  }
-
-  protected Path getBucketPath(long bucketKey)
-  {
-    return new Path(basePath, Long.toString(bucketKey));
-  }
-
-  @Override
-  public long getFileSize(long bucketKey, String fileName) throws IOException {
-    return fs.getFileStatus(getFilePath(bucketKey, fileName)).getLen();
-  }
-
-  @Override
-  public void close() throws IOException
-  {
-    fs.close();
-  }
-
-  @Override
-  public void init()
-  {
-    if (fs == null) {
-      Path dataFilePath = new Path(basePath);
-      try {
-        fs = FileSystem.newInstance(dataFilePath.toUri(), new Configuration());
-      } catch (IOException e) {
-        DTThrowable.rethrow(e);
-      }
-    }
-  }
-
-  @Override
-  public void delete(long bucketKey, String fileName) throws IOException
-  {
-    fs.delete(getFilePath(bucketKey, fileName), true);
-  }
-
-  @Override
-  public FSDataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException
-  {
-    Path path = getFilePath(bucketKey, fileName);
-    return fs.create(path, true);
-  }
-
-  @Override
-  public FSDataInputStream getInputStream(long bucketKey, String fileName) throws IOException
-  {
-    return fs.open(getFilePath(bucketKey, fileName));
-  }
-
-  @Override
-  public void rename(long bucketKey, String fromName, String toName) throws IOException
-  {
-    FileContext fc = FileContext.getFileContext(fs.getUri());
-    Path bucketPath = getBucketPath(bucketKey);
-    // file context requires absolute path
-    if (!bucketPath.isAbsolute()) {
-      bucketPath = new Path(fs.getWorkingDirectory(), bucketPath);
-    }
-    fc.rename(new Path(bucketPath, fromName), new Path(bucketPath, toName), Rename.OVERWRITE);
-  }
-
-  @Override
-  public String toString()
-  {
-    return this.getClass().getSimpleName() + "[basePath=" + basePath + "]";
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java b/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java
new file mode 100644
index 0000000..cb97520
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/DTFileReader.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.fileaccess;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.file.tfile.DTFile;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
+import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner.Entry;
+import org.apache.hadoop.io.file.tfile.TFile;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * {@link DTFile} wrapper for HDSFileReader
+ * <br>
+ * {@link DTFile} has exact same format as {@link TFile} with a much faster {@link Reader} implementation
+ * <br>
+ * DTFileReader is also fully compatible with any file generated by {@link TFileWriter}. So there is no corresponding "DTFileWriter"
+ *
+ *
+ * @since 2.0.0
+ */
+public class DTFileReader implements FileAccess.FileReader
+{
+  private final Reader reader;
+  private final Scanner scanner;
+  private final FSDataInputStream fsdis;
+
+  public DTFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException
+  {
+    this.fsdis = fsdis;
+    reader = new Reader(fsdis, fileLength, conf);
+    scanner = reader.createScanner();
+  }
+
+  /**
+   * Unlike the TFile.Reader.close method this will close the wrapped InputStream.
+   * @see java.io.Closeable#close()
+   */
+  @Override
+  public void close() throws IOException
+  {
+    scanner.close();
+    reader.close();
+    fsdis.close();
+  }
+
+  @Override
+  public void readFully(TreeMap<Slice, byte[]> data) throws IOException
+  {
+    scanner.rewind();
+    for (; !scanner.atEnd(); scanner.advance()) {
+      Entry en = scanner.entry();
+      Slice key = new Slice(en.getBlockBuffer(), en.getKeyOffset(), en.getKeyLength());
+      byte[] value = Arrays.copyOfRange(en.getBlockBuffer(), en.getValueOffset(), en.getValueOffset() + en.getValueLength());
+      data.put(key, value);
+    }
+
+  }
+
+  @Override
+  public void reset() throws IOException
+  {
+    scanner.rewind();
+  }
+
+  @Override
+  public boolean seek(Slice key) throws IOException
+  {
+    return scanner.seekTo(key.buffer, key.offset, key.length);
+  }
+
+  @Override
+  public boolean next(Slice key, Slice value) throws IOException
+  {
+    if (scanner.atEnd()) return false;
+    Entry en = scanner.entry();
+
+    key.buffer = en.getBlockBuffer();
+    key.offset = en.getKeyOffset();
+    key.length = en.getKeyLength();
+
+    value.buffer = en.getBlockBuffer();
+    value.offset = en.getValueOffset();
+    value.length = en.getValueLength();
+
+    scanner.advance();
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java
new file mode 100644
index 0000000..4b7f6e5
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.fileaccess;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.TreeMap;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Abstraction for file system and format interaction.
+ *
+ * @since 2.0.0
+ */
+public interface FileAccess extends Closeable
+{
+  void init();
+
+  DataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException;
+
+  DataInputStream getInputStream(long bucketKey, String fileName) throws IOException;
+
+  /**
+   * Atomic file rename.
+   * @param bucketKey
+   * @param oldName
+   * @param newName
+   * @throws IOException
+   */
+  void rename(long bucketKey, String oldName, String newName) throws IOException;
+  void delete(long bucketKey, String fileName) throws IOException;
+
+  long getFileSize(long bucketKey, String s) throws IOException;
+
+  /**
+   * Data File Format Reader
+   */
+  interface FileReader extends Closeable
+  {
+    /**
+     * Read the entire contents of the underlying file into a TreeMap structure
+     * @param data
+     * @throws IOException
+     */
+    //Move to
+    // void readFully(TreeMap<Slice, Slice> data) throws IOException;
+    void readFully(TreeMap<Slice, byte[]> data) throws IOException;
+
+    /**
+     * Repositions the pointer to the beginning of the underlying file.
+     * @throws IOException
+     */
+    void reset() throws IOException;
+
+    /**
+     * Searches for a matching key, and positions the pointer before the start of the key.
+     * @param key Byte array representing the key
+     * @throws IOException
+     * @return true if a given key is found
+     */
+    boolean seek(Slice key) throws IOException;
+
+    /**
+     * Reads next available key/value pair starting from the current pointer position
+     * into Slice objects and advances pointer to next key.  If pointer is at the end
+     * of the file, false is returned, and Slice objects remains unmodified.
+     *
+     * @param key Empty slice object
+     * @param value Empty slice object
+     * @return True if key/value were successfully read, false otherwise
+     * @throws IOException
+     */
+    boolean next(Slice key, Slice value) throws IOException;
+
+  }
+
+  /**
+   * Data File Format Writer
+   */
+  interface FileWriter extends Closeable
+  {
+    /**
+     * Appends key/value pair to the underlying file.
+     * @param key
+     * @param value
+     * @throws IOException
+     */
+    void append(byte[] key, byte[] value) throws IOException;
+
+    /**
+     * Returns number of bytes written to the underlying stream.
+     * @return The bytes written.
+     * @throws IOException
+     */
+    long getBytesWritten() throws IOException;
+  }
+
+  /**
+   * Obtain a reader for the given data file. Since existing file formats may depend on the file system directly (vs.
+   * work just based on InputStream), construction of the reader is part of the file system abstraction itself.
+   */
+  public FileReader getReader(long bucketKey, String fileName) throws IOException;
+
+  /**
+   * Obtain a writer for the given data file. Since existing file formats may depend on the file system directly (vs.
+   * work just based on OutputStream), construction of the writer is part of the file system abstraction itself.
+   */
+  public FileWriter getWriter(long bucketKey, String fileName) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
new file mode 100644
index 0000000..80a201a
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.fileaccess;
+
+import java.io.IOException;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Hadoop file system backed store.
+ *
+ * @since 2.0.0
+ */
+public abstract class FileAccessFSImpl implements FileAccess
+{
+  @NotNull
+  private String basePath;
+  protected transient FileSystem fs;
+
+  public FileAccessFSImpl()
+  {
+  }
+
+  public String getBasePath()
+  {
+    return basePath;
+  }
+
+  public void setBasePath(String path)
+  {
+    this.basePath = path;
+  }
+
+  protected Path getFilePath(long bucketKey, String fileName) {
+    return new Path(getBucketPath(bucketKey), fileName);
+  }
+
+  protected Path getBucketPath(long bucketKey)
+  {
+    return new Path(basePath, Long.toString(bucketKey));
+  }
+
+  @Override
+  public long getFileSize(long bucketKey, String fileName) throws IOException {
+    return fs.getFileStatus(getFilePath(bucketKey, fileName)).getLen();
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    fs.close();
+  }
+
+  @Override
+  public void init()
+  {
+    if (fs == null) {
+      Path dataFilePath = new Path(basePath);
+      try {
+        fs = FileSystem.newInstance(dataFilePath.toUri(), new Configuration());
+      } catch (IOException e) {
+        DTThrowable.rethrow(e);
+      }
+    }
+  }
+
+  @Override
+  public void delete(long bucketKey, String fileName) throws IOException
+  {
+    fs.delete(getFilePath(bucketKey, fileName), true);
+  }
+
+  @Override
+  public FSDataOutputStream getOutputStream(long bucketKey, String fileName) throws IOException
+  {
+    Path path = getFilePath(bucketKey, fileName);
+    return fs.create(path, true);
+  }
+
+  @Override
+  public FSDataInputStream getInputStream(long bucketKey, String fileName) throws IOException
+  {
+    return fs.open(getFilePath(bucketKey, fileName));
+  }
+
+  @Override
+  public void rename(long bucketKey, String fromName, String toName) throws IOException
+  {
+    FileContext fc = FileContext.getFileContext(fs.getUri());
+    Path bucketPath = getBucketPath(bucketKey);
+    // file context requires absolute path
+    if (!bucketPath.isAbsolute()) {
+      bucketPath = new Path(fs.getWorkingDirectory(), bucketPath);
+    }
+    fc.rename(new Path(bucketPath, fromName), new Path(bucketPath, toName), Rename.OVERWRITE);
+  }
+
+  @Override
+  public String toString()
+  {
+    return this.getClass().getSimpleName() + "[basePath=" + basePath + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java
new file mode 100644
index 0000000..5526832
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.fileaccess;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.file.tfile.DTFile;
+import org.apache.hadoop.io.file.tfile.TFile;
+import org.apache.hadoop.io.file.tfile.TFile.Reader;
+import org.apache.hadoop.io.file.tfile.TFile.Writer;
+
+/**
+ * A TFile wrapper with FileAccess API
+ * <ul>
+ * <li>{@link TFileImpl.DefaultTFileImpl} return default TFile {@link Reader} and {@link Writer} for IO operations</li> 
+ * <li>{@link TFileImpl.DTFileImpl} return DTFile {@link org.apache.hadoop.io.file.tfile.DTFile.Reader}(which is faster than default TFile reader) and {@link Writer} for IO operations</li> 
+ * </ul>
+ *
+ * @since 2.0.0
+ */
+public abstract class TFileImpl extends FileAccessFSImpl
+{
+  private int minBlockSize = 64 * 1024;
+
+  private String compressName = TFile.COMPRESSION_NONE;
+  
+  private String comparator = "memcmp";
+  
+  private int chunkSize = 1024 * 1024;
+  
+  private int inputBufferSize = 256 * 1024;
+  
+  private int outputBufferSize = 256 * 1024;
+
+  
+  private void setupConfig(Configuration conf)
+  {
+    conf.set("tfile.io.chunk.size", String.valueOf(chunkSize));
+    conf.set("tfile.fs.input.buffer.size", String.valueOf(inputBufferSize));
+    conf.set("tfile.fs.output.buffer.size", String.valueOf(outputBufferSize));
+  }
+
+
+  @Override
+  public FileWriter getWriter(long bucketKey, String fileName) throws IOException
+  {
+    FSDataOutputStream fsdos = getOutputStream(bucketKey, fileName);
+    setupConfig(fs.getConf());
+    return new TFileWriter(fsdos, minBlockSize, compressName, comparator, fs.getConf());
+  }
+  
+  public int getMinBlockSize()
+  {
+    return minBlockSize;
+  }
+
+
+  public void setMinBlockSize(int minBlockSize)
+  {
+    this.minBlockSize = minBlockSize;
+  }
+
+
+  public String getCompressName()
+  {
+    return compressName;
+  }
+
+
+  public void setCompressName(String compressName)
+  {
+    this.compressName = compressName;
+  }
+
+
+  public String getComparator()
+  {
+    return comparator;
+  }
+
+
+  public void setComparator(String comparator)
+  {
+    this.comparator = comparator;
+  }
+
+
+  public int getChunkSize()
+  {
+    return chunkSize;
+  }
+
+
+  public void setChunkSize(int chunkSize)
+  {
+    this.chunkSize = chunkSize;
+  }
+
+
+  public int getInputBufferSize()
+  {
+    return inputBufferSize;
+  }
+
+
+  public void setInputBufferSize(int inputBufferSize)
+  {
+    this.inputBufferSize = inputBufferSize;
+  }
+
+
+  public int getOutputBufferSize()
+  {
+    return outputBufferSize;
+  }
+
+
+  public void setOutputBufferSize(int outputBufferSize)
+  {
+    this.outputBufferSize = outputBufferSize;
+  }
+  
+  /**
+   * Return {@link TFile} {@link Reader}
+   *
+   */
+  public static class DefaultTFileImpl extends TFileImpl{
+    
+    @Override
+    public FileReader getReader(long bucketKey, String fileName) throws IOException
+    {
+      FSDataInputStream fsdis =  getInputStream(bucketKey, fileName);
+      long fileLength = getFileSize(bucketKey, fileName);
+      super.setupConfig(fs.getConf());
+      return new TFileReader(fsdis, fileLength, fs.getConf());
+    }
+    
+  }
+  
+  
+  /**
+   * Return {@link DTFile} {@link org.apache.hadoop.io.file.tfile.DTFile.Reader}
+   *
+   */
+  public static class DTFileImpl extends TFileImpl {
+    
+    @Override
+    public FileReader getReader(long bucketKey, String fileName) throws IOException
+    {
+      FSDataInputStream fsdis =  getInputStream(bucketKey, fileName);
+      long fileLength = getFileSize(bucketKey, fileName);
+      super.setupConfig(fs.getConf());
+      return new DTFileReader(fsdis, fileLength, fs.getConf());
+    }
+    
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java
new file mode 100644
index 0000000..8426c3f
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileReader.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.fileaccess;
+
+import java.io.IOException;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.file.tfile.TFile.Reader;
+import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner;
+import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner.Entry;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * TFileReader
+ *
+ * @since 2.0.0
+ */
+public class TFileReader implements FileAccess.FileReader
+{
+
+  private final Reader reader;
+  private final Scanner scanner;
+  private final FSDataInputStream fsdis;
+  private boolean closed = false;
+
+  public TFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException
+  {
+    this.fsdis = fsdis;
+    reader = new Reader(fsdis, fileLength, conf);
+    scanner = reader.createScanner();
+  }
+
+  /**
+   * Unlike the TFile.Reader.close method this will close the wrapped InputStream.
+   * @see java.io.Closeable#close()
+   */
+  @Override
+  public void close() throws IOException
+  {
+    closed = true;
+    scanner.close();
+    reader.close();
+    fsdis.close();
+  }
+
+  @Override
+  public void readFully(TreeMap<Slice, byte[]> data) throws IOException
+  {
+    scanner.rewind();
+    for (; !scanner.atEnd(); scanner.advance()) {
+      Entry en = scanner.entry();
+      int klen = en.getKeyLength();
+      int vlen = en.getValueLength();
+      byte[] key = new byte[klen];
+      byte[] value = new byte[vlen];
+      en.getKey(key);
+      en.getValue(value);
+      data.put(new Slice(key, 0, key.length), value);
+    }
+
+  }
+
+  @Override
+  public void reset() throws IOException
+  {
+    scanner.rewind();
+  }
+
+  @Override
+  public boolean seek(Slice key) throws IOException
+  {
+    try {
+      return scanner.seekTo(key.buffer, key.offset, key.length);
+    } catch (NullPointerException ex) {
+      if (closed)
+        throw new IOException("Stream was closed");
+      else
+        throw ex;
+    }
+  }
+
+  @Override
+  public boolean next(Slice key, Slice value) throws IOException
+  {
+    if (scanner.atEnd()) return false;
+    Entry en = scanner.entry();
+    byte[] rkey = new byte[en.getKeyLength()];
+    byte[] rval = new byte[en.getValueLength()];
+    en.getKey(rkey);
+    en.getValue(rval);
+
+    key.buffer = rkey;
+    key.offset = 0;
+    key.length = en.getKeyLength();
+
+    value.buffer = rval;
+    value.offset = 0;
+    value.length = en.getValueLength();
+
+    scanner.advance();
+    return true;
+  }
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java
new file mode 100644
index 0000000..b362987
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.fileaccess;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.file.tfile.TFile.Writer;
+
+/**
+ * TFileWriter
+ *
+ * @since 2.0.0
+ */
+public final class TFileWriter implements FileAccess.FileWriter
+{
+  private Writer writer;
+  
+  private FSDataOutputStream fsdos;
+  
+  public TFileWriter(FSDataOutputStream stream, int minBlockSize, String compressName, String comparator, Configuration conf) throws IOException
+  {
+    this.fsdos = stream;
+    writer = new Writer(stream, minBlockSize, compressName, comparator, conf);
+    
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    writer.close();
+    fsdos.close();
+  }
+
+  @Override
+  public void append(byte[] key, byte[] value) throws IOException
+  {
+    writer.append(key, value);
+  }
+
+  @Override
+  public long getBytesWritten() throws IOException{ return fsdos.getPos(); }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 92466ab..678540d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -144,7 +144,7 @@
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-checkstyle-plugin</artifactId>
             <configuration>
-              <maxAllowedViolations>8768</maxAllowedViolations>
+              <maxAllowedViolations>8789</maxAllowedViolations>
             </configuration>
           </plugin>
         </plugins>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/tfile/DTFileReader.java
----------------------------------------------------------------------
diff --git a/tfile/DTFileReader.java b/tfile/DTFileReader.java
deleted file mode 100644
index e61d475..0000000
--- a/tfile/DTFileReader.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.datatorrent.contrib.hdht.tfile;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.TreeMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.io.file.tfile.DTFile;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner;
-import org.apache.hadoop.io.file.tfile.DTFile.Reader.Scanner.Entry;
-import org.apache.hadoop.io.file.tfile.TFile;
-
-import com.datatorrent.netlet.util.Slice;
-import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader;
-
-/**
- * {@link DTFile} wrapper for HDSFileReader
- * <br>
- * {@link DTFile} has exact same format as {@link TFile} with a much faster {@link Reader} implementation
- * <br>
- * DTFileReader is also fully compatible with any file generated by {@link TFileWriter}. So there is no corresponding "DTFileWriter"
- *
- *
- * @since 2.0.0
- */
-public class DTFileReader implements HDSFileReader
-{
-  private final Reader reader;
-  private final Scanner scanner;
-  private final FSDataInputStream fsdis;
-
-  public DTFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException
-  {
-    this.fsdis = fsdis;
-    reader = new Reader(fsdis, fileLength, conf);
-    scanner = reader.createScanner();
-  }
-
-  /**
-   * Unlike the TFile.Reader.close method this will close the wrapped InputStream.
-   * @see java.io.Closeable#close()
-   */
-  @Override
-  public void close() throws IOException
-  {
-    scanner.close();
-    reader.close();
-    fsdis.close();
-  }
-
-  @Override
-  public void readFully(TreeMap<Slice, byte[]> data) throws IOException
-  {
-    scanner.rewind();
-    for (; !scanner.atEnd(); scanner.advance()) {
-      Entry en = scanner.entry();
-      Slice key = new Slice(en.getBlockBuffer(), en.getKeyOffset(), en.getKeyLength());
-      byte[] value = Arrays.copyOfRange(en.getBlockBuffer(), en.getValueOffset(), en.getValueOffset() + en.getValueLength());
-      data.put(key, value);
-    }
-
-  }
-
-  @Override
-  public void reset() throws IOException
-  {
-    scanner.rewind();
-  }
-
-  @Override
-  public boolean seek(Slice key) throws IOException
-  {
-    return scanner.seekTo(key.buffer, key.offset, key.length);
-  }
-
-  @Override
-  public boolean next(Slice key, Slice value) throws IOException
-  {
-    if (scanner.atEnd()) return false;
-    Entry en = scanner.entry();
-
-    key.buffer = en.getBlockBuffer();
-    key.offset = en.getKeyOffset();
-    key.length = en.getKeyLength();
-
-    value.buffer = en.getBlockBuffer();
-    value.offset = en.getValueOffset();
-    value.length = en.getValueLength();
-
-    scanner.advance();
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/tfile/TFileImpl.java
----------------------------------------------------------------------
diff --git a/tfile/TFileImpl.java b/tfile/TFileImpl.java
deleted file mode 100644
index 5dc9464..0000000
--- a/tfile/TFileImpl.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.hdht.tfile;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.io.file.tfile.DTFile;
-import org.apache.hadoop.io.file.tfile.TFile;
-import org.apache.hadoop.io.file.tfile.TFile.Reader;
-import org.apache.hadoop.io.file.tfile.TFile.Writer;
-
-import com.datatorrent.contrib.hdht.HDHTFileAccessFSImpl;
-
-/**
- * A TFile wrapper with HDHTFileAccess API
- * <ul>
- * <li>{@link TFileImpl.DefaultTFileImpl} return default TFile {@link Reader} and {@link Writer} for IO operations</li> 
- * <li>{@link TFileImpl.DTFileImpl} return DTFile {@link org.apache.hadoop.io.file.tfile.DTFile.Reader}(which is faster than default TFile reader) and {@link Writer} for IO operations</li> 
- * </ul>
- *
- * @since 2.0.0
- */
-public abstract class TFileImpl extends HDHTFileAccessFSImpl
-{
-  private int minBlockSize = 64 * 1024;
-
-  private String compressName = TFile.COMPRESSION_NONE;
-  
-  private String comparator = "memcmp";
-  
-  private int chunkSize = 1024 * 1024;
-  
-  private int inputBufferSize = 256 * 1024;
-  
-  private int outputBufferSize = 256 * 1024;
-
-  
-  private void setupConfig(Configuration conf)
-  {
-    conf.set("tfile.io.chunk.size", String.valueOf(chunkSize));
-    conf.set("tfile.fs.input.buffer.size", String.valueOf(inputBufferSize));
-    conf.set("tfile.fs.output.buffer.size", String.valueOf(outputBufferSize));
-  }
-
-
-  @Override
-  public HDSFileWriter getWriter(long bucketKey, String fileName) throws IOException
-  {
-    FSDataOutputStream fsdos = getOutputStream(bucketKey, fileName);
-    setupConfig(fs.getConf());
-    return new TFileWriter(fsdos, minBlockSize, compressName, comparator, fs.getConf());
-  }
-  
-  public int getMinBlockSize()
-  {
-    return minBlockSize;
-  }
-
-
-  public void setMinBlockSize(int minBlockSize)
-  {
-    this.minBlockSize = minBlockSize;
-  }
-
-
-  public String getCompressName()
-  {
-    return compressName;
-  }
-
-
-  public void setCompressName(String compressName)
-  {
-    this.compressName = compressName;
-  }
-
-
-  public String getComparator()
-  {
-    return comparator;
-  }
-
-
-  public void setComparator(String comparator)
-  {
-    this.comparator = comparator;
-  }
-
-
-  public int getChunkSize()
-  {
-    return chunkSize;
-  }
-
-
-  public void setChunkSize(int chunkSize)
-  {
-    this.chunkSize = chunkSize;
-  }
-
-
-  public int getInputBufferSize()
-  {
-    return inputBufferSize;
-  }
-
-
-  public void setInputBufferSize(int inputBufferSize)
-  {
-    this.inputBufferSize = inputBufferSize;
-  }
-
-
-  public int getOutputBufferSize()
-  {
-    return outputBufferSize;
-  }
-
-
-  public void setOutputBufferSize(int outputBufferSize)
-  {
-    this.outputBufferSize = outputBufferSize;
-  }
-  
-  /**
-   * Return {@link TFile} {@link Reader}
-   *
-   */
-  public static class DefaultTFileImpl extends TFileImpl{
-    
-    @Override
-    public HDSFileReader getReader(long bucketKey, String fileName) throws IOException
-    {
-      FSDataInputStream fsdis =  getInputStream(bucketKey, fileName);
-      long fileLength = getFileSize(bucketKey, fileName);
-      super.setupConfig(fs.getConf());
-      return new TFileReader(fsdis, fileLength, fs.getConf());
-    }
-    
-  }
-  
-  
-  /**
-   * Return {@link DTFile} {@link org.apache.hadoop.io.file.tfile.DTFile.Reader}
-   *
-   */
-  public static class DTFileImpl extends TFileImpl {
-    
-    @Override
-    public HDSFileReader getReader(long bucketKey, String fileName) throws IOException
-    {
-      FSDataInputStream fsdis =  getInputStream(bucketKey, fileName);
-      long fileLength = getFileSize(bucketKey, fileName);
-      super.setupConfig(fs.getConf());
-      return new DTFileReader(fsdis, fileLength, fs.getConf());
-    }
-    
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/tfile/TFileReader.java
----------------------------------------------------------------------
diff --git a/tfile/TFileReader.java b/tfile/TFileReader.java
deleted file mode 100644
index 0994666..0000000
--- a/tfile/TFileReader.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.hdht.tfile;
-
-import java.io.IOException;
-import java.util.TreeMap;
-
-import com.datatorrent.netlet.util.DTThrowable;
-import com.datatorrent.netlet.util.Slice;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.io.file.tfile.TFile.Reader;
-import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner;
-import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner.Entry;
-
-import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader;
-
-/**
- * TFileReader
- *
- * @since 2.0.0
- */
-public class TFileReader implements HDSFileReader
-{
-
-  private final Reader reader;
-  private final Scanner scanner;
-  private final FSDataInputStream fsdis;
-  private boolean closed = false;
-
-  public TFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException
-  {
-    this.fsdis = fsdis;
-    reader = new Reader(fsdis, fileLength, conf);
-    scanner = reader.createScanner();
-  }
-
-  /**
-   * Unlike the TFile.Reader.close method this will close the wrapped InputStream.
-   * @see java.io.Closeable#close()
-   */
-  @Override
-  public void close() throws IOException
-  {
-    closed = true;
-    scanner.close();
-    reader.close();
-    fsdis.close();
-  }
-
-  @Override
-  public void readFully(TreeMap<Slice, byte[]> data) throws IOException
-  {
-    scanner.rewind();
-    for (; !scanner.atEnd(); scanner.advance()) {
-      Entry en = scanner.entry();
-      int klen = en.getKeyLength();
-      int vlen = en.getValueLength();
-      byte[] key = new byte[klen];
-      byte[] value = new byte[vlen];
-      en.getKey(key);
-      en.getValue(value);
-      data.put(new Slice(key, 0, key.length), value);
-    }
-
-  }
-
-  @Override
-  public void reset() throws IOException
-  {
-    scanner.rewind();
-  }
-
-  @Override
-  public boolean seek(Slice key) throws IOException
-  {
-    try {
-      return scanner.seekTo(key.buffer, key.offset, key.length);
-    } catch (NullPointerException ex) {
-      if (closed)
-        throw new IOException("Stream was closed");
-      else
-        throw ex;
-    }
-  }
-
-  @Override
-  public boolean next(Slice key, Slice value) throws IOException
-  {
-    if (scanner.atEnd()) return false;
-    Entry en = scanner.entry();
-    byte[] rkey = new byte[en.getKeyLength()];
-    byte[] rval = new byte[en.getValueLength()];
-    en.getKey(rkey);
-    en.getValue(rval);
-
-    key.buffer = rkey;
-    key.offset = 0;
-    key.length = en.getKeyLength();
-
-    value.buffer = rval;
-    value.offset = 0;
-    value.length = en.getValueLength();
-
-    scanner.advance();
-    return true;
-  }
-
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7d2f4749/tfile/TFileWriter.java
----------------------------------------------------------------------
diff --git a/tfile/TFileWriter.java b/tfile/TFileWriter.java
deleted file mode 100644
index 549e1b8..0000000
--- a/tfile/TFileWriter.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.hdht.tfile;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.io.file.tfile.TFile.Writer;
-
-import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileWriter;
-
-/**
- * TFileWriter
- *
- * @since 2.0.0
- */
-public final class TFileWriter implements HDSFileWriter
-{
-  private Writer writer;
-  
-  private FSDataOutputStream fsdos;
-  
-  public TFileWriter(FSDataOutputStream stream, int minBlockSize, String compressName, String comparator, Configuration conf) throws IOException
-  {
-    this.fsdos = stream;
-    writer = new Writer(stream, minBlockSize, compressName, comparator, conf);
-    
-  }
-
-  @Override
-  public void close() throws IOException
-  {
-    writer.close();
-    fsdos.close();
-  }
-
-  @Override
-  public void append(byte[] key, byte[] value) throws IOException
-  {
-    writer.append(key, value);
-  }
-
-  @Override
-  public long getBytesWritten() throws IOException{ return fsdos.getPos(); }
-
-}


Mime
View raw message