incubator-hcatalog-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject svn commit: r1305444 - in /incubator/hcatalog/trunk: CHANGES.txt src/java/org/apache/hcatalog/pig/HCatBaseLoader.java src/java/org/apache/hcatalog/pig/HCatLoader.java src/test/org/apache/hcatalog/pig/TestHCatLoader.java
Date Mon, 26 Mar 2012 16:53:56 GMT
Author: gates
Date: Mon Mar 26 16:53:56 2012
New Revision: 1305444

URL: http://svn.apache.org/viewvc?rev=1305444&view=rev
Log:
HCATALOG-328 HCatLoader should report its input size so pig can estimate the number of reducers

Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseLoader.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoader.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1305444&r1=1305443&r2=1305444&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Mon Mar 26 16:53:56 2012
@@ -23,6 +23,8 @@ Trunk (unreleased changes)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+  HCAT-328 HCatLoader should report its input size so pig can estimate the number of reducers (traviscrawford via gates)
+
   HCAT-287 Add data api to HCatalog (hashutosh)
 
   IMPROVEMENTS

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseLoader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseLoader.java?rev=1305444&r1=1305443&r2=1305444&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseLoader.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseLoader.java Mon Mar 26 16:53:56 2012
@@ -22,10 +22,15 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.PartInfo;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadMetadata;
 import org.apache.pig.LoadPushDown;
@@ -114,4 +119,31 @@ public abstract class HCatBaseLoader ext
     props.put(key, value);
   }
 
+  /**
+   * A utility method to get the size of inputs. This is accomplished by summing the
+   * size of all input paths on supported FileSystems. Locations whose size cannot be
+   * determined are ignored. Note non-FileSystem and unpartitioned locations will not
+   * report their input size by default.
+   */
+  protected static long getSizeInBytes(InputJobInfo inputJobInfo) throws IOException {
+    Configuration conf = new Configuration();
+    long sizeInBytes = 0;
+
+    for (PartInfo partInfo : inputJobInfo.getPartitions()) {
+      try {
+        Path p = new Path(partInfo.getLocation());
+        if (p.getFileSystem(conf).isFile(p)) {
+          sizeInBytes += p.getFileSystem(conf).getFileStatus(p).getLen();
+        } else {
+          for (FileStatus child : p.getFileSystem(conf).listStatus(p)) {
+            sizeInBytes += child.getLen();
+          }
+        }
+      } catch (IOException e) {
+        // Report size to the extent possible.
+      }
+    }
+
+    return sizeInBytes;
+  }
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java?rev=1305444&r1=1305443&r2=1305444&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java Mon Mar 26 16:53:56 2012
@@ -37,6 +37,7 @@ import org.apache.pig.Expression.BinaryE
 import org.apache.pig.LoadFunc;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
 import org.apache.pig.impl.util.UDFContext;
 
 /**
@@ -177,6 +178,22 @@ public class HCatLoader extends HCatBase
         PARTITION_FILTER, partitionFilterString);
   }
 
+  /**
+   * Get statistics about the data to be loaded. Only input data size is implemented at this time.
+   */
+  @Override
+  public ResourceStatistics getStatistics(String location, Job job) throws IOException {
+    try {
+      ResourceStatistics stats = new ResourceStatistics();
+      InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(
+          job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
+      stats.setmBytes(getSizeInBytes(inputJobInfo) / 1024 / 1024);
+      return stats;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
   private String getPartitionFilterString() {
     if(partitionFilterString == null) {
       Properties props = UDFContext.getUDFContext().getUDFProperties(

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoader.java?rev=1305444&r1=1305443&r2=1305444&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoader.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoader.java Mon Mar 26 16:53:56 2012
@@ -19,6 +19,7 @@ package org.apache.hcatalog.pig;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -34,10 +35,12 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hcatalog.HcatTestUtils;
 import org.apache.hcatalog.data.Pair;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
+import org.apache.pig.ResourceStatistics;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -53,9 +56,10 @@ public class TestHCatLoader extends Test
   private static final String BASIC_TABLE = "junit_unparted_basic";
   private static final String COMPLEX_TABLE = "junit_unparted_complex";
   private static final String PARTITIONED_TABLE = "junit_parted_basic";
+  private static final String SPECIFIC_SIZE_TABLE = "junit_specific_size";
   private static Driver driver;
 
-  private static int guardTestCount = 5; // ugh, instantiate using introspection in guardedSetupBeforeClass
+  private static int guardTestCount = 6; // ugh, instantiate using introspection in guardedSetupBeforeClass
   private static boolean setupHasRun = false;
 
   private static Map<Integer,Pair<Integer,String>> basicInputData;
@@ -113,7 +117,7 @@ public class TestHCatLoader extends Test
         + "phnos array<struct<phno:string,type:string>>");
 
     createTable(PARTITIONED_TABLE,"a int, b string","bkt string");
-
+    createTable(SPECIFIC_SIZE_TABLE, "a int, b string");
 
     int LOOP_SIZE = 3;
     String[] input = new String[LOOP_SIZE*LOOP_SIZE];
@@ -141,6 +145,7 @@ public class TestHCatLoader extends Test
     server.registerQuery("A = load '"+BASIC_FILE_NAME+"' as (a:int, b:chararray);");
 
     server.registerQuery("store A into '"+BASIC_TABLE+"' using org.apache.hcatalog.pig.HCatStorer();");
+    server.registerQuery("store A into '" + SPECIFIC_SIZE_TABLE + "' using org.apache.hcatalog.pig.HCatStorer();");
     server.registerQuery("B = foreach A generate a,b;");
     server.registerQuery("B2 = filter B by a < 2;");
     server.registerQuery("store B2 into '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatStorer('bkt=0');");
@@ -158,6 +163,7 @@ public class TestHCatLoader extends Test
     dropTable(BASIC_TABLE);
     dropTable(COMPLEX_TABLE);
     dropTable(PARTITIONED_TABLE);
+    dropTable(SPECIFIC_SIZE_TABLE);
   }
 
   protected void guardedTearDownAfterClass() throws Exception {
@@ -376,4 +382,18 @@ public class TestHCatLoader extends Test
     }
     assertEquals(basicInputData.size(),numTuplesRead);
   }
+
+  public void testGetInputBytes() throws Exception {
+    File file = new File(TEST_WAREHOUSE_DIR + "/" + SPECIFIC_SIZE_TABLE + "/part-m-00000");
+    file.deleteOnExit();
+    RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
+    randomAccessFile.setLength(2L * 1024 * 1024 * 1024);
+
+    Job job = new Job();
+    HCatLoader hCatLoader = new HCatLoader();
+    hCatLoader.setUDFContextSignature(this.getName());
+    hCatLoader.setLocation(SPECIFIC_SIZE_TABLE, job);
+    ResourceStatistics statistics = hCatLoader.getStatistics(file.getAbsolutePath(), job);
+    assertEquals(2048, (long) statistics.getmBytes());
+  }
 }



Mime
View raw message