hive-commits mailing list archives

From heyongqi...@apache.org
Subject svn commit: r1027672 [2/13] - in /hadoop/hive/trunk: ./ ql/src/gen-javabean/org/apache/hadoop/hive/ql/plan/api/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ ql/src/java/org/apache/hadoop/hive/ql/io...
Date Tue, 26 Oct 2010 18:28:25 GMT
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java?rev=1027672&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java Tue Oct 26 18:28:23 2010
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class MapredLocalTask  extends Task<MapredLocalWork> implements Serializable {
+
+  private Map<String, FetchOperator> fetchOperators;
+  private File jdbmFile;
+  private JobConf job;
+  public static final Log l4j = LogFactory.getLog("MapredLocalTask");
+  private MapOperator mo;
+  // not sure we need this exec context; but all the operators in the work
+  // will pass this context through
+  private final ExecMapperContext execContext = new ExecMapperContext();
+
+  public MapredLocalTask(){
+    super();
+  }
+
+  @Override
+  public void initialize(HiveConf conf, QueryPlan queryPlan,
+      DriverContext driverContext) {
+    super.initialize(conf, queryPlan, driverContext);
+    job = new JobConf(conf, ExecDriver.class);
+  }
+
+  @Override
+  public int execute(DriverContext driverContext) {
+    // check the local work
+    if(work == null){
+      return -1;
+    }
+    fetchOperators = new HashMap<String, FetchOperator>();
+    Map<FetchOperator, JobConf> fetchOpJobConfMap = new HashMap<FetchOperator, JobConf>();
+    execContext.setJc(job);
+    // set the local work, so all the operators can get this context
+    execContext.setLocalWork(work);
+    boolean inputFileChangeSenstive = work.getInputFileChangeSensitive();
+    try{
+
+      initializeOperators(fetchOpJobConfMap);
+      // for each of the big table's buckets, call startForward
+      if(inputFileChangeSenstive){
+        for( LinkedHashMap<String, ArrayList<String>> bigTableBucketFiles:
+          work.getBucketMapjoinContext().getAliasBucketFileNameMapping().values()){
+          for(String bigTableBucket: bigTableBucketFiles.keySet()){
+            startForward(inputFileChangeSenstive,bigTableBucket);
+          }
+        }
+      }else{
+        startForward(inputFileChangeSenstive,null);
+      }
+    } catch (Throwable e) {
+      if (e instanceof OutOfMemoryError) {
+        // Don't create a new object if we are already out of memory
+        l4j.error("Out of Memory Error");
+      } else {
+        l4j.error("Hive Runtime Error: Map local work failed");
+        e.printStackTrace();
+      }
+    }
+    return 0;
+  }
+
+  private void startForward(boolean inputFileChangeSenstive, String bigTableBucket)
+    throws Exception{
+    for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
+      int fetchOpRows = 0;
+      String alias = entry.getKey();
+      FetchOperator fetchOp = entry.getValue();
+
+      if (inputFileChangeSenstive) {
+        fetchOp.clearFetchContext();
+        setUpFetchOpContext(fetchOp, alias,bigTableBucket);
+      }
+
+      //get the root operator
+      Operator<? extends Serializable> forwardOp = work.getAliasToWork().get(alias);
+      //walk through the operator tree
+      while (true) {
+        InspectableObject row = fetchOp.getNextRow();
+        if (row == null) {
+          if (inputFileChangeSenstive) {
+            String fileName=this.getFileName(bigTableBucket);
+            execContext.setCurrentBigBucketFile(fileName);
+            forwardOp.reset();
+          }
+          forwardOp.close(false);
+          break;
+        }
+        fetchOpRows++;
+        forwardOp.process(row.o, 0);
+        // check if any operator had a fatal error or early exit during
+        // execution
+        if (forwardOp.getDone()) {
+          //ExecMapper.setDone(true);
+          break;
+        }
+      }
+    }
+  }
+  private void initializeOperators(Map<FetchOperator, JobConf> fetchOpJobConfMap)
+    throws HiveException{
+    // this mapper operator is used to initialize all the operators
+    for (Map.Entry<String, FetchWork> entry : work.getAliasToFetchWork().entrySet()) {
+      JobConf jobClone = new JobConf(job);
+
+      Operator<? extends Serializable> tableScan = work.getAliasToWork().get(entry.getKey());
+      boolean setColumnsNeeded = false;
+      if(tableScan instanceof TableScanOperator) {
+        ArrayList<Integer> list = ((TableScanOperator)tableScan).getNeededColumnIDs();
+        if (list != null) {
+          ColumnProjectionUtils.appendReadColumnIDs(jobClone, list);
+          setColumnsNeeded = true;
+        }
+      }
+
+      if (!setColumnsNeeded) {
+        ColumnProjectionUtils.setFullyReadColumns(jobClone);
+      }
+
+      //create a fetch operator
+      FetchOperator fetchOp = new FetchOperator(entry.getValue(),jobClone);
+      fetchOpJobConfMap.put(fetchOp, jobClone);
+      fetchOperators.put(entry.getKey(), fetchOp);
+      l4j.info("fetchoperator for " + entry.getKey() + " created");
+    }
+    // initialize all forward operators
+    for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
+      //get the forward op
+      Operator<? extends Serializable> forwardOp = work.getAliasToWork().get(entry.getKey());
+
+      // put the exec context into all the operators
+      forwardOp.setExecContext(execContext);
+      // All the operators need to be initialized before process
+      FetchOperator fetchOp = entry.getValue();
+      JobConf jobConf = fetchOpJobConfMap.get(fetchOp);
+
+      if (jobConf == null) {
+        jobConf = job;
+      }
+      //initialize the forward operator
+      forwardOp.initialize(jobConf, new ObjectInspector[] {fetchOp.getOutputObjectInspector()});
+      l4j.info("fetchoperator for " + entry.getKey() + " initialized");
+    }
+  }
+
+
+  private void setUpFetchOpContext(FetchOperator fetchOp, String alias,String currentInputFile)
+  throws Exception {
+
+    BucketMapJoinContext bucketMatcherCxt = this.work
+        .getBucketMapjoinContext();
+
+    Class<? extends BucketMatcher> bucketMatcherCls = bucketMatcherCxt
+        .getBucketMatcherClass();
+    BucketMatcher bucketMatcher = (BucketMatcher) ReflectionUtils.newInstance(
+        bucketMatcherCls, null);
+    bucketMatcher.setAliasBucketFileNameMapping(bucketMatcherCxt
+        .getAliasBucketFileNameMapping());
+
+    List<Path> aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputFile,
+        bucketMatcherCxt.getMapJoinBigTableAlias(), alias);
+    Iterator<Path> iter = aliasFiles.iterator();
+    fetchOp.setupContext(iter, null);
+  }
+
+  private String getFileName(String path){
+    if(path== null || path.length()==0) {
+      return null;
+    }
+
+    int last_separator = path.lastIndexOf(Path.SEPARATOR)+1;
+    String fileName = path.substring(last_separator);
+    return fileName;
+
+  }
+  @Override
+  public void localizeMRTmpFilesImpl(Context ctx){
+
+  }
+
+  @Override
+  public String getName() {
+    return "MAPREDLOCAL";
+  }
+  @Override
+  public int getType() {
+    //assert false;
+    return StageType.MAPREDLOCAL;
+  }
+
+}

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1027672&r1=1027671&r2=1027672&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Tue Oct 26 18:28:23 2010
@@ -281,6 +281,10 @@ public abstract class Operator<T extends
       return true;
     }
     for (Operator<? extends Serializable> parent : parentOperators) {
+      if (parent == null) {
+        //return true;
+        continue;
+      }
       if (parent.state != State.INIT) {
         return false;
       }
@@ -427,6 +431,14 @@ public abstract class Operator<T extends
     initialize(hconf, null);
   }
 
+  public ObjectInspector[] getInputObjInspectors() {
+    return inputObjInspectors;
+  }
+
+  public void setInputObjInspectors(ObjectInspector[] inputObjInspectors) {
+    this.inputObjInspectors = inputObjInspectors;
+  }
+
   /**
    * Process the row.
    *
@@ -501,6 +513,9 @@ public abstract class Operator<T extends
   protected boolean allInitializedParentsAreClosed() {
     if (parentOperators != null) {
       for (Operator<? extends Serializable> parent : parentOperators) {
+        if(parent==null){
+          continue;
+        }
         if (!(parent.state == State.CLOSE || parent.state == State.UNINIT)) {
           return false;
         }
@@ -710,6 +725,16 @@ public abstract class Operator<T extends
     }
   }
 
+  public void reset(){
+    this.state=State.INIT;
+    if (childOperators != null) {
+      for (Operator<? extends Serializable> o : childOperators) {
+        o.reset();
+      }
+    }
+
+  }
+
   /**
    * OperatorFunc.
    *

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1027672&r1=1027671&r2=1027672&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Tue Oct 26 18:28:23 2010
@@ -28,6 +28,8 @@ import org.apache.hadoop.hive.ql.plan.Fi
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.ForwardDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.JDBMDummyDesc;
+import org.apache.hadoop.hive.ql.plan.JDBMSinkDesc;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.LateralViewForwardDesc;
 import org.apache.hadoop.hive.ql.plan.LateralViewJoinDesc;
@@ -85,6 +87,10 @@ public final class OperatorFactory {
         LateralViewJoinOperator.class));
     opvec.add(new OpTuple<LateralViewForwardDesc>(LateralViewForwardDesc.class,
         LateralViewForwardOperator.class));
+    opvec.add(new OpTuple<JDBMDummyDesc>(JDBMDummyDesc.class,
+        JDBMDummyOperator.class));
+    opvec.add(new OpTuple<JDBMSinkDesc>(JDBMSinkDesc.class,
+        JDBMSinkOperator.class));
   }
 
   public static <T extends Serializable> Operator<T> get(Class<T> opClass) {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1027672&r1=1027671&r2=1027672&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Tue Oct 26 18:28:23 2010
@@ -34,9 +34,10 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
-import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.util.JoinUtil;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -106,10 +107,13 @@ public class SMBMapJoinOperator extends 
         HiveConf.ConfVars.HIVEMAPJOINBUCKETCACHESIZE);
     byte storePos = (byte) 0;
     for (Byte alias : order) {
-      RowContainer rc = getRowContainer(hconf, storePos, alias, bucketSize);
+      RowContainer rc = JoinUtil.getRowContainer(hconf,
+          rowContainerStandardObjectInspectors.get(storePos),
+          alias, bucketSize,spillTableDesc, conf,noOuterJoin);
       nextGroupStorage[storePos] = rc;
-      RowContainer candidateRC = getRowContainer(hconf, storePos, alias,
-          bucketSize);
+      RowContainer candidateRC = JoinUtil.getRowContainer(hconf,
+          rowContainerStandardObjectInspectors.get((byte)storePos),
+          alias,bucketSize,spillTableDesc, conf,noOuterJoin);
       candidateStorage[alias] = candidateRC;
       storePos++;
     }
@@ -208,12 +212,15 @@ public class SMBMapJoinOperator extends 
 
     byte alias = (byte) tag;
     // compute keys and values as StandardObjects
-    ArrayList<Object> key = computeKeys(row, joinKeys.get(alias),
+
+    // compute keys and values as StandardObjects
+    ArrayList<Object> key = JoinUtil.computeKeys(row, joinKeys.get(alias),
         joinKeysObjectInspectors.get(alias));
-    ArrayList<Object> value = computeValues(row, joinValues.get(alias),
+    ArrayList<Object> value = JoinUtil.computeValues(row, joinValues.get(alias),
         joinValuesObjectInspectors.get(alias), joinFilters.get(alias),
         joinFilterObjectInspectors.get(alias), noOuterJoin);
 
+
     //have we reached a new key group?
     boolean nextKeyGroup = processKey(alias, key);
     if (nextKeyGroup) {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java?rev=1027672&r1=1027671&r2=1027672&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java Tue Oct 26 18:28:23 2010
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.exec.pe
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.util.JoinUtil;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -60,15 +61,15 @@ import org.apache.hadoop.util.Reflection
  * dir-T1-keys(containing keys which is big in T3), dir-T2-keys(containing big
  * keys in T3),dir-T3-bigkeys(containing keys which is big in T3), ... .....
  * </ul>
- * 
+ *
  * <p>
  * For each skew key, we first write all values to a local tmp file. At the time
  * of ending the current group, the local tmp file will be uploaded to hdfs.
  * Right now, we use one file per skew key.
- * 
+ *
  * <p>
  * For more info, please see https://issues.apache.org/jira/browse/HIVE-964.
- * 
+ *
  */
 public class SkewJoinHandler {
 
@@ -89,6 +90,7 @@ public class SkewJoinHandler {
 
   private LongWritable skewjoinFollowupJobs;
 
+  private final boolean noOuterJoin;
   Configuration hconf = null;
   List<Object> dummyKey = null;
   String taskId;
@@ -101,6 +103,7 @@ public class SkewJoinHandler {
     this.joinOp = joinOp;
     numAliases = joinOp.numAliases;
     conf = joinOp.getConf();
+    noOuterJoin = joinOp.noOuterJoin;
   }
 
   public void initiliaze(Configuration hconf) {
@@ -143,7 +146,7 @@ public class SkewJoinHandler {
         break;
       }
 
-      TableDesc valTblDesc = joinOp.getSpillTableDesc(alias);
+      TableDesc valTblDesc = JoinUtil.getSpillTableDesc(alias,joinOp.spillTableDesc,conf,noOuterJoin);
       List<String> valColNames = new ArrayList<String>();
       if (valTblDesc != null) {
         valColNames = Utilities.getColumnNames(valTblDesc.getProperties());

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1027672&r1=1027671&r2=1027672&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Tue Oct 26 18:28:23 2010
@@ -26,15 +26,16 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.ql.stats.StatsSetupConst;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.mapred.JobConf;
 
 /**
@@ -50,6 +51,16 @@ public class TableScanOperator extends O
   private transient Configuration hconf;
   private transient Stat stat;
   private transient String partitionSpecs;
+  private TableDesc tableDesc;
+
+
+  public TableDesc getTableDesc() {
+    return tableDesc;
+  }
+
+  public void setTableDesc(TableDesc tableDesc) {
+    this.tableDesc = tableDesc;
+  }
 
   /**
    * Other than gathering statistics for the ANALYZE command, the table scan operator

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java?rev=1027672&r1=1027671&r2=1027672&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java Tue Oct 26 18:28:23 2010
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.plan.DD
 import org.apache.hadoop.hive.ql.plan.ExplainWork;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.FunctionWork;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.StatsWork;
@@ -68,8 +69,12 @@ public final class TaskFactory {
         ConditionalTask.class));
     taskvec.add(new taskTuple<MapredWork>(MapredWork.class,
                                           MapRedTask.class));
+
+    taskvec.add(new taskTuple<MapredLocalWork>(MapredLocalWork.class,
+        MapredLocalTask.class));
     taskvec.add(new taskTuple<StatsWork>(StatsWork.class,
-        StatsTask.class));
+        StatsTask.class));        
+
   }
 
   private static ThreadLocal<Integer> tid = new ThreadLocal<Integer>() {
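
With the MapredLocalWork/MapredLocalTask tuple registered above, TaskFactory can now materialize the new client-side task type directly from a plan fragment. A minimal hedged sketch of how that registration is exercised, mirroring the TaskFactory.get call that MapJoinResolver makes later in this commit (the helper method and its surrounding class are hypothetical, added only for illustration):

  // Hypothetical illustration, not part of this commit: resolve a
  // MapredLocalWork into the newly registered MapredLocalTask and make it
  // run before the map-reduce task that consumes its output.
  private Task<? extends Serializable> makeLocalTask(MapredWork mapredWork,
      Task<? extends Serializable> currTask, HiveConf conf) {
    MapredLocalWork localwork = mapredWork.getMapLocalWork();
    MapredLocalTask localTask = (MapredLocalTask) TaskFactory.get(localwork, conf);
    // the local task becomes a parent of the original task in the task DAG
    localTask.addDependentTask(currTask);
    return localTask;
  }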

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java?rev=1027672&r1=1027671&r2=1027672&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java Tue Oct 26 18:28:23 2010
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec.p
 import java.io.File;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
@@ -72,7 +73,7 @@ public class HashMapWrapper<K, V> {
 
   /**
    * Constructor.
-   * 
+   *
    * @param threshold
    *          User specified threshold to store new values into persistent
    *          storage.
@@ -91,16 +92,73 @@ public class HashMapWrapper<K, V> {
   }
 
   /**
-   * Get the value based on the key. We try to get it from the main memory hash
-   * table first. If it is not there we will look up the persistent hash table.
-   * This function also guarantees if any item is found given a key, it is
-   * available in main memory HashMap. So mutating the returned value will be
-   * reflected (saved) in HashMapWrapper.
-   * 
+   * Get the value based on the key. This get method reads the value
+   * directly from the JDBM storage.
    * @param key
    * @return Value corresponding to the key. If the key is not found, return
    *         null.
    */
+/*
+  public V getMapJoinValueObject(K key) throws HiveException{
+    if(pHash == null) {
+      LOG.warn("the jdbm object is not ready!");
+      throw new HiveException();
+    }
+    try{
+      V value = (V)pHash.get(key);
+      return value;
+    }catch(Exception e){
+      throw new HiveException(e);
+    }
+  }*/
+
+  /*
+   * In this get operation, the JDBM store is only read, never written
+   */
+  public V getMapJoinValueObject(K key) throws HiveException {
+    V value = null;
+
+    // search the main memory hash table first
+    MRUItem item = mHash.get(key);
+    if (item != null) {
+      value = item.value;
+      MRUList.moveToHead(item);
+    } else if (pHash != null) {
+      try {
+        value = (V) pHash.get(key);
+        if (value != null) {
+          if (mHash.size() < threshold) {
+            MRUItem itm= new MRUItem(key, value);
+            mHash.put(key, itm);
+            //pHash.remove(key);
+            MRUList.put(itm);
+            //recman.commit();
+
+          } else if (threshold > 0) { // flush the LRU to disk
+            MRUItem tail = MRUList.tail(); // least recently used item
+            //pHash.put(tail.key, tail.value);
+            //pHash.remove(key);
+            //recman.commit();
+
+            // update mHash -- reuse MRUItem
+            item = mHash.remove(tail.key);
+            item.key = key;
+            item.value = value;
+            mHash.put(key, item);
+
+            // update MRU -- reusing MRUItem
+            tail.key = key;
+            tail.value = value;
+            MRUList.moveToHead(tail);
+          }
+        }
+      } catch (Exception e) {
+        LOG.warn(e.toString());
+        throw new HiveException(e);
+      }
+    }
+    return value;
+  }
   public V get(K key) throws HiveException {
     V value = null;
 
@@ -146,7 +204,7 @@ public class HashMapWrapper<K, V> {
    * Put the key value pair in the hash table. It will first try to put it into
    * the main memory hash table. If the size exceeds the threshold, it will put
    * it into the persistent hash table.
-   * 
+   *
    * @param key
    * @param value
    * @throws HiveException
@@ -208,9 +266,82 @@ public class HashMapWrapper<K, V> {
     }
   }
 
+  public void putToJDBM(K key, V value) throws HiveException{
+    if (pHash == null) {
+      pHash = getPersistentHash();
+    }
+    try {
+      pHash.put(key, value);
+      recman.commit();
+    } catch (Exception e) {
+      LOG.warn(e.toString());
+      throw new HiveException(e);
+    }
+
+  }
+
+  /**
+   * Flush the main memory hash table into the persistent cache file
+   *
+   * @return persistent cache file
+   */
+  public String flushMemoryCacheToPersistent() throws HiveException{
+    try{
+      // if there is no persistent cache file, create a new one
+      if(pHash == null){
+        pHash = getPersistentHash();
+      }
+      int mm_size = mHash.size();
+      //no data in the memory cache
+      if(mm_size == 0){
+        return tmpFile.getAbsolutePath();
+      }
+      // iterate the memory hash table and put its entries into the persistent file
+      for (Map.Entry<K, MRUItem> entry : mHash.entrySet()) {
+        K key = entry.getKey();
+        MRUItem item = entry.getValue();
+        pHash.put(key, item.value);
+      }
+      //commit to the persistent file
+      recman.commit();
+
+      //release the memory
+      mHash.clear();
+
+    }catch (Exception e) {
+      LOG.warn(e.toString());
+      throw new HiveException(e);
+    }
+    return tmpFile.getAbsolutePath();
+  }
+
+  public void initilizePersistentHash(File jdbmfile) throws HiveException{
+    try{
+      Properties props = new Properties();
+      props.setProperty(RecordManagerOptions.CACHE_TYPE,
+          RecordManagerOptions.NORMAL_CACHE);
+      props.setProperty(RecordManagerOptions.DISABLE_TRANSACTIONS, "true");
+
+      recman = RecordManagerFactory.createRecordManager(jdbmfile, props);
+      long recid = recman.getNamedObject( "hashtable" );
+      if ( recid != 0 ) {
+          System.out.println( "Reloading existing hashtable..." );
+          pHash = HTree.load( recman, recid );
+      }else{
+        LOG.warn("failed to initialize the hash table from the jdbm file!");
+        throw new HiveException();
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      LOG.warn(e.toString());
+      throw new HiveException(e);
+    }
+  }
+
   /**
    * Get the persistent hash table.
-   * 
+   *
    * @return persistent hash table
    * @throws HiveException
    */
@@ -234,6 +365,9 @@ public class HashMapWrapper<K, V> {
 
       recman = RecordManagerFactory.createRecordManager(tmpFile, props);
       pHash = HTree.createInstance(recman);
+      recman.setNamedObject( "hashtable", pHash.getRecid() );
+      //commit to the persistent file
+      recman.commit();
     } catch (Exception e) {
       LOG.warn(e.toString());
       throw new HiveException(e);
@@ -259,7 +393,7 @@ public class HashMapWrapper<K, V> {
    * the pairs are removed from the main memory hash table, pairs in the
    * persistent hash table will not be moved to the main memory hash table.
    * Future inserted elements will go into the main memory hash table though.
-   * 
+   *
    * @param key
    * @throws HiveException
    */
@@ -279,7 +413,7 @@ public class HashMapWrapper<K, V> {
 
   /**
    * Get a list of all keys in the hash map.
-   * 
+   *
    * @return
    */
   public Set<K> keySet() {
@@ -306,7 +440,7 @@ public class HashMapWrapper<K, V> {
 
   /**
    * Get the main memory cache capacity.
-   * 
+   *
    * @return the maximum number of items can be put into main memory HashMap
    *         cache.
    */
@@ -316,7 +450,7 @@ public class HashMapWrapper<K, V> {
 
   /**
    * Close the persistent hash table and clean it up.
-   * 
+   *
    * @throws HiveException
    */
   public void close() throws HiveException {
@@ -330,8 +464,10 @@ public class HashMapWrapper<K, V> {
         throw new HiveException(e);
       }
       // delete the temporary file
-      tmpFile.delete();
-      tmpFile = null;
+      if(tmpFile != null){
+        tmpFile.delete();
+        tmpFile = null;
+      }
       pHash = null;
       recman = null;
     }
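
Taken together, the new methods form a produce-then-consume protocol around the JDBM file: one side fills the wrapper and calls flushMemoryCacheToPersistent() to spill the in-memory MRU cache into the persistent store, and the other side later calls initilizePersistentHash(file) to reload the named "hashtable" record and probes it through getMapJoinValueObject(). A hedged usage sketch (the threshold constructor is assumed from the class javadoc; names and error handling are illustrative only):

  // Hypothetical illustration, not part of this commit.
  void shareHashTable(MapJoinObjectKey key, MapJoinObjectValue value) throws HiveException {
    // producer side (e.g. the JDBM sink): populate the wrapper, then flush
    // the in-memory cache into the persistent JDBM file
    HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue> build =
        new HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>(25000);
    build.put(key, value);
    String jdbmPath = build.flushMemoryCacheToPersistent();

    // consumer side (e.g. the map join operator in another process): reload
    // the named "hashtable" record from that file and probe it read-only
    HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue> probe =
        new HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>(25000);
    probe.initilizePersistentHash(new java.io.File(jdbmPath));
    MapJoinObjectValue matched = probe.getMapJoinValueObject(key);
    // matched now holds the small-table rows joining with this key, or null
  }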

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java?rev=1027672&r1=1027671&r2=1027672&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java Tue Oct 26 18:28:23 2010
@@ -24,8 +24,8 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.ArrayList;
 
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator.MapJoinObjectCtx;
+import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
+import org.apache.hadoop.hive.ql.exec.JDBMSinkOperator.JDBMSinkObjectCtx;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
@@ -81,7 +81,7 @@ public class MapJoinObjectKey implements
       metadataTag = in.readInt();
 
       // get the tableDesc from the map stored in the mapjoin operator
-      MapJoinObjectCtx ctx = MapJoinOperator.getMapMetadata().get(
+      JDBMSinkObjectCtx ctx = MapJoinMetaData.get(
           Integer.valueOf(metadataTag));
 
       Writable val = ctx.getSerDe().getSerializedClass().newInstance();
@@ -89,6 +89,9 @@ public class MapJoinObjectKey implements
       obj = (ArrayList<Object>) ObjectInspectorUtils.copyToStandardObject(ctx
           .getSerDe().deserialize(val), ctx.getSerDe().getObjectInspector(),
           ObjectInspectorCopyOption.WRITABLE);
+      if(obj == null){
+        obj = new ArrayList<Object>(0);
+      }
     } catch (Exception e) {
       throw new IOException(e);
     }
@@ -99,9 +102,8 @@ public class MapJoinObjectKey implements
   public void writeExternal(ObjectOutput out) throws IOException {
     try {
       out.writeInt(metadataTag);
-
       // get the tableDesc from the map stored in the mapjoin operator
-      MapJoinObjectCtx ctx = MapJoinOperator.getMapMetadata().get(
+      JDBMSinkObjectCtx ctx = MapJoinMetaData.get(
           Integer.valueOf(metadataTag));
 
       // Different processing for key and value

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java?rev=1027672&r1=1027671&r2=1027672&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java Tue Oct 26 18:28:23 2010
@@ -28,8 +28,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator.MapJoinObjectCtx;
+import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
+import org.apache.hadoop.hive.ql.exec.JDBMSinkOperator.JDBMSinkObjectCtx;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
@@ -85,10 +85,11 @@ public class MapJoinObjectValue implemen
   public void readExternal(ObjectInput in) throws IOException,
       ClassNotFoundException {
     try {
+
       metadataTag = in.readInt();
 
       // get the tableDesc from the map stored in the mapjoin operator
-      MapJoinObjectCtx ctx = MapJoinOperator.getMapMetadata().get(
+      JDBMSinkObjectCtx ctx = MapJoinMetaData.get(
           Integer.valueOf(metadataTag));
       int sz = in.readInt();
 
@@ -110,6 +111,11 @@ public class MapJoinObjectValue implemen
             res.add(memObj);
           }
         }
+        else{
+          for(int i = 0 ; i <sz; i++){
+            res.add(new ArrayList<Object>(0));
+          }
+        }
       }
       obj = res;
     } catch (Exception e) {
@@ -124,7 +130,7 @@ public class MapJoinObjectValue implemen
       out.writeInt(metadataTag);
 
       // get the tableDesc from the map stored in the mapjoin operator
-      MapJoinObjectCtx ctx = MapJoinOperator.getMapMetadata().get(
+      JDBMSinkObjectCtx ctx = MapJoinMetaData.get(
           Integer.valueOf(metadataTag));
 
       // Different processing for key and value

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java?rev=1027672&r1=1027671&r2=1027672&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java Tue Oct 26 18:28:23 2010
@@ -30,8 +30,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -77,7 +77,7 @@ import org.apache.hadoop.util.Reflection
  */
 public class RowContainer<Row extends List<Object>> {
 
-  protected Log LOG = LogFactory.getLog(this.getClass().getName());
+  protected static Log LOG = LogFactory.getLog(RowContainer.class);
 
   // max # of rows can be put into one block
   private static final int BLOCKSIZE = 25000;
@@ -116,6 +116,7 @@ public class RowContainer<Row extends Li
 
   Writable val = null; // cached to use serialize data
 
+  Configuration jc;
   JobConf jobCloneUsingLocalFs = null;
   private LocalFileSystem localFs;
 
@@ -136,16 +137,19 @@ public class RowContainer<Row extends Li
     this.firstReadBlockPointer = currentReadBlock;
     this.serde = null;
     this.standardOI = null;
-    try {
-      this.localFs = FileSystem.getLocal(jc);
-    } catch (IOException e) {
-      throw new HiveException(e);
+    this.jc=jc;
+  }
+
+  private JobConf getLocalFSJobConfClone(Configuration jc) {
+    if(this.jobCloneUsingLocalFs == null) {
+      this.jobCloneUsingLocalFs = new JobConf(jc);
+      HiveConf.setVar(jobCloneUsingLocalFs, HiveConf.ConfVars.HADOOPFS,
+          Utilities.HADOOP_LOCAL_FS);
     }
-    this.jobCloneUsingLocalFs = new JobConf(jc);
-    HiveConf.setVar(jobCloneUsingLocalFs, HiveConf.ConfVars.HADOOPFS,
-        Utilities.HADOOP_LOCAL_FS);
+    return this.jobCloneUsingLocalFs;
   }
 
+
   public RowContainer(int blockSize, SerDe sd, ObjectInspector oi,
       Configuration jc) throws HiveException {
     this(blockSize, jc);
@@ -202,23 +206,24 @@ public class RowContainer<Row extends Li
         this.readBlockSize = this.addCursor;
         this.currentReadBlock = this.currentWriteBlock;
       } else {
+        JobConf localJc = getLocalFSJobConfClone(jc);
         if (inputSplits == null) {
           if (this.inputFormat == null) {
             inputFormat = (InputFormat<WritableComparable, Writable>) ReflectionUtils
                 .newInstance(tblDesc.getInputFileFormatClass(),
-                jobCloneUsingLocalFs);
+                    localJc);
           }
 
-          HiveConf.setVar(jobCloneUsingLocalFs,
+          HiveConf.setVar(localJc,
               HiveConf.ConfVars.HADOOPMAPREDINPUTDIR,
               org.apache.hadoop.util.StringUtils.escapeString(parentFile
               .getAbsolutePath()));
-          inputSplits = inputFormat.getSplits(jobCloneUsingLocalFs, 1);
+          inputSplits = inputFormat.getSplits(localJc, 1);
           acutalSplitNum = inputSplits.length;
         }
         currentSplitPointer = 0;
         rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer],
-            jobCloneUsingLocalFs, Reporter.NULL);
+            localJc, Reporter.NULL);
         currentSplitPointer++;
 
         nextBlock();
@@ -315,6 +320,7 @@ public class RowContainer<Row extends Li
         HiveOutputFormat<?, ?> hiveOutputFormat = tblDesc
             .getOutputFileFormatClass().newInstance();
         tempOutPath = new Path(tmpFile.toString());
+        JobConf localJc = getLocalFSJobConfClone(jc);
         rw = HiveFileFormatUtils.getRecordWriter(this.jobCloneUsingLocalFs,
             hiveOutputFormat, serde.getSerializedClass(), false, tblDesc
             .getProperties(), tempOutPath);
@@ -389,6 +395,7 @@ public class RowContainer<Row extends Li
       }
 
       if (nextSplit && this.currentSplitPointer < this.acutalSplitNum) {
+        JobConf localJc = getLocalFSJobConfClone(jc);
         // open record reader to read next split
         rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer],
             jobCloneUsingLocalFs, Reporter.NULL);

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1027672&r1=1027671&r2=1027672&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Tue Oct 26 18:28:23 2010
@@ -29,12 +29,11 @@ package org.apache.hadoop.hive.ql.io;
  */
 public class IOContext {
 
-  private static ThreadLocal<IOContext> threadLocal = new ThreadLocal<IOContext>();
-  static {
-    if (threadLocal.get() == null) {
-      threadLocal.set(new IOContext());
-    }
-  }
+
+  private static ThreadLocal<IOContext> threadLocal = new ThreadLocal<IOContext>(){
+    @Override
+    protected synchronized IOContext initialValue() { return new IOContext(); }
+ };
 
   public static IOContext get() {
     return IOContext.threadLocal.get();
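
The replacement above swaps a static initializer for an overridden initialValue(). The difference matters because the static block runs once, in whichever thread triggers class loading, so any other thread calling get() would have seen null; overriding initialValue() makes every thread lazily receive its own IOContext on first access. A small self-contained sketch of the idiom (plain Java, not part of the commit):

  public class ThreadLocalIdiomDemo {
    // old pattern: the static block only populates the ThreadLocal for the
    // class-initializing thread; other threads see null
    private static final ThreadLocal<Object> eager = new ThreadLocal<Object>();
    static {
      eager.set(new Object());
    }

    // new pattern: initialValue() runs lazily, once per thread
    private static final ThreadLocal<Object> lazy = new ThreadLocal<Object>() {
      @Override
      protected Object initialValue() {
        return new Object();
      }
    };

    public static void main(String[] args) throws InterruptedException {
      Thread t = new Thread(new Runnable() {
        public void run() {
          System.out.println("eager: " + eager.get()); // prints "eager: null"
          System.out.println("lazy:  " + lazy.get());  // prints a fresh Object
        }
      });
      t.start();
      t.join();
    }
  }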

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java?rev=1027672&r1=1027671&r2=1027672&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java Tue Oct 26 18:28:23 2010
@@ -247,6 +247,7 @@ public final class GenMRSkewJoinProcesso
       for (int k = 0; k < tags.length; k++) {
         Operator<? extends Serializable> ts = OperatorFactory.get(
             TableScanDesc.class, (RowSchema) null);
+        ((TableScanOperator)ts).setTableDesc(tableDescList.get((byte)k));
         parentOps[k] = ts;
       }
       Operator<? extends Serializable> tblScan_op = parentOps[i];
@@ -256,8 +257,14 @@ public final class GenMRSkewJoinProcesso
       aliases.add(alias);
       String bigKeyDirPath = bigKeysDirMap.get(src);
       newPlan.getPathToAliases().put(bigKeyDirPath, aliases);
+
+
+
+
       newPlan.getAliasToWork().put(alias, tblScan_op);
       PartitionDesc part = new PartitionDesc(tableDescList.get(src), null);
+
+
       newPlan.getPathToPartitionInfo().put(bigKeyDirPath, part);
       newPlan.getAliasToPartnInfo().put(alias, part);
 

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java?rev=1027672&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java Tue Oct 26 18:28:23 2010
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.JDBMDummyOperator;
+import org.apache.hadoop.hive.ql.exec.JDBMSinkOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.physical.MapJoinResolver.LocalMapJoinProcCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.JDBMDummyDesc;
+import org.apache.hadoop.hive.ql.plan.JDBMSinkDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+
+/**
+ * Node processor factory for the local map join resolver.
+ */
+public final class LocalMapJoinProcFactory {
+
+
+
+  public static NodeProcessor getJoinProc() {
+    return new LocalMapJoinProcessor();
+  }
+  public static NodeProcessor getMapJoinMapJoinProc() {
+    return new MapJoinMapJoinProc();
+  }
+  public static NodeProcessor getDefaultProc() {
+    return new NodeProcessor() {
+      @Override
+      public Object process(Node nd, Stack<Node> stack,
+          NodeProcessorCtx procCtx, Object... nodeOutputs)
+          throws SemanticException {
+        return null;
+      }
+    };
+  }
+
+  /**
+   * LocalMapJoinProcessor.
+   *
+   */
+  public static class LocalMapJoinProcessor implements NodeProcessor {
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+        Object... nodeOutputs) throws SemanticException {
+      LocalMapJoinProcCtx context = (LocalMapJoinProcCtx) ctx;
+
+      if(!nd.getName().equals("MAPJOIN")){
+        return null;
+      }
+      MapJoinOperator mapJoinOp = (MapJoinOperator) nd;
+
+      // create a new operator: JDBMSinkOperator
+      JDBMSinkDesc jdbmSinkDesc = new JDBMSinkDesc(mapJoinOp.getConf());
+      JDBMSinkOperator jdbmSinkOp =(JDBMSinkOperator)OperatorFactory.get(jdbmSinkDesc);
+
+
+      //get the last operator for processing big tables
+      int bigTable = mapJoinOp.getConf().getPosBigTable();
+      Byte[] order = mapJoinOp.getConf().getTagOrder();
+      int bigTableAlias=(int)order[bigTable];
+
+      Operator<? extends Serializable> bigOp = mapJoinOp.getParentOperators().get(bigTable);
+
+      //the parent ops for jdbmSinkOp
+      List<Operator<?extends Serializable>> smallTablesParentOp= new ArrayList<Operator<?extends Serializable>>();
+
+      List<Operator<?extends Serializable>> dummyOperators= new ArrayList<Operator<?extends Serializable>>();
+      //get all parents
+      List<Operator<? extends Serializable> >  parentsOp = mapJoinOp.getParentOperators();
+      for(int i = 0; i<parentsOp.size();i++){
+        if(i == bigTableAlias){
+          smallTablesParentOp.add(null);
+          continue;
+        }
+
+        Operator<? extends Serializable> parent = parentsOp.get(i);
+        //let jdbmOp be the child of this parent
+        parent.replaceChild(mapJoinOp, jdbmSinkOp);
+        //keep the parent id correct
+        smallTablesParentOp.add(parent);
+
+        // create a new operator: JDBMDummyOperator, which shares the table desc
+        JDBMDummyDesc desc = new JDBMDummyDesc();
+        JDBMDummyOperator dummyOp =(JDBMDummyOperator)OperatorFactory.get(desc);
+        TableDesc tbl;
+
+        if(parent.getSchema()==null){
+          if(parent instanceof TableScanOperator ){
+            tbl = ((TableScanOperator)parent).getTableDesc();
+         }else{
+           throw new SemanticException();
+         }
+        }else{
+          //get parent schema
+          RowSchema rowSchema = parent.getSchema();
+          tbl = PlanUtils.getIntermediateFileTableDesc(PlanUtils
+              .getFieldSchemasFromRowSchema(rowSchema, ""));
+        }
+
+
+        dummyOp.getConf().setTbl(tbl);
+
+        // let the dummy op be the parent of the mapjoin op
+        mapJoinOp.replaceParent(parent, dummyOp);
+        List<Operator<? extends Serializable>> dummyChildren = new ArrayList<Operator<? extends Serializable>>();
+        dummyChildren.add(mapJoinOp);
+        dummyOp.setChildOperators(dummyChildren);
+
+        // add this dummy op to the dummy operator list
+        dummyOperators.add(dummyOp);
+
+      }
+
+      jdbmSinkOp.setParentOperators(smallTablesParentOp);
+      for(Operator<? extends Serializable> op: dummyOperators){
+        context.addDummyParentOp(op);
+      }
+      return null;
+    }
+
+  }
+
+  /**
+   * MapJoinMapJoinProc.
+   *
+   */
+  public static class MapJoinMapJoinProc implements NodeProcessor {
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+        Object... nodeOutputs) throws SemanticException {
+      LocalMapJoinProcCtx context = (LocalMapJoinProcCtx) ctx;
+      if(!nd.getName().equals("MAPJOIN")){
+        return null;
+      }
+      System.out.println("Mapjoin * MapJoin");
+
+      return null;
+    }
+  }
+
+
+  private LocalMapJoinProcFactory() {
+    // prevent instantiation
+  }
+ }
+

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java?rev=1027672&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java Tue Oct 26 18:28:23 2010
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.MapredLocalTask;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ConditionalWork;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx;
+
+/**
+ * An implementation of PhysicalPlanResolver. It iterates over each MapRedTask to see whether the task has local map work;
+ * if it does, it moves the local work into a new local map join task. It then makes this newly generated task depend on
+ * the current task's parent tasks and makes the current task depend on the newly generated task.
+ */
+public class MapJoinResolver implements PhysicalPlanResolver {
+  @Override
+  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+
+    //create dispatcher and graph walker
+    Dispatcher disp = new LocalMapJoinTaskDispatcher(pctx);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+
+    //get all the tasks nodes from root task
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.rootTasks);
+
+    //begin to walk through the task tree.
+    ogw.startWalking(topNodes, null);
+    return pctx;
+  }
+
+  /**
+   * Iterates over each task. If a task has local work, create a new task (a MapredLocalTask) for that local work,
+   * then make this newly generated task depend on the current task's parent tasks, and make the current task
+   * depend on the newly generated task.
+   */
+  class LocalMapJoinTaskDispatcher implements Dispatcher {
+
+    private PhysicalContext physicalContext;
+
+    public LocalMapJoinTaskDispatcher(PhysicalContext context) {
+      super();
+      physicalContext = context;
+    }
+
+    private void processCurrentTask(Task<? extends Serializable> currTask,
+        ConditionalTask conditionalTask) throws SemanticException{
+
+
+      //get current mapred work and its local work
+      MapredWork mapredWork = (MapredWork) currTask.getWork();
+      MapredLocalWork localwork = mapredWork.getMapLocalWork();
+
+
+      if(localwork != null){
+        //get the context info and set up the shared tmp URI
+        Context ctx = physicalContext.getContext();
+        String tmpFileURI = ctx.getLocalTmpFileURI()+Path.SEPARATOR+"JDBM-"+currTask.getId();
+        localwork.setTmpFileURI(tmpFileURI);
+        mapredWork.setTmpHDFSFileURI(ctx.getMRTmpFileURI()+Path.SEPARATOR+"JDBM-"+currTask.getId());
+        //create a task for this local work; right now, this local work is shared
+        //by the original MapredTask and this new generated MapredLocalTask.
+        MapredLocalTask localTask = (MapredLocalTask) TaskFactory.get(localwork,
+            physicalContext.getParseContext().getConf());
+
+
+        // replace the map join operator with the local_map_join operator in the operator tree
+        // and return all the dummy parents
+        List<Operator<? extends Serializable>>  dummyOps= adjustLocalTask(localTask);
+
+        //create new local work and setup the dummy ops
+        MapredLocalWork newLocalWork = new MapredLocalWork();
+        newLocalWork.setDummyParentOp(dummyOps);
+        newLocalWork.setTmpFileURI(tmpFileURI);
+        newLocalWork.setInputFileChangeSensitive(localwork.getInputFileChangeSensitive());
+        mapredWork.setMapLocalWork(newLocalWork);
+
+        //get all parent tasks
+        List<Task<? extends Serializable>> parentTasks = currTask.getParentTasks();
+        currTask.setParentTasks(null);
+        if (parentTasks != null) {
+
+          for (Task<? extends Serializable> tsk : parentTasks) {
+            // make the new generated task depend on all the parent tasks of the current task
+            tsk.addDependentTask(localTask);
+            // remove the current task from its original parent tasks' dependent tasks
+            tsk.removeDependentTask(currTask);
+          }
+
+        }else{
+          //in this case, current task is in the root tasks
+          //so add this new task into root tasks and remove the current task from root tasks
+          if(conditionalTask== null){
+            physicalContext.addToRootTask(localTask);
+            physicalContext.removeFromRootTask(currTask);
+          }else{
+            //set list task
+            List<Task<? extends Serializable>> listTask = conditionalTask.getListTasks();
+            ConditionalWork conditionalWork= conditionalTask.getWork();
+            int index = listTask.indexOf(currTask);
+            listTask.set(index, localTask);
+
+            //set list work
+            List<Serializable> listWork = (List<Serializable>)conditionalWork.getListWorks();
+            index = listWork.indexOf(mapredWork);
+            listWork.set(index,(Serializable)localwork);
+            conditionalWork.setListWorks(listWork);
+
+            //get bigKeysDirToTaskMap
+            ConditionalResolverSkewJoinCtx context  =
+              (ConditionalResolverSkewJoinCtx) conditionalTask.getResolverCtx();
+            HashMap<String, Task<? extends Serializable>> bigKeysDirToTaskMap =
+              context.getDirToTaskMap();
+
+            // to avoid concurrent modification of the hashmap
+            HashMap<String, Task<? extends Serializable>> newbigKeysDirToTaskMap =
+              new HashMap<String, Task<? extends Serializable>>();
+
+
+            //reset the resolver
+            for(Map.Entry<String, Task<? extends Serializable>> entry: bigKeysDirToTaskMap.entrySet()){
+              Task<? extends Serializable> task = entry.getValue();
+              String key = entry.getKey();
+
+              if(task.equals(currTask)){
+                newbigKeysDirToTaskMap.put(key, localTask);
+              } else {
+                newbigKeysDirToTaskMap.put(key, task);
+              }
+            }
+
+            context.setDirToTaskMap(newbigKeysDirToTaskMap);
+            conditionalTask.setResolverCtx(context);
+          }
+        }
+
+        //make the current task depend on the newly generated local map join task;
+        //localTask is now the parent task of the current task
+        localTask.addDependentTask(currTask);
+
+      }
+
+    }
+
+    @Override
+    public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
+        throws SemanticException {
+      Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
+      //skip tasks that are not map-reduce tasks
+      if (currTask.isMapRedTask()) {
+        if(currTask instanceof ConditionalTask){
+          //get the list of tasks
+          List<Task<? extends Serializable>> taskList =  ((ConditionalTask) currTask).getListTasks();
+          for(Task<? extends Serializable> tsk : taskList){
+            if(tsk.isMapRedTask()){
+              this.processCurrentTask(tsk,((ConditionalTask) currTask));
+            }
+          }
+        } else {
+          this.processCurrentTask(currTask,null);
+        }
+      }
+      return null;
+    }
+
+    //replace the map join operators with local map join operators in the operator tree
+    private List<Operator<? extends Serializable>> adjustLocalTask(MapredLocalTask task) throws SemanticException {
+
+      LocalMapJoinProcCtx localMapJoinProcCtx = new LocalMapJoinProcCtx(task,
+          physicalContext.getParseContext());
+
+      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+      //opRules.put(new RuleRegExp("R1", "MAPJOIN%.*MAPJOIN%"),
+          //LocalMapJoinProcFactory.getMapJoinMapJoinProc());
+      opRules.put(new RuleRegExp("R1", "MAPJOIN%"), LocalMapJoinProcFactory.getJoinProc());
+
+      // The dispatcher fires the processor corresponding to the closest
+      // matching rule and passes the context along
+      Dispatcher disp = new DefaultRuleDispatcher(LocalMapJoinProcFactory
+          .getDefaultProc(), opRules, localMapJoinProcCtx);
+      GraphWalker ogw = new DefaultGraphWalker(disp);
+
+      // iterate over the operator trees of the local work
+      ArrayList<Node> topNodes = new ArrayList<Node>();
+
+      topNodes.addAll(task.getWork().getAliasToWork().values());
+      ogw.startWalking(topNodes, null);
+
+      return localMapJoinProcCtx.getDummyParentOp();
+
+    }
+
+    public PhysicalContext getPhysicalContext() {
+      return physicalContext;
+    }
+
+    public void setPhysicalContext(PhysicalContext physicalContext) {
+      this.physicalContext = physicalContext;
+    }
+  }
+  /**
+   * A container for the current task and parse context.
+   */
+  public static class LocalMapJoinProcCtx implements NodeProcessorCtx {
+    private Task<? extends Serializable> currentTask;
+    private ParseContext parseCtx;
+    private List<Operator<? extends Serializable>> dummyParentOp = null;
+
+    public LocalMapJoinProcCtx(Task<? extends Serializable> task,
+        ParseContext parseCtx) {
+      currentTask = task;
+      this.parseCtx = parseCtx;
+      dummyParentOp = new ArrayList<Operator<? extends Serializable>>();
+    }
+
+    public Task<? extends Serializable> getCurrentTask() {
+      return currentTask;
+    }
+
+    public void setCurrentTask(Task<? extends Serializable> currentTask) {
+      this.currentTask = currentTask;
+    }
+
+    public ParseContext getParseCtx() {
+      return parseCtx;
+    }
+
+    public void setParseCtx(ParseContext parseCtx) {
+      this.parseCtx = parseCtx;
+    }
+
+    public void setDummyParentOp(List<Operator<? extends Serializable>> op){
+      this.dummyParentOp=op;
+    }
+
+    public List<Operator<? extends Serializable>> getDummyParentOp(){
+      return this.dummyParentOp;
+    }
+    public void addDummyParentOp(Operator<? extends Serializable> op){
+       this.dummyParentOp.add(op);
+    }
+
+  }
+}
+
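
The heart of processCurrentTask above is a re-wiring of the task graph: the generated MapredLocalTask is attached to every former parent of the map-reduce task, the map-reduce task is detached from those parents, and the local task then becomes its sole parent. A minimal, self-contained sketch of that dependency shuffle follows; SimpleTask is an invented stand-in used only for illustration, not Hive's Task API.

import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a task with parent/child links (illustration only).
class SimpleTask {
  final String name;
  final List<SimpleTask> parents = new ArrayList<SimpleTask>();
  final List<SimpleTask> children = new ArrayList<SimpleTask>();

  SimpleTask(String name) { this.name = name; }

  void addDependentTask(SimpleTask child) {
    children.add(child);
    child.parents.add(this);
  }

  void removeDependentTask(SimpleTask child) {
    children.remove(child);
    child.parents.remove(this);
  }
}

public class RewireSketch {
  public static void main(String[] args) {
    SimpleTask parent = new SimpleTask("parent");
    SimpleTask mapredTask = new SimpleTask("mapredTask");
    SimpleTask localTask = new SimpleTask("localTask");
    parent.addDependentTask(mapredTask);

    // Mirror of processCurrentTask: hang localTask off every former parent of
    // mapredTask, detach mapredTask from them, then chain localTask -> mapredTask.
    List<SimpleTask> parents = new ArrayList<SimpleTask>(mapredTask.parents);
    for (SimpleTask p : parents) {
      p.addDependentTask(localTask);
      p.removeDependentTask(mapredTask);
    }
    localTask.addDependentTask(mapredTask);

    System.out.println(parent.children.get(0).name);    // prints: localTask
    System.out.println(localTask.children.get(0).name); // prints: mapredTask
  }
}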

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java?rev=1027672&r1=1027671&r2=1027672&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalContext.java Tue Oct 26 18:28:23 2010
@@ -72,4 +72,11 @@ public class PhysicalContext {
     this.context = context;
   }
 
+  public void addToRootTask(Task<? extends Serializable> tsk){
+    rootTasks.add(tsk);
+  }
+  public void removeFromRootTask(Task<? extends Serializable> tsk){
+    rootTasks.remove(tsk);
+  }
+
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java?rev=1027672&r1=1027671&r2=1027672&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java Tue Oct 26 18:28:23 2010
@@ -41,7 +41,7 @@ public class PhysicalOptimizer {
 
   /**
    * create the list of physical plan resolvers.
-   * 
+   *
    * @param hiveConf
    */
   private void initialize(HiveConf hiveConf) {
@@ -49,11 +49,12 @@ public class PhysicalOptimizer {
     if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVESKEWJOIN)) {
       resolvers.add(new SkewJoinResolver());
     }
+    resolvers.add(new MapJoinResolver());
   }
 
   /**
    * invoke all the resolvers one-by-one, and alter the physical plan.
-   * 
+   *
    * @return PhysicalContext
    * @throws HiveException
    */

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java?rev=1027672&r1=1027671&r2=1027672&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java Tue Oct 26 18:28:23 2010
@@ -52,7 +52,7 @@ public class SkewJoinResolver implements
     ArrayList<Node> topNodes = new ArrayList<Node>();
     topNodes.addAll(pctx.rootTasks);
     ogw.startWalking(topNodes, null);
-    return null;
+    return pctx;
   }
 
   /**

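The one-line change above matters because PhysicalOptimizer invokes the resolvers one-by-one on a shared PhysicalContext, and MapJoinResolver now runs after SkewJoinResolver; if the earlier resolver handed back null, there would be nothing to pass along. The sketch below illustrates only that chaining idea; Resolver and Ctx are simplified assumptions, not the actual PhysicalPlanResolver/PhysicalContext API, and the real optimize() loop is not part of this diff.

import java.util.ArrayList;
import java.util.List;

// Simplified resolver contract: take the context, adjust the plan, return the context.
interface Resolver {
  Ctx resolve(Ctx pctx);
}

// Stand-in for the evolving physical plan state.
class Ctx {
  int resolverPasses;
}

public class ResolverChainSketch {
  public static void main(String[] args) {
    List<Resolver> resolvers = new ArrayList<Resolver>();
    resolvers.add(pctx -> { pctx.resolverPasses++; return pctx; }); // e.g. SkewJoinResolver
    resolvers.add(pctx -> { pctx.resolverPasses++; return pctx; }); // e.g. MapJoinResolver

    Ctx pctx = new Ctx();
    for (Resolver r : resolvers) {
      // A resolver that returned null here would hand its successor a null context.
      pctx = r.resolve(pctx);
    }
    System.out.println(pctx.resolverPasses); // prints: 2
  }
}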
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java?rev=1027672&r1=1027671&r2=1027672&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java Tue Oct 26 18:28:23 2010
@@ -56,7 +56,7 @@ public class ConditionalResolverSkewJoin
      */
     public ConditionalResolverSkewJoinCtx() {
     }
-    
+
     public ConditionalResolverSkewJoinCtx(
         HashMap<String, Task<? extends Serializable>> dirToTaskMap) {
       super();
@@ -95,7 +95,16 @@ public class ConditionalResolverSkewJoin
         FileSystem inpFs = dirPath.getFileSystem(conf);
         FileStatus[] fstatus = inpFs.listStatus(dirPath);
         if (fstatus.length > 0) {
-          resTsks.add(entry.getValue());
+          Task<? extends Serializable> task = entry.getValue();
+          List<Task<? extends Serializable>> parentTasks = task.getParentTasks();
+          if (parentTasks != null) {
+            for (Task<? extends Serializable> parentTask : parentTasks) {
+              //right now there is only one parent
+              resTsks.add(parentTask);
+            }
+          } else {
+            resTsks.add(task);
+          }
         }
       }
     } catch (IOException e) {

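With the local task now wired in front of each skew-join follow-up task, the resolver above launches the parent task (the MapredLocalTask that builds the hash table) whenever a big-key directory turns out to be non-empty, rather than the map-join task itself. A rough standalone sketch of that selection rule follows; SkewTask is an invented placeholder, not the Hive Task class.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration-only task with optional parent links.
class SkewTask {
  final String name;
  final List<SkewTask> parents;

  SkewTask(String name, SkewTask... parents) {
    this.name = name;
    this.parents = Arrays.asList(parents);
  }
}

public class SkewResolverSketch {
  // Mirrors the new resolution rule: prefer the parent task(s) when present.
  static List<SkewTask> tasksToRun(SkewTask candidate) {
    List<SkewTask> resTsks = new ArrayList<SkewTask>();
    if (!candidate.parents.isEmpty()) {
      resTsks.addAll(candidate.parents); // right now there is only one parent
    } else {
      resTsks.add(candidate);
    }
    return resTsks;
  }

  public static void main(String[] args) {
    SkewTask localTask = new SkewTask("localTask");
    SkewTask mapJoinTask = new SkewTask("mapJoinTask", localTask);
    System.out.println(tasksToRun(mapJoinTask).get(0).name); // prints: localTask
  }
}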
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JDBMDummyDesc.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JDBMDummyDesc.java?rev=1027672&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JDBMDummyDesc.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JDBMDummyDesc.java Tue Oct 26 18:28:23 2010
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+/**
+ * JDBM Dummy Descriptor implementation.
+ *
+ */
+@Explain(displayName = "JDBMDummy Operator")
+public class JDBMDummyDesc implements Serializable {
+  private TableDesc tbl;
+
+  public TableDesc getTbl() {
+    return tbl;
+  }
+
+  public void setTbl(TableDesc tbl) {
+    this.tbl = tbl;
+  }
+
+}

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JDBMSinkDesc.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JDBMSinkDesc.java?rev=1027672&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JDBMSinkDesc.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JDBMSinkDesc.java Tue Oct 26 18:28:23 2010
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+/**
+ * JDBM Sink operator Descriptor implementation.
+ *
+ */
+@Explain(displayName = "JDBM Sink Operator")
+public class JDBMSinkDesc extends JoinDesc implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+
+  // used to handle skew join
+  private boolean handleSkewJoin = false;
+  private int skewKeyDefinition = -1;
+  private Map<Byte, String> bigKeysDirMap;
+  private Map<Byte, Map<Byte, String>> smallKeysDirMap;
+  private Map<Byte, TableDesc> skewKeysValuesTables;
+
+  // alias to key mapping
+  private Map<Byte, List<ExprNodeDesc>> exprs;
+
+  // alias to filter mapping
+  private Map<Byte, List<ExprNodeDesc>> filters;
+
+  // used for create joinOutputObjectInspector
+  protected List<String> outputColumnNames;
+
+  // key:column output name, value:tag
+  private transient Map<String, Byte> reversedExprs;
+
+  // No outer join involved
+  protected boolean noOuterJoin;
+
+  protected JoinCondDesc[] conds;
+
+  protected Byte[] tagOrder;
+  private TableDesc keyTableDesc;
+
+
+  private Map<Byte, List<ExprNodeDesc>> keys;
+  private TableDesc keyTblDesc;
+  private List<TableDesc> valueTblDescs;
+
+  private int posBigTable;
+
+  private Map<Byte, List<Integer>> retainList;
+
+  private transient String bigTableAlias;
+
+  private LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>> aliasBucketFileNameMapping;
+  private LinkedHashMap<String, Integer> bucketFileNameMapping;
+
+  public JDBMSinkDesc() {
+    bucketFileNameMapping = new LinkedHashMap<String, Integer>();
+  }
+
+  public JDBMSinkDesc(MapJoinDesc clone) {
+    this.bigKeysDirMap = clone.getBigKeysDirMap();
+    this.conds = clone.getConds();
+    this.exprs= clone.getExprs();
+    this.handleSkewJoin = clone.getHandleSkewJoin();
+    this.keyTableDesc = clone.getKeyTableDesc();
+    this.noOuterJoin = clone.getNoOuterJoin();
+    this.outputColumnNames = clone.getOutputColumnNames();
+    this.reversedExprs = clone.getReversedExprs();
+    this.skewKeyDefinition = clone.getSkewKeyDefinition();
+    this.skewKeysValuesTables = clone.getSkewKeysValuesTables();
+    this.smallKeysDirMap = clone.getSmallKeysDirMap();
+    this.tagOrder = clone.getTagOrder();
+    this.filters = clone.getFilters();
+
+    this.keys = clone.getKeys();
+    this.keyTblDesc = clone.getKeyTblDesc();
+    this.valueTblDescs = clone.getValueTblDescs();
+    this.posBigTable = clone.getPosBigTable();
+    this.retainList = clone.getRetainList();
+    this.bigTableAlias = clone.getBigTableAlias();
+    this.aliasBucketFileNameMapping = clone.getAliasBucketFileNameMapping();
+    this.bucketFileNameMapping = clone.getBucketFileNameMapping();
+  }
+
+
+  private void initRetainExprList() {
+    retainList = new HashMap<Byte, List<Integer>>();
+    Set<Entry<Byte, List<ExprNodeDesc>>> set = exprs.entrySet();
+    Iterator<Entry<Byte, List<ExprNodeDesc>>> setIter = set.iterator();
+    while (setIter.hasNext()) {
+      Entry<Byte, List<ExprNodeDesc>> current = setIter.next();
+      List<Integer> list = new ArrayList<Integer>();
+      for (int i = 0; i < current.getValue().size(); i++) {
+        list.add(i);
+      }
+      retainList.put(current.getKey(), list);
+    }
+  }
+
+  public boolean isHandleSkewJoin() {
+    return handleSkewJoin;
+  }
+
+  @Override
+  public void setHandleSkewJoin(boolean handleSkewJoin) {
+    this.handleSkewJoin = handleSkewJoin;
+  }
+
+  @Override
+  public int getSkewKeyDefinition() {
+    return skewKeyDefinition;
+  }
+
+  @Override
+  public void setSkewKeyDefinition(int skewKeyDefinition) {
+    this.skewKeyDefinition = skewKeyDefinition;
+  }
+
+  @Override
+  public Map<Byte, String> getBigKeysDirMap() {
+    return bigKeysDirMap;
+  }
+
+  @Override
+  public void setBigKeysDirMap(Map<Byte, String> bigKeysDirMap) {
+    this.bigKeysDirMap = bigKeysDirMap;
+  }
+
+  @Override
+  public Map<Byte, Map<Byte, String>> getSmallKeysDirMap() {
+    return smallKeysDirMap;
+  }
+
+  @Override
+  public void setSmallKeysDirMap(Map<Byte, Map<Byte, String>> smallKeysDirMap) {
+    this.smallKeysDirMap = smallKeysDirMap;
+  }
+
+  @Override
+  public Map<Byte, TableDesc> getSkewKeysValuesTables() {
+    return skewKeysValuesTables;
+  }
+
+  @Override
+  public void setSkewKeysValuesTables(Map<Byte, TableDesc> skewKeysValuesTables) {
+    this.skewKeysValuesTables = skewKeysValuesTables;
+  }
+
+  @Override
+  public Map<Byte, List<ExprNodeDesc>> getExprs() {
+    return exprs;
+  }
+
+  @Override
+  public void setExprs(Map<Byte, List<ExprNodeDesc>> exprs) {
+    this.exprs = exprs;
+  }
+
+  @Override
+  public Map<Byte, List<ExprNodeDesc>> getFilters() {
+    return filters;
+  }
+
+  @Override
+  public void setFilters(Map<Byte, List<ExprNodeDesc>> filters) {
+    this.filters = filters;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return outputColumnNames;
+  }
+
+  @Override
+  public void setOutputColumnNames(List<String> outputColumnNames) {
+    this.outputColumnNames = outputColumnNames;
+  }
+
+  @Override
+  public Map<String, Byte> getReversedExprs() {
+    return reversedExprs;
+  }
+
+  @Override
+  public void setReversedExprs(Map<String, Byte> reversedExprs) {
+    this.reversedExprs = reversedExprs;
+  }
+
+  @Override
+  public boolean isNoOuterJoin() {
+    return noOuterJoin;
+  }
+
+  @Override
+  public void setNoOuterJoin(boolean noOuterJoin) {
+    this.noOuterJoin = noOuterJoin;
+  }
+
+  @Override
+  public JoinCondDesc[] getConds() {
+    return conds;
+  }
+
+  @Override
+  public void setConds(JoinCondDesc[] conds) {
+    this.conds = conds;
+  }
+
+  @Override
+  public Byte[] getTagOrder() {
+    return tagOrder;
+  }
+
+  @Override
+  public void setTagOrder(Byte[] tagOrder) {
+    this.tagOrder = tagOrder;
+  }
+
+  @Override
+  public TableDesc getKeyTableDesc() {
+    return keyTableDesc;
+  }
+
+  @Override
+  public void setKeyTableDesc(TableDesc keyTableDesc) {
+    this.keyTableDesc = keyTableDesc;
+  }
+
+
+  public Map<Byte, List<Integer>> getRetainList() {
+    return retainList;
+  }
+
+  public void setRetainList(Map<Byte, List<Integer>> retainList) {
+    this.retainList = retainList;
+  }
+
+  /**
+   * @return the keys
+   */
+  @Explain(displayName = "keys")
+  public Map<Byte, List<ExprNodeDesc>> getKeys() {
+    return keys;
+  }
+
+  /**
+   * @param keys
+   *          the keys to set
+   */
+  public void setKeys(Map<Byte, List<ExprNodeDesc>> keys) {
+    this.keys = keys;
+  }
+
+  /**
+   * @return the position of the big table not in memory
+   */
+  @Explain(displayName = "Position of Big Table")
+  public int getPosBigTable() {
+    return posBigTable;
+  }
+
+  /**
+   * @param posBigTable
+   *          the position of the big table not in memory
+   */
+  public void setPosBigTable(int posBigTable) {
+    this.posBigTable = posBigTable;
+  }
+
+  /**
+   * @return the keyTblDesc
+   */
+  public TableDesc getKeyTblDesc() {
+    return keyTblDesc;
+  }
+
+  /**
+   * @param keyTblDesc
+   *          the keyTblDesc to set
+   */
+  public void setKeyTblDesc(TableDesc keyTblDesc) {
+    this.keyTblDesc = keyTblDesc;
+  }
+
+  /**
+   * @return the valueTblDescs
+   */
+  public List<TableDesc> getValueTblDescs() {
+    return valueTblDescs;
+  }
+
+  /**
+   * @param valueTblDescs
+   *          the valueTblDescs to set
+   */
+  public void setValueTblDescs(List<TableDesc> valueTblDescs) {
+    this.valueTblDescs = valueTblDescs;
+  }
+
+  /**
+   * @return bigTableAlias
+   */
+  public String getBigTableAlias() {
+    return bigTableAlias;
+  }
+
+  /**
+   * @param bigTableAlias
+   */
+  public void setBigTableAlias(String bigTableAlias) {
+    this.bigTableAlias = bigTableAlias;
+  }
+
+  public LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>> getAliasBucketFileNameMapping() {
+    return aliasBucketFileNameMapping;
+  }
+
+  public void setAliasBucketFileNameMapping(
+      LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>> aliasBucketFileNameMapping) {
+    this.aliasBucketFileNameMapping = aliasBucketFileNameMapping;
+  }
+
+  public LinkedHashMap<String, Integer> getBucketFileNameMapping() {
+    return bucketFileNameMapping;
+  }
+
+  public void setBucketFileNameMapping(LinkedHashMap<String, Integer> bucketFileNameMapping) {
+    this.bucketFileNameMapping = bucketFileNameMapping;
+  }
+}
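
The clone constructor above lets the local-task rewrite derive a JDBM sink descriptor straight from the MapJoinDesc of the operator being replaced (the LocalMapJoinProcFactory rules referenced earlier are the natural place for that). A hedged usage sketch, assuming a MapJoinDesc instance named mapJoinDesc is already in hand:

// Sketch only: mapJoinDesc is assumed to come from the MapJoinOperator being rewritten.
JDBMSinkDesc jdbmSinkDesc = new JDBMSinkDesc(mapJoinDesc);
// The sink carries over the join keys, value table descriptors and bucket file name
// mappings, so the local task can build the hash tables the map join expects to load.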

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java?rev=1027672&r1=1027671&r2=1027672&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java Tue Oct 26 18:28:23 2010
@@ -25,14 +25,14 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
+import java.util.Map.Entry;
 
 /**
  * Map Join operator Descriptor implementation.
  *
  */
-@Explain(displayName = "Common Join Operator")
+@Explain(displayName = "Map Join Operator")
 public class MapJoinDesc extends JoinDesc implements Serializable {
   private static final long serialVersionUID = 1L;
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java?rev=1027672&r1=1027671&r2=1027672&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java Tue Oct 26 18:28:23 2010
@@ -20,15 +20,15 @@ package org.apache.hadoop.hive.ql.plan;
 
 import java.io.File;
 import java.io.Serializable;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.BucketMatcher;
 import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.fs.Path;
 
 /**
  * MapredLocalWork.
@@ -42,8 +42,13 @@ public class MapredLocalWork implements 
   private LinkedHashMap<String, FetchWork> aliasToFetchWork;
   private boolean inputFileChangeSensitive;
   private BucketMapJoinContext bucketMapjoinContext;
+  private String tmpFileURI;
+
+
+  private List<Operator<? extends Serializable>> dummyParentOp;
 
   public MapredLocalWork() {
+
   }
 
   public MapredLocalWork(
@@ -51,13 +56,32 @@ public class MapredLocalWork implements 
       final LinkedHashMap<String, FetchWork> aliasToFetchWork) {
     this.aliasToWork = aliasToWork;
     this.aliasToFetchWork = aliasToFetchWork;
+
   }
 
+  public MapredLocalWork(MapredLocalWork clone){
+    this.tmpFileURI = clone.tmpFileURI;
+    this.inputFileChangeSensitive=clone.inputFileChangeSensitive;
+
+  }
+
+
+  public void setDummyParentOp(List<Operator<? extends Serializable>> op){
+    this.dummyParentOp=op;
+  }
+
+
+  public List<Operator<? extends Serializable>> getDummyParentOp(){
+    return this.dummyParentOp;
+  }
+
+
   @Explain(displayName = "Alias -> Map Local Operator Tree")
   public LinkedHashMap<String, Operator<? extends Serializable>> getAliasToWork() {
     return aliasToWork;
   }
 
+
   public void setAliasToWork(
       final LinkedHashMap<String, Operator<? extends Serializable>> aliasToWork) {
     this.aliasToWork = aliasToWork;
@@ -88,6 +112,8 @@ public class MapredLocalWork implements 
     this.inputFileChangeSensitive = inputFileChangeSensitive;
   }
 
+
+
   public void deriveExplainAttributes() {
     if (bucketMapjoinContext != null) {
       bucketMapjoinContext.deriveBucketMapJoinMapping();
@@ -110,6 +136,14 @@ public class MapredLocalWork implements 
     this.bucketMapjoinContext = bucketMapjoinContext;
   }
 
+  public void setTmpFileURI(String tmpFileURI) {
+    this.tmpFileURI = tmpFileURI;
+  }
+
+  public String getTmpFileURI() {
+    return tmpFileURI;
+  }
+
   public static class BucketMapJoinContext implements Serializable {
 
     private static final long serialVersionUID = 1L;
@@ -198,11 +232,13 @@ public class MapredLocalWork implements 
       this.aliasBucketFileNameMapping = aliasBucketFileNameMapping;
     }
 
+    @Override
     public String toString() {
-      if (aliasBucketFileNameMapping != null)
+      if (aliasBucketFileNameMapping != null) {
         return "Mapping:" + aliasBucketFileNameMapping.toString();
-      else
+      } else {
         return "";
+      }
     }
 
     @Explain(displayName = "Alias Bucket Base File Name Mapping", normalExplain = false)

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java?rev=1027672&r1=1027671&r2=1027672&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java Tue Oct 26 18:28:23 2010
@@ -67,6 +67,8 @@ public class MapredWork implements Seria
   private String inputformat;
   private boolean gatheringStats;
 
+  private String tmpHDFSFileURI;
+
   public MapredWork() {
     aliasToPartnInfo = new LinkedHashMap<String, PartitionDesc>();
   }
@@ -330,4 +332,12 @@ public class MapredWork implements Seria
   public boolean isGatheringStats() {
     return this.gatheringStats;
   }
+
+  public String getTmpHDFSFileURI() {
+    return tmpHDFSFileURI;
+  }
+
+  public void setTmpHDFSFileURI(String tmpHDFSFileURI) {
+    this.tmpHDFSFileURI = tmpHDFSFileURI;
+  }
 }


