trafodion-commits mailing list archives

From se...@apache.org
Subject [1/9] trafodion git commit: [TRAFODION-2917] Refactor Trafodion implementation of hdfs scan for text formatted hive tables
Date Fri, 16 Feb 2018 20:00:34 GMT
Repository: trafodion
Updated Branches:
  refs/heads/master 087af70db -> de8357677


http://git-wip-us.apache.org/repos/asf/trafodion/blob/60db1533/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
new file mode 100644
index 0000000..8d2052f
--- /dev/null
+++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
@@ -0,0 +1,319 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class HDFSClient 
+{
+   static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
+   private static Configuration config_ = null;
+   private static ExecutorService executorService_ = null;
+   private static FileSystem defaultFs_ = null;
+   private FileSystem fs_ = null;
+   private int bufNo_;
+   private FSDataInputStream fsdis_; 
+   private OutputStream outStream_;
+   private String filename_;
+   private ByteBuffer buf_;
+   private int bufLen_;
+   private int bufOffset_ = 0;
+   private long pos_ = 0;
+   private int len_ = 0;
+   private int lenRemain_ = 0; 
+   private int blockSize_; 
+   private int bytesRead_;
+   private Future future_ = null;
+    
+   static {
+      String confFile = System.getProperty("trafodion.log4j.configFile");
+      System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
+      if (confFile == null) {
+         confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
+      }
+      PropertyConfigurator.configure(confFile);
+      config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
+      executorService_ = Executors.newCachedThreadPool();
+      try {
+         defaultFs_ = FileSystem.get(config_);
+      }
+      catch (IOException ioe) {
+         throw new RuntimeException("Exception in HDFSClient static block", ioe);
+      }
+   }
+
+   class HDFSRead implements Callable 
+   {
+      int length_;
+
+      HDFSRead(int length) 
+      {
+         length_ = length;
+      }
+ 
+      public Object call() throws IOException 
+      {
+         int bytesRead;
+         if (buf_.hasArray())
+            bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, length_);
+         else
+         {
+            buf_.limit(bufOffset_ + length_);
+            bytesRead = fsdis_.read(buf_);
+         }
+         return new Integer(bytesRead);
+      }
+   }
+       
+   public HDFSClient() 
+   {
+   }
+ 
+   public HDFSClient(int bufNo, String filename, ByteBuffer buffer, long position, int length) throws IOException
+   {
+      bufNo_ = bufNo; 
+      filename_ = filename;
+      Path filepath = new Path(filename_);
+      fs_ = FileSystem.get(filepath.toUri(),config_);
+      fsdis_ = fs_.open(filepath);
+      blockSize_ = (int)fs_.getDefaultBlockSize(filepath);
+      buf_  = buffer;
+      bufOffset_ = 0;
+      pos_ = position;
+      len_ = length;
+      if (buffer.hasArray()) 
+         bufLen_ = buffer.array().length;
+      else
+      {
+         bufLen_ = buffer.capacity();
+         buf_.position(0);
+      }
+      lenRemain_ = (len_ > bufLen_) ? bufLen_ : len_;
+      if (lenRemain_ != 0)
+      {
+         int readLength = (lenRemain_ > blockSize_) ? blockSize_ : lenRemain_;
+         future_ = executorService_.submit(new HDFSRead(readLength));
+      }
+   }
+
+   public int trafHdfsRead() throws IOException, InterruptedException, ExecutionException
+   {
+      Integer retObject = 0;
+      int bytesRead;
+      int readLength;
+       
+      if (lenRemain_ == 0)
+         return 0;
+      retObject = (Integer)future_.get();
+      bytesRead = retObject.intValue();
+      if (bytesRead == -1)
+         return -1;
+      bufOffset_ += bytesRead;
+      pos_ += bytesRead;
+      lenRemain_ -= bytesRead;
+      if (bufOffset_ == bufLen_)
+         return bytesRead; 
+      else if (bufOffset_ > bufLen_)
+         throw new IOException("Internal Error in trafHdfsRead ");
+      if (lenRemain_ == 0)
+         return bytesRead; 
+      readLength = (lenRemain_ > blockSize_) ? blockSize_ : lenRemain_;
+      future_ = executorService_.submit(new HDFSRead(readLength));
+      return bytesRead;
+   } 
+
+   public int trafHdfsReadBuffer() throws IOException, InterruptedException, ExecutionException
+   {
+      int bytesRead;
+      int totalBytesRead = 0;
+      while (true) {
+         bytesRead = trafHdfsRead();
+         if (bytesRead == -1 || bytesRead == 0)
+            return totalBytesRead;
+         totalBytesRead += bytesRead;
+         if (totalBytesRead == bufLen_)
+              return totalBytesRead;
+      }  
+   } 
+
+   boolean hdfsCreate(String fname , boolean compress) throws IOException
+   {
+     if (logger_.isDebugEnabled()) 
+        logger_.debug("HDFSClient.hdfsCreate() - started" );
+      Path filePath = null;
+      if (!compress || (compress && fname.endsWith(".gz")))
+        filePath = new Path(fname);
+      else
+        filePath = new Path(fname + ".gz");
+        
+      FileSystem fs = FileSystem.get(filePath.toUri(),config_);
+      FSDataOutputStream fsOut = fs.create(filePath, true);
+      
+      if (compress) {
+        GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance( GzipCodec.class, config_);
+        Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec);
+        outStream_= gzipCodec.createOutputStream(fsOut, gzipCompressor);
+      }
+      else
+        outStream_ = fsOut;      
+      if (logger_.isDebugEnabled()) 
+         logger_.debug("HDFSClient.hdfsCreate() - compressed output stream created" );
+      return true;
+    }
+    
+    boolean hdfsWrite(byte[] buff, long len) throws IOException
+    {
+
+      if (logger_.isDebugEnabled()) 
+         logger_.debug("HDFSClient.hdfsWrite() - started" );
+      outStream_.write(buff);
+      outStream_.flush();
+      if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsWrite() - bytes written
and flushed:" + len  );
+      return true;
+    }
+    
+    boolean hdfsClose() throws IOException
+    {
+      if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsClose() - started" );
+      if (outStream_ != null) {
+          outStream_.close();
+          outStream_ = null;
+      }
+      return true;
+    }
+
+    
+    public boolean hdfsMergeFiles(String srcPathStr, String dstPathStr) throws IOException
+    {
+      if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - start");
+      if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - source Path:
" + srcPathStr + 
+                                               ", destination File:" + dstPathStr );
+        Path srcPath = new Path(srcPathStr );
+        srcPath = srcPath.makeQualified(srcPath.toUri(), null);
+        FileSystem srcFs = FileSystem.get(srcPath.toUri(),config_);
+  
+        Path dstPath = new Path(dstPathStr);
+        dstPath = dstPath.makeQualified(dstPath.toUri(), null);
+        FileSystem dstFs = FileSystem.get(dstPath.toUri(),config_);
+        
+        if (dstFs.exists(dstPath))
+        {
+          if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - destination files exists" );
+          // for this prototype we just delete the file-- will change in next code drops
+          dstFs.delete(dstPath, false);
+           // The caller should already have checked existence of file-- throw exception
+           //throw new FileAlreadyExistsException(dstPath.toString());
+        }
+        
+        Path tmpSrcPath = new Path(srcPath, "tmp");
+
+        FileSystem.mkdirs(srcFs, tmpSrcPath,srcFs.getFileStatus(srcPath).getPermission());
+        logger_.debug("HDFSClient.hdfsMergeFiles() - tmp folder created." );
+        Path[] files = FileUtil.stat2Paths(srcFs.listStatus(srcPath));
+        for (Path f : files)
+        {
+          srcFs.rename(f, tmpSrcPath);
+        }
+        // copyMerge and use false for the delete option since it removes the whole directory
+        if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - copyMerge"
);
+        FileUtil.copyMerge(srcFs, tmpSrcPath, dstFs, dstPath, false, config_, null);
+        
+        if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - delete
intermediate files" );
+        srcFs.delete(tmpSrcPath, true);
+      return true;
+    }
+
+   public boolean hdfsCleanUnloadPath(String uldPathStr
+                         /*, boolean checkExistence, String mergeFileStr*/) throws IOException
+   {
+      if (logger_.isDebugEnabled()) 
+         logger_.debug("HDFSClient.hdfsCleanUnloadPath() - unload Path: " + uldPathStr );
+      
+      Path uldPath = new Path(uldPathStr );
+      FileSystem fs = FileSystem.get(uldPath.toUri(), config_);
+      if (!fs.exists(uldPath))
+      {
+        //unload location does not exist. hdfscreate will create it later
+        //nothing to do 
+        return true;
+      }
+       
+      Path[] files = FileUtil.stat2Paths(fs.listStatus(uldPath));
+      if (logger_.isDebugEnabled()) 
+         logger_.debug("HDFSClient.hdfsCleanUnloadPath() - delete files" );
+      for (Path f : files){
+        fs.delete(f, false);
+      }
+      return true;
+   }
+
+   public boolean hdfsExists(String filePathStr) throws IOException 
+   {
+      if (logger_.isDebugEnabled()) 
+         logger_.debug("HDFSClient.hdfsExists() - Path: " + filePathStr);
+
+      Path filePath = new Path(filePathStr );
+      FileSystem fs = FileSystem.get(filePath.toUri(), config_);
+      if (fs.exists(filePath)) 
+         return true;
+      return false;
+   }
+
+   public boolean hdfsDeletePath(String pathStr) throws IOException
+   {
+      if (logger_.isDebugEnabled()) 
+         logger_.debug("HDFSClient.hdfsDeletePath() - start - Path: " + pathStr);
+      Path delPath = new Path(pathStr );
+      FileSystem fs = FileSystem.get(delPath.toUri(), config_);
+      fs.delete(delPath, true);
+      return true;
+   }
+ 
+   public static void shutdown() throws InterruptedException
+   {
+      executorService_.awaitTermination(100, TimeUnit.MILLISECONDS);
+      executorService_.shutdown();
+   }
+}
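
For reference, a minimal usage sketch of the HDFSClient read path above (not part of the commit): the constructor opens the file and schedules the first read (capped at the HDFS block size) on the shared cached thread pool, and trafHdfsReadBuffer() keeps harvesting the pending read and scheduling the next one until the buffer is full or the requested length is exhausted. The file path and buffer size below are illustrative assumptions, and a configured Trafodion environment (TRAF_HOME/TRAF_CONF) is assumed.

    import java.nio.ByteBuffer;
    import org.trafodion.sql.HDFSClient;

    public class HDFSClientReadSketch
    {
       public static void main(String[] args) throws Exception
       {
          // One 4 MB scan buffer (direct, as in the HdfsScan test driver below).
          ByteBuffer buf = ByteBuffer.allocateDirect(4 * 1024 * 1024);
          // Hypothetical HDFS path; the constructor schedules the first asynchronous read.
          HDFSClient client = new HDFSClient(0, "hdfs:///user/trafodion/hive/t1/part-00000",
                                             buf, 0, 4 * 1024 * 1024);
          // Blocks until the buffer is filled or the end of the requested range is reached.
          int bytesRead = client.trafHdfsReadBuffer();
          System.out.println("Bytes read into buffer: " + bytesRead);
          HDFSClient.shutdown();
       }
    }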

http://git-wip-us.apache.org/repos/asf/trafodion/blob/60db1533/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
new file mode 100644
index 0000000..bf81ab0
--- /dev/null
+++ b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
@@ -0,0 +1,248 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+// This class implements an efficient mechanism to read HDFS files.
+// The Trafodion ExHdfsScan operator provides the set of scan ranges to be read.
+// Each range consists of an HDFS filename, an offset and a length to be read.
+// This class takes in two ByteBuffers, which can be either direct buffers
+// backed by native memory or indirect buffers backed by Java arrays.
+// All the ranges are read by alternating between the two buffers, using an
+// ExecutorService backed by a cached thread pool.
+// For a given HdfsScan instance, only one thread (the IO thread) is scheduled to read
+// the next full or partial buffer while the main thread processes the previously
+// read information from the other buffer.
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import org.trafodion.sql.HDFSClient;
+
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.fs.FileStatus;
+import java.net.URI;
+
+public class HdfsScan 
+{
+   static Logger logger_ = Logger.getLogger(HdfsScan.class.getName());
+   private ByteBuffer buf_[];
+   private int bufLen_[];
+   private HDFSClient hdfsClient_[];
+   private int currRange_;
+   private long currPos_;
+   private long lenRemain_;
+   private int lastBufCompleted_ = -1;
+   private boolean scanCompleted_;
+   private boolean lastScanRangeScheduled_;
+   
+   class HdfsScanRange 
+   {
+      String filename_;
+      long pos_;
+      long len_;
+      
+      HdfsScanRange(String filename, long pos, long len)
+      {
+         filename_ = filename;
+         pos_ = pos;
+         len_ = len;
+      }
+   }
+   
+   private HdfsScanRange hdfsScanRanges_[];
+    
+   static {
+      String confFile = System.getProperty("trafodion.log4j.configFile");
+      System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
+   }
+
+   HdfsScan(ByteBuffer buf1, ByteBuffer buf2, String filename[], long pos[], long len[]) throws IOException
+   {
+      buf_ = new ByteBuffer[2];
+      bufLen_ = new int[2];
+
+      buf_[0] = buf1;
+      buf_[1] = buf2;
+
+      for (int i = 0; i < 2 ; i++) {
+          if (buf_[i].hasArray())
+             bufLen_[i] = buf_[i].array().length;
+          else
+             bufLen_[i] = buf_[i].capacity();
+      }
+      hdfsClient_ = new HDFSClient[2];
+      hdfsScanRanges_ = new HdfsScanRange[filename.length]; 
+      for (int i = 0; i < filename.length; i++) {
+         hdfsScanRanges_[i] = new HdfsScanRange(filename[i], pos[i], len[i]);
+      }
+      if (hdfsScanRanges_.length > 0) {
+         currRange_ = 0;
+         currPos_ = hdfsScanRanges_[0].pos_;
+         lenRemain_ = hdfsScanRanges_[0].len_;
+         hdfsScanRange(0);
+      }
+      scanCompleted_ = false;
+      lastScanRangeScheduled_ = false;
+   }
+
+   public void hdfsScanRange(int bufNo) throws IOException
+   {
+      System.out.println (" CurrentRange " + currRange_ + " LenRemain " + lenRemain_ + "
BufNo " + bufNo); 
+      int readLength;
+      if (lenRemain_ > bufLen_[bufNo])
+         readLength = bufLen_[bufNo];
+      else
+         readLength = (int)lenRemain_;
+      hdfsClient_[bufNo] = new HDFSClient(bufNo, hdfsScanRanges_[currRange_].filename_, buf_[bufNo], currPos_, readLength);
+      lenRemain_ -= readLength;
+      currPos_ += readLength; 
+      if (lenRemain_ == 0) {
+         if (currRange_  == (hdfsScanRanges_.length-1)) 
+            lastScanRangeScheduled_ = true;
+         else {
+            currRange_++;
+            currPos_ = hdfsScanRanges_[currRange_].pos_;
+            lenRemain_ = hdfsScanRanges_[currRange_].len_; 
+         }
+      } 
+   } 
+   
+   public int[] trafHdfsRead() throws IOException, InterruptedException, ExecutionException
+   {
+      int[] retArray;
+      int byteCompleted;
+      int bufNo;
+ 
+      if (scanCompleted_)
+         return null; 
+      retArray = new int[2];
+      switch (lastBufCompleted_) {
+         case -1:
+         case 1:
+            byteCompleted = hdfsClient_[0].trafHdfsReadBuffer(); 
+            bufNo = 0;
+            break;
+         case 0:
+            byteCompleted = hdfsClient_[1].trafHdfsReadBuffer(); 
+            bufNo = 1;
+            break;
+         default:
+            bufNo = -1;
+            byteCompleted = -1;
+      }    
+      lastBufCompleted_ = bufNo;
+      retArray[0] = byteCompleted;
+      retArray[1] = bufNo;
+      System.out.println (" Buffer No " + retArray[1] + " Bytes Read " + retArray[0]); 
+      lastBufCompleted_ = bufNo;
+      if (lastScanRangeScheduled_) {
+         scanCompleted_ = true;
+         return retArray; 
+      }
+      switch (lastBufCompleted_)
+      {
+         case 0:
+            hdfsScanRange(1);
+            break;
+         case 1:
+            hdfsScanRange(0);
+            break;            
+         default:
+            break;
+      }
+      return retArray;
+   } 
+   
+   public static void shutdown() throws InterruptedException
+   {
+      HDFSClient.shutdown();
+   }
+   public static void main(String[] args) throws Exception
+   {
+
+      if (args.length < 3)
+      {
+         System.out.println("Usage: org.trafodion.sql.HdfsScan <tableName> <buffer_length>
<number_of_splits>");
+         return;
+      }
+      String tableName = args[0];
+      int capacity = Integer.parseInt(args[1]) * 1024 *1024;
+      int split = Integer.parseInt(args[2]);
+      HiveConf config = new HiveConf(); 
+      HiveMetaStoreClient hiveMeta = new HiveMetaStoreClient(config); 
+      Table table = hiveMeta.getTable(tableName);
+      StorageDescriptor sd = table.getSd();
+      String location = sd.getLocation();
+      URI uri = new URI(location);
+      Path path = new Path(uri);
+      FileSystem fs = FileSystem.get(config);       
+      FileStatus file_status[] = fs.listStatus(path);
+      ByteBuffer buf1 = ByteBuffer.allocateDirect(capacity);
+      ByteBuffer buf2 = ByteBuffer.allocateDirect(capacity);
+      String fileName[] = new String[file_status.length * split];
+      long pos[] = new long[file_status.length * split];
+      long len[] = new long[file_status.length * split];
+      for (int i = 0 ; i < file_status.length * split; i++) {
+         Path filePath = file_status[i].getPath();
+         long fileLen = file_status[i].getLen(); 
+         long splitLen = fileLen / split;
+         fileName[i] = filePath.toString();
+         System.out.println (" fileName " + fileName[i] + " Length " + fileLen); 
+         long splitPos = 0;
+         for (int j = 0 ; j < split ; j++)
+         { 
+            fileName[i] = filePath.toString();
+            pos[i] = splitPos + (splitLen * j);
+            len[i] = splitLen;
+            if (j == (split-1))
+               len[i] = fileLen - (splitLen *(j));
+            System.out.println ("Range " + i + " Pos " + pos[i] + " Length " + len[i]); 
+            i++;
+         }
+      }
+      long time1 = System.currentTimeMillis();
+      HdfsScan hdfsScan = new HdfsScan(buf1, buf2, fileName, pos, len);
+      int[] retArray;
+      int bytesCompleted;
+      while (true) {
+         retArray = hdfsScan.trafHdfsRead();
+         if (retArray == null)
+            break;
+      }
+      long time2 = System.currentTimeMillis();
+      HdfsScan.shutdown();
+      System.out.println("Time taken in milliSeconds " + (time2-time1) );
+   }
+}
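
The test driver in main() above divides each file under the Hive table location into <number_of_splits> equal ranges, with the last range absorbing the remainder of the file length. A simplified sketch of that range arithmetic (not the committed code; the class and method names are illustrative):

    public class SplitRangeSketch
    {
       // Returns {offset, length} pairs: fileLen / split bytes per range,
       // with the last range taking whatever is left over.
       static long[][] computeRanges(long fileLen, int split)
       {
          long[][] ranges = new long[split][2];
          long splitLen = fileLen / split;
          for (int j = 0; j < split; j++) {
             ranges[j][0] = splitLen * j;
             ranges[j][1] = (j == split - 1) ? fileLen - splitLen * j : splitLen;
          }
          return ranges;
       }

       public static void main(String[] args)
       {
          // For a 1000-byte file and 3 splits: (0,333), (333,333), (666,334).
          for (long[] r : computeRanges(1000, 3))
             System.out.println("pos " + r[0] + " len " + r[1]);
       }
    }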

http://git-wip-us.apache.org/repos/asf/trafodion/blob/60db1533/core/sql/src/main/java/org/trafodion/sql/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/SequenceFileWriter.java b/core/sql/src/main/java/org/trafodion/sql/SequenceFileWriter.java
index 2007005..ff88dd7 100644
--- a/core/sql/src/main/java/org/trafodion/sql/SequenceFileWriter.java
+++ b/core/sql/src/main/java/org/trafodion/sql/SequenceFileWriter.java
@@ -143,158 +143,4 @@ public class SequenceFileWriter {
         }
         return null;
     }
-    
-    boolean hdfsCreate(String fname , boolean compress) throws IOException
-    {
-      if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsCreate() - started"
);
-      Path filePath = null;
-      if (!compress || (compress && fname.endsWith(".gz")))
-        filePath = new Path(fname);
-      else
-        filePath = new Path(fname + ".gz");
-        
-      fs = FileSystem.get(filePath.toUri(),conf);
-      fsOut = fs.create(filePath, true);
-      
-      outStream = fsOut;
-      
-      if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsCreate() - file created"
);
-      if (compress)
-      {
-        GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance( GzipCodec.class, conf);
-        Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec);
-        outStream = gzipCodec.createOutputStream(fsOut, gzipCompressor);
-        sameStream = false;
-      }
-      
-      if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsCreate() - compressed
output stream created" );
-      return true;
-    }
-    
-    boolean hdfsWrite(byte[] buff, long len) throws IOException
-    {
-
-      if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsWrite() - started"
);
-      outStream.write(buff);
-      outStream.flush();
-      if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsWrite() - bytes written
and flushed:" + len  );
-      return true;
-    }
-    
-    boolean hdfsClose() throws IOException
-    {
-      if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsClose() - started"
);
-      if (sameStream) { 
-         if (outStream != null) {
-            outStream.close();
-            outStream = null;
-         }
-         fsOut = null;
-      }
-      else {
-         if (outStream != null) {
-            outStream.close();
-            outStream = null;
-         }
-         if (fsOut != null) {
-            fsOut.close();
-            fsOut = null;
-         }
-      }
-      return true;
-    }
-
-    
-    public boolean hdfsMergeFiles(String srcPathStr, String dstPathStr) throws IOException
-    {
-      if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsMergeFiles() - start");
-      if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsMergeFiles() - source
Path: " + srcPathStr + 
-                                               ", destination File:" + dstPathStr );
-        Path srcPath = new Path(srcPathStr );
-        srcPath = srcPath.makeQualified(srcPath.toUri(), null);
-        FileSystem srcFs = FileSystem.get(srcPath.toUri(),conf);
-  
-        Path dstPath = new Path(dstPathStr);
-        dstPath = dstPath.makeQualified(dstPath.toUri(), null);
-        FileSystem dstFs = FileSystem.get(dstPath.toUri(),conf);
-        
-        if (dstFs.exists(dstPath))
-        {
-          if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsMergeFiles()
- destination files exists" );
-          // for this prototype we just delete the file-- will change in next code drops
-          dstFs.delete(dstPath, false);
-           // The caller should already have checked existence of file-- throw exception
-           //throw new FileAlreadyExistsException(dstPath.toString());
-        }
-        
-        Path tmpSrcPath = new Path(srcPath, "tmp");
-
-        FileSystem.mkdirs(srcFs, tmpSrcPath,srcFs.getFileStatus(srcPath).getPermission());
-        logger.debug("SequenceFileWriter.hdfsMergeFiles() - tmp folder created." );
-        Path[] files = FileUtil.stat2Paths(srcFs.listStatus(srcPath));
-        for (Path f : files)
-        {
-          srcFs.rename(f, tmpSrcPath);
-        }
-        // copyMerge and use false for the delete option since it removes the whole directory
-        if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsMergeFiles() -
copyMerge" );
-        FileUtil.copyMerge(srcFs, tmpSrcPath, dstFs, dstPath, false, conf, null);
-        
-        if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsMergeFiles() -
delete intermediate files" );
-        srcFs.delete(tmpSrcPath, true);
-      return true;
-    }
-
-    public boolean hdfsCleanUnloadPath(String uldPathStr
-                         /*, boolean checkExistence, String mergeFileStr*/) throws IOException
-    {
-      if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsCleanUnloadPath()
- start");
-      logger.debug("SequenceFileWriter.hdfsCleanUnloadPath() - unload Path: " + uldPathStr
);
-      
-      Path uldPath = new Path(uldPathStr );
-      uldPath = uldPath.makeQualified(uldPath.toUri(), null);
-      FileSystem srcFs = FileSystem.get(uldPath.toUri(),conf);
-      if (!srcFs.exists(uldPath))
-      {
-        //unload location does not exist. hdfscreate will create it later
-        //nothing to do 
-        logger.debug("SequenceFileWriter.hdfsCleanUnloadPath() -- unload location does not
exist." );
-        return true;
-      }
-       
-      Path[] files = FileUtil.stat2Paths(srcFs.listStatus(uldPath));
-      logger.debug("SequenceFileWriter.hdfsCleanUnloadPath() - delete files" );
-      for (Path f : files){
-        srcFs.delete(f, false);
-      }
-      return true;
-    }
-
-  public boolean hdfsExists(String filePathStr) throws IOException 
-  {
-    logger.debug("SequenceFileWriter.hdfsExists() - start");
-    logger.debug("SequenceFileWriter.hdfsExists() - Path: " + filePathStr);
-
-        //check existence of the merge Path
-       Path filePath = new Path(filePathStr );
-       filePath = filePath.makeQualified(filePath.toUri(), null);
-       FileSystem mergeFs = FileSystem.get(filePath.toUri(),conf);
-       if (mergeFs.exists( filePath))
-       {
-       logger.debug("SequenceFileWriter.hdfsExists() - Path: "
-       + filePath + " exists" );
-         return true;
-       }
-    return false;
-  }
-
-  public boolean hdfsDeletePath(String pathStr) throws IOException
-  {
-    if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsDeletePath() - start
- Path: " + pathStr);
-      Path delPath = new Path(pathStr );
-      delPath = delPath.makeQualified(delPath.toUri(), null);
-      FileSystem fs = FileSystem.get(delPath.toUri(),conf);
-      fs.delete(delPath, true);
-    return true;
-  }
 }

http://git-wip-us.apache.org/repos/asf/trafodion/blob/60db1533/core/sql/src/main/java/org/trafodion/sql/TrafConfiguration.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/TrafConfiguration.java b/core/sql/src/main/java/org/trafodion/sql/TrafConfiguration.java
index 80b9382..cf0cfa1 100644
--- a/core/sql/src/main/java/org/trafodion/sql/TrafConfiguration.java
+++ b/core/sql/src/main/java/org/trafodion/sql/TrafConfiguration.java
@@ -37,6 +37,7 @@ public class TrafConfiguration {
     Configuration lv_conf = new Configuration();
     switch (config) {
        case HBASE_CONF:
+       case HDFS_CONF:
           String trafSiteXml = new String(System.getenv("TRAF_CONF") + "/trafodion-site.xml");
           Path fileRes = new Path(trafSiteXml);
           lv_conf.addResource(fileRes);
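
With HDFS_CONF now falling through to the same branch as HBASE_CONF, an HDFS-side caller such as HDFSClient picks up $TRAF_CONF/trafodion-site.xml on top of the default Hadoop configuration. A minimal sketch of that usage (assumes TRAF_CONF is set; the class name is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.trafodion.sql.TrafConfiguration;

    public class TrafHdfsConfSketch
    {
       public static void main(String[] args) throws Exception
       {
          // Loads the default Hadoop configuration plus the Trafodion site overrides.
          Configuration conf = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
          // The default FileSystem is then resolved with those overrides applied.
          FileSystem fs = FileSystem.get(conf);
          System.out.println("Default FS: " + fs.getUri());
       }
    }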

