hadoop-hive-commits mailing list archives

From: j..@apache.org
Subject: svn commit: r980659 [8/34] - in /hadoop/hive/trunk: ./ common/src/java/org/apache/hadoop/hive/common/ contrib/src/test/results/clientpositive/ metastore/if/ metastore/src/gen-cpp/ metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/ metasto...
Date: Fri, 30 Jul 2010 06:40:11 GMT
Modified: hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java?rev=980659&r1=980658&r2=980659&view=diff
==============================================================================
--- hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (original)
+++ hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java Fri Jul 30 06:40:04 2010
@@ -43,6 +43,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Index;
 import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -54,6 +55,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.Type;
 import org.apache.hadoop.hive.metastore.model.MDatabase;
 import org.apache.hadoop.hive.metastore.model.MFieldSchema;
+import org.apache.hadoop.hive.metastore.model.MIndex;
 import org.apache.hadoop.hive.metastore.model.MOrder;
 import org.apache.hadoop.hive.metastore.model.MPartition;
 import org.apache.hadoop.hive.metastore.model.MSerDeInfo;
@@ -83,7 +85,7 @@ public class ObjectStore implements RawS
   private boolean isInitialized = false;
   private PersistenceManager pm = null;
   private Configuration hiveConf;
-  private int openTrasactionCalls = 0;
+  int openTrasactionCalls = 0;
   private Transaction currentTransaction = null;
   private TXN_STATUS transactionStatus = TXN_STATUS.NO_STATE;
 
@@ -247,7 +249,7 @@ public class ObjectStore implements RawS
     if (!currentTransaction.isActive()) {
       throw new RuntimeException(
           "Commit is called, but transaction is not active. Either there are"
-              + "mismatching open and close calls or rollback was called in the same trasaction");
+              + " mismatching open and close calls or rollback was called in the same trasaction");
     }
     openTrasactionCalls--;
     if ((openTrasactionCalls == 0) && currentTransaction.isActive()) {
@@ -1012,4 +1014,201 @@ public class ObjectStore implements RawS
         newSd.getSerDeInfo().getSerializationLib());
     oldSd.getSerDeInfo().setParameters(newSd.getSerDeInfo().getParameters());
   }
+
+  @Override
+  public boolean addIndex(Index index) throws InvalidObjectException,
+      MetaException {
+    boolean commited = false;
+    try {
+      openTransaction();
+      MIndex idx = convertToMIndex(index);
+      pm.makePersistent(idx);
+      commited = commitTransaction();
+      return true;
+    } finally {
+      if (!commited) {
+        rollbackTransaction();
+        return false;
+      }
+    }
+  }
+
+  private MIndex convertToMIndex(Index index) throws InvalidObjectException,
+      MetaException {
+
+    StorageDescriptor sd = index.getSd();
+    if (sd == null) {
+      throw new InvalidObjectException("Storage descriptor is not defined for index.");
+    }
+
+    MStorageDescriptor msd = this.convertToMStorageDescriptor(sd);
+    MTable origTable = getMTable(index.getDbName(), index.getOrigTableName());
+    if (origTable == null) {
+      throw new InvalidObjectException(
+          "Original table does not exist for the given index.");
+    }
+
+    MTable indexTable = getMTable(index.getDbName(), index.getIndexTableName());
+    if (indexTable == null) {
+      throw new InvalidObjectException(
+          "Underlying index table does not exist for the given index.");
+    }
+
+    return new MIndex(index.getIndexName(), origTable, index.getCreateTime(),
+        index.getLastAccessTime(), index.getParameters(), indexTable, msd,
+        index.getIndexHandlerClass(), index.isDeferredRebuild());
+  }
+
+  @Override
+  public boolean dropIndex(String dbName, String origTableName, String indexName)
+      throws MetaException {
+    boolean success = false;
+    try {
+      openTransaction();
+      MIndex index = getMIndex(dbName, origTableName, indexName);
+      if (index != null) {
+        pm.deletePersistent(index);
+      }
+      success = commitTransaction();
+    } finally {
+      if (!success) {
+        rollbackTransaction();
+      }
+    }
+    return success;
+  }
+  
+  private MIndex getMIndex(String dbName, String originalTblName, String indexName) throws MetaException {
+    MIndex midx = null;
+    boolean commited = false;
+    try {
+      openTransaction();
+      dbName = dbName.toLowerCase();
+      originalTblName = originalTblName.toLowerCase();
+      MTable mtbl = getMTable(dbName, originalTblName);
+      if (mtbl == null) {
+        commited = commitTransaction();
+        return null;
+      }
+
+      Query query = pm
+          .newQuery(MIndex.class,
+              "origTable.tableName == t1 && origTable.database.name == t2 && indexName == t3");
+      query
+          .declareParameters("java.lang.String t1, java.lang.String t2, java.lang.String t3");
+      query.setUnique(true);
+      midx = (MIndex) query.execute(originalTblName.trim(), dbName.trim(), indexName);
+      pm.retrieve(midx);
+      commited = commitTransaction();
+    } finally {
+      if (!commited) {
+        rollbackTransaction();
+      }
+    }
+    return midx;
+  }
+
+  @Override
+  public Index getIndex(String dbName, String origTableName, String indexName)
+      throws MetaException {
+    openTransaction();
+    MIndex mIndex = this.getMIndex(dbName, origTableName, indexName);
+    Index ret = convertToIndex(mIndex);
+    commitTransaction();
+    return ret;
+  }
+
+  private Index convertToIndex(MIndex mIndex) throws MetaException {
+    if(mIndex == null) {
+      return null;
+    }
+
+    return new Index(
+    mIndex.getIndexName(),
+    mIndex.getIndexHandlerClass(),
+    MetaStoreUtils.DEFAULT_DATABASE_NAME,
+    mIndex.getOrigTable().getTableName(),
+    mIndex.getCreateTime(),
+    mIndex.getLastAccessTime(),
+    mIndex.getIndexTable().getTableName(),
+    this.convertToStorageDescriptor(mIndex.getSd()),
+    mIndex.getParameters(),
+    mIndex.getDeferredRebuild());
+
+  }
+
+  @Override
+  public List<Index> getIndexes(String dbName, String origTableName, int max)
+      throws MetaException {
+    boolean success = false;
+    try {
+      openTransaction();
+      List<MIndex> mIndexList = listMIndexes(dbName, origTableName, max);
+      List<Index> indexes = new ArrayList<Index>(mIndexList.size());
+      for (MIndex midx : mIndexList) {
+        indexes.add(this.convertToIndex(midx));
+      }
+      success = commitTransaction();
+      return indexes;
+    } finally {
+      if (!success) {
+        rollbackTransaction();
+      }
+    }
+  }
+  
+  private List<MIndex> listMIndexes(String dbName, String origTableName,
+      int max) {
+    boolean success = false;
+    List<MIndex> mindexes = null;
+    try {
+      openTransaction();
+      LOG.debug("Executing listMIndexes");
+      dbName = dbName.toLowerCase();
+      origTableName = origTableName.toLowerCase();
+      Query query = pm.newQuery(MIndex.class,
+          "origTable.tableName == t1 && origTable.database.name == t2");
+      query.declareParameters("java.lang.String t1, java.lang.String t2");
+      mindexes = (List<MIndex>) query
+          .execute(origTableName.trim(), dbName.trim());
+      LOG.debug("Done executing query for listMIndexes");
+      pm.retrieveAll(mindexes);
+      success = commitTransaction();
+      LOG.debug("Done retrieving all objects for listMIndexes");
+    } finally {
+      if (!success) {
+        rollbackTransaction();
+      }
+    }
+    return mindexes;
+  }
+
+  @Override
+  public List<String> listIndexNames(String dbName, String origTableName,
+      short max) throws MetaException {
+    List<String> pns = new ArrayList<String>();
+    boolean success = false;
+    try {
+      openTransaction();
+      LOG.debug("Executing listIndexNames");
+      dbName = dbName.toLowerCase();
+      origTableName = origTableName.toLowerCase();
+      Query q = pm
+          .newQuery("select indexName from org.apache.hadoop.hive.metastore.model.MIndex where origTable.database.name == t1 && origTable.tableName == t2 order by indexName asc");
+      q.declareParameters("java.lang.String t1, java.lang.String t2");
+      q.setResult("indexName");
+      Collection names = (Collection) q
+          .execute(dbName.trim(), origTableName.trim());
+      pns = new ArrayList<String>();
+      for (Iterator i = names.iterator(); i.hasNext();) {
+        pns.add((String) i.next());
+      }
+      success = commitTransaction();
+    } finally {
+      if (!success) {
+        rollbackTransaction();
+      }
+    }
+    return pns;
+  }
 }
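
For reference, the new index methods rely on ObjectStore's reference-counted transaction pattern: every openTransaction() increments openTrasactionCalls, and only the outermost commitTransaction() performs the real JDO commit. A minimal sketch of a caller nesting addIndex() inside its own transaction (hypothetical code, not part of this commit):

    import org.apache.hadoop.hive.metastore.ObjectStore;
    import org.apache.hadoop.hive.metastore.api.Index;

    public class IndexStoreExample {
      // Illustrative only: shows the open/commit/rollback call shape that the
      // index methods above follow internally.
      public static boolean saveIndex(ObjectStore store, Index index) throws Exception {
        boolean committed = false;
        try {
          store.openTransaction();               // counter 0 -> 1, starts the JDO transaction
          store.addIndex(index);                 // nests its own openTransaction()/commitTransaction()
          committed = store.commitTransaction(); // counter back to 0, actual commit happens here
        } finally {
          if (!committed) {
            store.rollbackTransaction();         // marks the whole nested chain as rolled back
          }
        }
        return committed;
      }
    }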

Modified: hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java?rev=980659&r1=980658&r2=980659&view=diff
==============================================================================
--- hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java (original)
+++ hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java Fri Jul 30 06:40:04 2010
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Index;
 import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -104,4 +105,18 @@ public interface RawStore extends Config
 
   public abstract void alterPartition(String db_name, String tbl_name,
       Partition new_part) throws InvalidObjectException, MetaException;
+  
+  public abstract boolean addIndex(Index index)
+      throws InvalidObjectException, MetaException;
+  
+  public abstract Index getIndex(String dbName, String origTableName, String indexName) throws MetaException;
+
+  public abstract boolean dropIndex(String dbName, String origTableName, String indexName) throws MetaException;
+
+  public abstract List<Index> getIndexes(String dbName,
+      String origTableName, int max) throws MetaException;
+  
+  public abstract List<String> listIndexNames(String dbName,
+      String origTableName, short max) throws MetaException;
+  
 }
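
The new RawStore methods give the metastore a complete CRUD surface for index metadata. A rough sketch of the call shapes from a caller's point of view (illustrative standalone code, not part of this commit; a RawStore instance would normally be obtained inside the metastore server):

    import java.util.List;
    import org.apache.hadoop.hive.metastore.RawStore;
    import org.apache.hadoop.hive.metastore.api.Index;

    public class RawStoreIndexCalls {
      public static void dumpIndexes(RawStore ms, String db, String tbl) throws Exception {
        // Up to 10 full Index objects defined on db.tbl
        List<Index> indexes = ms.getIndexes(db, tbl, 10);
        for (Index idx : indexes) {
          System.out.println(idx.getIndexName() + " -> " + idx.getIndexTableName());
        }
        // Or just the names; ObjectStore returns them in ascending order
        List<String> names = ms.listIndexNames(db, tbl, (short) 10);
        System.out.println(names);
      }
    }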

Modified: hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/TableType.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/TableType.java?rev=980659&r1=980658&r2=980659&view=diff
==============================================================================
--- hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/TableType.java (original)
+++ hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/TableType.java Fri Jul 30 06:40:04 2010
@@ -22,5 +22,5 @@ package org.apache.hadoop.hive.metastore
  * Typesafe enum for types of tables described by the metastore.
  */
 public enum TableType {
-  MANAGED_TABLE, EXTERNAL_TABLE, VIRTUAL_VIEW
+  MANAGED_TABLE, EXTERNAL_TABLE, VIRTUAL_VIEW, INDEX_TABLE
 }
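
The new INDEX_TABLE constant lets the table that backs an index be told apart from regular managed tables. A hedged sketch of how that tag might be read and written, assuming the thrift Table's existing tableType string field (the actual wiring lives elsewhere in this commit):

    import org.apache.hadoop.hive.metastore.TableType;
    import org.apache.hadoop.hive.metastore.api.Table;

    public class IndexTableTag {
      // Illustrative only: store the enum name in the table's type field.
      public static void markAsIndexTable(Table tbl) {
        tbl.setTableType(TableType.INDEX_TABLE.toString());
      }

      public static boolean isIndexTable(Table tbl) {
        return TableType.INDEX_TABLE.toString().equals(tbl.getTableType());
      }
    }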

Modified: hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java?rev=980659&r1=980658&r2=980659&view=diff
==============================================================================
--- hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java (original)
+++ hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java Fri Jul 30 06:40:04 2010
@@ -328,4 +328,13 @@ public class Warehouse {
     }
     return FileUtils.makePartName(colNames, vals);
   }
+  
+  public static List<String> getPartValuesFromPartName(String partName)
+      throws MetaException {
+    LinkedHashMap<String, String> partSpec = Warehouse.makeSpecFromName(partName);
+    List<String> values = new ArrayList<String>();
+    values.addAll(partSpec.values());
+    return values;
+  }
+
 }
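
getPartValuesFromPartName builds on the existing makeSpecFromName: a partition name like "ds=2010-07-30/hr=12" is parsed into an ordered key/value map and then reduced to just the values. A small illustrative use (hypothetical standalone code):

    import java.util.List;
    import org.apache.hadoop.hive.metastore.Warehouse;

    public class PartNameExample {
      public static void main(String[] args) throws Exception {
        // Expected output: [2010-07-30, 12]
        List<String> vals = Warehouse.getPartValuesFromPartName("ds=2010-07-30/hr=12");
        System.out.println(vals);
      }
    }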

Added: hadoop/hive/trunk/metastore/src/model/org/apache/hadoop/hive/metastore/model/MIndex.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/metastore/src/model/org/apache/hadoop/hive/metastore/model/MIndex.java?rev=980659&view=auto
==============================================================================
--- hadoop/hive/trunk/metastore/src/model/org/apache/hadoop/hive/metastore/model/MIndex.java (added)
+++ hadoop/hive/trunk/metastore/src/model/org/apache/hadoop/hive/metastore/model/MIndex.java Fri Jul 30 06:40:04 2010
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.model;
+
+import java.util.Map;
+
+/**
+ * Represents Hive's index definition.
+ */
+public class MIndex {
+  
+  private String indexName;
+  private MTable origTable;
+  private int createTime;
+  private int lastAccessTime;
+  private Map<String, String> parameters;
+  private MTable indexTable;
+  private MStorageDescriptor sd;
+  private String indexHandlerClass;
+  private boolean deferredRebuild;
+
+  public MIndex() {}
+  
+/**
+ * @param indexName
+ * @param baseTable
+ * @param createTime
+ * @param lastAccessTime
+ * @param parameters
+ * @param indexTable
+ * @param sd
+ */
+  public MIndex(String indexName, MTable baseTable, int createTime,
+      int lastAccessTime, Map<String, String> parameters, MTable indexTable,
+      MStorageDescriptor sd, String indexHandlerClass, boolean deferredRebuild) {
+    super();
+    this.indexName = indexName;
+    this.origTable = baseTable;
+    this.createTime = createTime;
+    this.lastAccessTime = lastAccessTime;
+    this.parameters = parameters;
+    this.indexTable = indexTable;
+    this.sd = sd;
+    this.indexHandlerClass = indexHandlerClass;
+    this.deferredRebuild = deferredRebuild;
+  }
+
+
+
+  /**
+   * @return index name
+   */
+  public String getIndexName() {
+    return indexName;
+  }
+
+  /**
+   * @param indexName index name
+   */
+  public void setIndexName(String indexName) {
+    this.indexName = indexName;
+  }
+
+  /**
+   * @return create time
+   */
+  public int getCreateTime() {
+    return createTime;
+  }
+
+  /**
+   * @param createTime create time
+   */
+  public void setCreateTime(int createTime) {
+    this.createTime = createTime;
+  }
+
+  /**
+   * @return last access time
+   */
+  public int getLastAccessTime() {
+    return lastAccessTime;
+  }
+
+  /**
+   * @param lastAccessTime last access time
+   */
+  public void setLastAccessTime(int lastAccessTime) {
+    this.lastAccessTime = lastAccessTime;
+  }
+
+  /**
+   * @return parameters
+   */
+  public Map<String, String> getParameters() {
+    return parameters;
+  }
+
+  /**
+   * @param parameters parameters
+   */
+  public void setParameters(Map<String, String> parameters) {
+    this.parameters = parameters;
+  }
+
+  /**
+   * @return original table
+   */
+  public MTable getOrigTable() {
+    return origTable;
+  }
+
+  /**
+   * @param origTable
+   */
+  public void setOrigTable(MTable origTable) {
+    this.origTable = origTable;
+  }
+
+  /**
+   * @return index table
+   */
+  public MTable getIndexTable() {
+    return indexTable;
+  }
+
+  /**
+   * @param indexTable
+   */
+  public void setIndexTable(MTable indexTable) {
+    this.indexTable = indexTable;
+  }
+  
+  /**
+   * @return storage descriptor
+   */
+  public MStorageDescriptor getSd() {
+    return sd;
+  }
+
+  /**
+   * @param sd
+   */
+  public void setSd(MStorageDescriptor sd) {
+    this.sd = sd;
+  }
+
+  /**
+   * @return indexHandlerClass
+   */
+  public String getIndexHandlerClass() {
+    return indexHandlerClass;
+  }
+
+  /**
+   * @param indexHandlerClass
+   */
+  public void setIndexHandlerClass(String indexHandlerClass) {
+    this.indexHandlerClass = indexHandlerClass;
+  }
+  
+  /**
+   * @return whether the index rebuild is deferred
+   */
+  public boolean isDeferredRebuild() {
+    return deferredRebuild;
+  }
+  
+  /**
+   * @return whether the index rebuild is deferred
+   */
+  public boolean getDeferredRebuild() {
+    return deferredRebuild;
+  }
+
+  /**
+   * @param deferredRebuild
+   */
+  public void setDeferredRebuild(boolean deferredRebuild) {
+    this.deferredRebuild = deferredRebuild;
+  }
+}

Modified: hadoop/hive/trunk/metastore/src/model/package.jdo
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/metastore/src/model/package.jdo?rev=980659&r1=980658&r2=980659&view=diff
==============================================================================
--- hadoop/hive/trunk/metastore/src/model/package.jdo (original)
+++ hadoop/hive/trunk/metastore/src/model/package.jdo Fri Jul 30 06:40:04 2010
@@ -289,6 +289,52 @@
         </value>
       </field>
     </class>
-
+    
+    <class name="MIndex" table="IDXS" identity-type="datastore" detachable="true">
+      <index name="UniqueINDEX" unique="true">
+        <column name="INDEX_NAME"/>
+        <column name="ORIG_TBL_ID"/>
+      </index>
+      
+      <datastore-identity>
+        <column name="INDEX_ID"/>
+      </datastore-identity>
+      <field name="indexName">
+        <column name="INDEX_NAME" length="128" jdbc-type="VARCHAR"/>
+      </field>
+      <field name="origTable">
+        <column name="ORIG_TBL_ID"/>
+      </field>
+      <field name="indexTable">
+        <column name="INDEX_TBL_ID"/>
+      </field>
+      <field name="indexHandlerClass">
+        <column name="INDEX_HANDLER_CLASS"/>
+      </field>
+      <field name="deferredRebuild">
+        <column name="DEFERRED_REBUILD"/>
+      </field>
+      <field name="createTime">
+        <column name="CREATE_TIME" jdbc-type="integer"/>
+      </field>
+      <field name="lastAccessTime">
+        <column name="LAST_ACCESS_TIME" jdbc-type="integer"/>
+      </field>
+      <field name="sd" dependent="true">
+        <column name="SD_ID"/>
+      </field>
+      <field name="parameters" table="INDEX_PARAMS">
+        <map key-type="java.lang.String" value-type="java.lang.String"/>
+        <join>
+          <column name="INDEX_ID"/>
+        </join>
+        <key>
+           <column name="PARAM_KEY" length="256" jdbc-type="VARCHAR"/>
+        </key>
+        <value>
+           <column name="PARAM_VALUE" length="767" jdbc-type="VARCHAR"/>
+        </value>
+      </field>
+    </class>
   </package>
 </jdo>

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java?rev=980659&r1=980658&r2=980659&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java Fri Jul 30 06:40:04 2010
@@ -24,6 +24,7 @@ import java.util.Queue;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.mapred.JobConf;
 
 /**
  * DriverContext.
@@ -82,4 +83,5 @@ public class DriverContext {
   public void incCurJobNo(int amount) {
     this.curJobNo = this.curJobNo + amount;
   }
+  
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java?rev=980659&r1=980658&r2=980659&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java Fri Jul 30 06:40:04 2010
@@ -44,29 +44,36 @@ public class ColumnInfo implements Seria
   private String tabAlias;
 
   /**
-   * Indicates whether the column is a partition column.
+   * Indicates whether the column is a virtual column.
    */
-  private boolean isPartitionCol;
+  private boolean isVirtualCol;
 
   private transient TypeInfo type;
+  
+  private boolean isHiddenVirtualCol;
 
   public ColumnInfo() {
   }
 
   public ColumnInfo(String internalName, TypeInfo type, String tabAlias,
-      boolean isPartitionCol) {
-    this.internalName = internalName;
-    this.type = type;
-    this.tabAlias = tabAlias;
-    this.isPartitionCol = isPartitionCol;
+      boolean isVirtualCol) {
+    this(internalName, type, tabAlias, isVirtualCol, false);
   }
 
   public ColumnInfo(String internalName, Class type, String tabAlias,
-      boolean isPartitionCol) {
+      boolean isVirtualCol) {
+    this(internalName, TypeInfoFactory
+        .getPrimitiveTypeInfoFromPrimitiveWritable(type), tabAlias,
+        isVirtualCol, false);
+  }
+  
+  public ColumnInfo(String internalName, TypeInfo type, String tabAlias,
+      boolean isVirtualCol, boolean isHiddenVirtualCol) {
     this.internalName = internalName;
-    this.type = TypeInfoFactory.getPrimitiveTypeInfoFromPrimitiveWritable(type);
+    this.type = type;
     this.tabAlias = tabAlias;
-    this.isPartitionCol = isPartitionCol;
+    this.isVirtualCol = isVirtualCol;
+    this.isHiddenVirtualCol = isHiddenVirtualCol;
   }
 
   public TypeInfo getType() {
@@ -89,8 +96,12 @@ public class ColumnInfo implements Seria
     return tabAlias;
   }
 
-  public boolean getIsPartitionCol() {
-    return isPartitionCol;
+  public boolean getIsVirtualCol() {
+    return isVirtualCol;
+  }
+  
+  public boolean isHiddenVirtualCol() {
+    return isHiddenVirtualCol;
   }
 
   /**
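
With this change a ColumnInfo can describe a virtual column (optionally hidden from SELECT *) rather than only a partition column. A hedged sketch of constructing one for the block-offset virtual column used by MapOperator later in this commit (illustrative only; VirtualColumn's accessors are taken from how MapOperator calls them):

    import org.apache.hadoop.hive.ql.exec.ColumnInfo;
    import org.apache.hadoop.hive.ql.metadata.VirtualColumn;

    public class VirtualColumnInfoExample {
      public static ColumnInfo blockOffsetColumn(String tableAlias) {
        VirtualColumn vc = VirtualColumn.BLOCKOFFSET;
        // isVirtualCol = true, isHiddenVirtualCol = true
        return new ColumnInfo(vc.getName(), vc.getTypeInfo(), tableAlias, true, true);
      }
    }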

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=980659&r1=980658&r2=980659&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Fri Jul 30 06:40:04 2010
@@ -78,12 +78,14 @@ import org.apache.hadoop.hive.ql.plan.Cr
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.DescFunctionDesc;
 import org.apache.hadoop.hive.ql.plan.DescTableDesc;
+import org.apache.hadoop.hive.ql.plan.DropIndexDesc;
 import org.apache.hadoop.hive.ql.plan.DropTableDesc;
 import org.apache.hadoop.hive.ql.plan.MsckDesc;
 import org.apache.hadoop.hive.ql.plan.ShowFunctionsDesc;
 import org.apache.hadoop.hive.ql.plan.ShowPartitionsDesc;
 import org.apache.hadoop.hive.ql.plan.ShowTableStatusDesc;
 import org.apache.hadoop.hive.ql.plan.ShowTablesDesc;
+import org.apache.hadoop.hive.ql.plan.CreateIndexDesc;
 import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.serde.Constants;
@@ -145,7 +147,18 @@ public class DDLTask extends Task<DDLWor
         return createTable(db, crtTbl);
       }
 
+      CreateIndexDesc crtIndex = work.getCreateIndexDesc();
+      if (crtIndex != null) {
+        return createIndex(db, crtIndex);
+      }
+      
+      DropIndexDesc dropIdx = work.getDropIdxDesc();
+      if(dropIdx != null) {
+        return dropIndex(db, dropIdx);
+      }
+
       CreateTableLikeDesc crtTblLike = work.getCreateTblLikeDesc();
+
       if (crtTblLike != null) {
         return createTableLike(db, crtTblLike);
       }
@@ -234,6 +247,30 @@ public class DDLTask extends Task<DDLWor
     return 0;
   }
 
+  private int dropIndex(Hive db, DropIndexDesc dropIdx) throws HiveException {
+    db.dropIndex(MetaStoreUtils.DEFAULT_DATABASE_NAME, dropIdx.getTableName(), 
+        dropIdx.getIndexName(), true);
+    return 0;
+  }
+
+  private int createIndex(Hive db, CreateIndexDesc crtIndex) throws HiveException {
+
+    if( crtIndex.getSerde() != null) {
+      validateSerDe(crtIndex.getSerde());
+    }
+
+    db
+        .createIndex(
+        crtIndex.getTableName(), crtIndex.getIndexName(), crtIndex.getIndexTypeHandlerClass(), 
+        crtIndex.getIndexedCols(), crtIndex.getIndexTableName(), crtIndex.getDeferredRebuild(),
+        crtIndex.getInputFormat(), crtIndex.getOutputFormat(), crtIndex.getSerde(), 
+        crtIndex.getStorageHandler(), crtIndex.getLocation(), crtIndex.getIdxProps(), crtIndex.getSerdeProps(),
+        crtIndex.getCollItemDelim(), crtIndex.getFieldDelim(), crtIndex.getFieldEscape(),
+        crtIndex.getLineDelim(), crtIndex.getMapKeyDelim()
+        );
+    return 0;
+  }
+
   /**
    * Add a partition to a table.
    *

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java?rev=980659&r1=980658&r2=980659&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java Fri Jul 30 06:40:04 2010
@@ -177,7 +177,7 @@ public class ExecMapper extends MapReduc
       } else {
         // Since there is no concept of a group, we don't invoke
         // startGroup/endGroup for a mapper
-        mo.process((Writable) value);
+        mo.process((Writable)value);
         if (l4j.isInfoEnabled()) {
           numRows++;
           if (numRows == nextCntr) {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java?rev=980659&r1=980658&r2=980659&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java Fri Jul 30 06:40:04 2010
@@ -8,6 +8,7 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext;
@@ -33,7 +34,10 @@ public class ExecMapperContext {
   private Map<String, FetchOperator> fetchOperators;
   private JobConf jc;
 
+  private IOContext ioCxt;
+  
   public ExecMapperContext() {
+    ioCxt = IOContext.get();
   }
 
   public void processInputFileChangeForLocalWork() throws HiveException {
@@ -56,7 +60,7 @@ public class ExecMapperContext {
    */
   public boolean inputFileChanged() {
     if (!inputFileChecked) {
-      currentInputFile = HiveConf.getVar(jc, HiveConf.ConfVars.HADOOPMAPFILENAME);
+      currentInputFile = this.ioCxt.getInputFile();
       inputFileChecked = true;
     }
     return lastInputFile == null || !lastInputFile.equals(currentInputFile);
@@ -190,4 +194,13 @@ public class ExecMapperContext {
   public void setFetchOperators(Map<String, FetchOperator> fetchOperators) {
     this.fetchOperators = fetchOperators;
   }
+  
+  public IOContext getIoCxt() {
+    return ioCxt;
+  }
+
+  public void setIoCxt(IOContext ioCxt) {
+    this.ioCxt = ioCxt;
+  }
+
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=980659&r1=980658&r2=980659&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Fri Jul 30 06:40:04 2010
@@ -136,6 +136,7 @@ import org.apache.hadoop.hive.ql.udf.gen
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFHistogramNumeric;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectSet;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMin;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
@@ -367,6 +368,7 @@ public final class FunctionRegistry {
 
     registerGenericUDAF("histogram_numeric", new GenericUDAFHistogramNumeric());
     registerGenericUDAF("percentile_approx", new GenericUDAFPercentileApprox());
+    registerGenericUDAF("collect_set", new GenericUDAFCollectSet());
 
     registerGenericUDAF("ngrams", new GenericUDAFnGrams());
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=980659&r1=980658&r2=980659&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Fri Jul 30 06:40:04 2010
@@ -31,16 +31,21 @@ import java.util.Map.Entry;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -67,8 +72,12 @@ public class MapOperator extends Operato
   private transient Deserializer deserializer;
 
   private transient Object[] rowWithPart;
+  private transient Writable[] vcValues;
+  private transient List<VirtualColumn> vcs;
+  private transient Object[] rowWithPartAndVC;
   private transient StructObjectInspector rowObjectInspector;
   private transient boolean isPartitioned;
+  private transient boolean hasVC;
   private Map<MapInputPath, MapOpCtx> opCtxMap;
 
   private Map<Operator<? extends Serializable>, java.util.ArrayList<String>> operatorToPaths;
@@ -116,6 +125,8 @@ public class MapOperator extends Operato
 
   private static class MapOpCtx {
     boolean isPartitioned;
+    StructObjectInspector rawRowObjectInspector; //without partition
+    StructObjectInspector partObjectInspector; // partition
     StructObjectInspector rowObjectInspector;
     Object[] rowWithPart;
     Deserializer deserializer;
@@ -128,10 +139,15 @@ public class MapOperator extends Operato
      * @param rowWithPart
      */
     public MapOpCtx(boolean isPartitioned,
-        StructObjectInspector rowObjectInspector, Object[] rowWithPart,
+        StructObjectInspector rowObjectInspector,
+        StructObjectInspector rawRowObjectInspector,
+        StructObjectInspector partObjectInspector,
+        Object[] rowWithPart,
         Deserializer deserializer) {
       this.isPartitioned = isPartitioned;
       this.rowObjectInspector = rowObjectInspector;
+      this.rawRowObjectInspector = rawRowObjectInspector;
+      this.partObjectInspector = partObjectInspector;
       this.rowWithPart = rowWithPart;
       this.deserializer = deserializer;
     }
@@ -206,7 +222,7 @@ public class MapOperator extends Operato
     // HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, partName);
     Deserializer deserializer = (Deserializer) sdclass.newInstance();
     deserializer.initialize(hconf, tblProps);
-    StructObjectInspector rowObjectInspector = (StructObjectInspector) deserializer
+    StructObjectInspector rawRowObjectInspector = (StructObjectInspector) deserializer
         .getObjectInspector();
 
     MapOpCtx opCtx = null;
@@ -239,16 +255,16 @@ public class MapOperator extends Operato
 
       Object[] rowWithPart = new Object[2];
       rowWithPart[1] = partValues;
-      rowObjectInspector = ObjectInspectorFactory
+      StructObjectInspector rowObjectInspector = ObjectInspectorFactory
           .getUnionStructObjectInspector(Arrays
-          .asList(new StructObjectInspector[] {rowObjectInspector, partObjectInspector}));
+          .asList(new StructObjectInspector[] {rawRowObjectInspector, partObjectInspector}));
       // LOG.info("dump " + tableName + " " + partName + " " +
       // rowObjectInspector.getTypeName());
-      opCtx = new MapOpCtx(true, rowObjectInspector, rowWithPart, deserializer);
+      opCtx = new MapOpCtx(true, rowObjectInspector, rawRowObjectInspector ,partObjectInspector,rowWithPart, deserializer);
     } else {
       // LOG.info("dump2 " + tableName + " " + partName + " " +
       // rowObjectInspector.getTypeName());
-      opCtx = new MapOpCtx(false, rowObjectInspector, null, deserializer);
+      opCtx = new MapOpCtx(false, rawRowObjectInspector, rawRowObjectInspector, null, null, deserializer);
     }
     opCtx.tableName = tableName;
     opCtx.partName = partName;
@@ -271,6 +287,7 @@ public class MapOperator extends Operato
         MapOpCtx opCtx = initObjectInspector(conf, hconf, onefile);
         Path onepath = new Path(new Path(onefile).toUri().getPath());
         List<String> aliases = conf.getPathToAliases().get(onefile);
+        
         for (String onealias : aliases) {
           Operator<? extends Serializable> op = conf.getAliasToWork().get(
               onealias);
@@ -298,6 +315,47 @@ public class MapOperator extends Operato
               isPartitioned = opCtxMap.get(inp).isPartitioned();
               rowWithPart = opCtxMap.get(inp).getRowWithPart();
               rowObjectInspector = opCtxMap.get(inp).getRowObjectInspector();
+              StructObjectInspector rawRowObjectInspector = opCtxMap.get(inp).rawRowObjectInspector;
+              StructObjectInspector partObjectInspector = opCtxMap.get(inp).partObjectInspector;
+              if (op instanceof TableScanOperator) {
+                TableScanOperator tsOp = (TableScanOperator) op;
+                TableScanDesc tsDesc = tsOp.getConf();
+                if(tsDesc != null) {
+                  this.vcs = tsDesc.getVirtualCols();
+                  if (vcs != null && vcs.size() > 0) {
+                    this.hasVC = true;
+                    List<String> vcNames = new ArrayList<String>(vcs.size());
+                    this.vcValues = new Writable[vcs.size()];
+                    List<ObjectInspector> vcsObjectInspectors = new ArrayList<ObjectInspector>(vcs.size());
+                    for (int i = 0; i < vcs.size(); i++) {
+                      VirtualColumn vc = vcs.get(i);
+                      vcsObjectInspectors.add(
+                          PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
+                              ((PrimitiveTypeInfo) vc.getTypeInfo()).getPrimitiveCategory()));
+                      vcNames.add(vc.getName());
+                    }
+                    StructObjectInspector vcStructObjectInspector = ObjectInspectorFactory
+                        .getStandardStructObjectInspector(vcNames,
+                            vcsObjectInspectors);
+                    if (isPartitioned) {
+                      this.rowWithPartAndVC = new Object[3];
+                      this.rowWithPartAndVC[1] = this.rowWithPart[1];
+                    } else {
+                      this.rowWithPartAndVC = new Object[2];
+                    }
+                    if(partObjectInspector == null) {
+                      this.rowObjectInspector = ObjectInspectorFactory.getUnionStructObjectInspector(Arrays
+                          .asList(new StructObjectInspector[] {
+                              rowObjectInspector, vcStructObjectInspector }));  
+                    } else {
+                      this.rowObjectInspector = ObjectInspectorFactory.getUnionStructObjectInspector(Arrays
+                          .asList(new StructObjectInspector[] {
+                              rawRowObjectInspector, partObjectInspector, vcStructObjectInspector }));
+                    }
+                    opCtxMap.get(inp).rowObjectInspector = this.rowObjectInspector;
+                  }
+                }
+              }
               done = true;
             }
           }
@@ -375,10 +433,15 @@ public class MapOperator extends Operato
   public void process(Writable value) throws HiveException {
     Object row = null;
     try {
-      if (!isPartitioned) {
-        row = deserializer.deserialize(value);
+      if (this.hasVC) {
+        this.rowWithPartAndVC[0] = deserializer.deserialize(value);
+        int vcPos = isPartitioned ? 2 : 1;
+        populateVirtualColumnValues();
+        this.rowWithPartAndVC[vcPos] = this.vcValues;
+      } else if (!isPartitioned) {
+        row = deserializer.deserialize((Writable)value);
       } else {
-        rowWithPart[0] = deserializer.deserialize(value);
+        rowWithPart[0] = deserializer.deserialize((Writable)value);
       }
     } catch (Exception e) {
       // Serialize the row and output.
@@ -396,7 +459,9 @@ public class MapOperator extends Operato
     }
     
     try {
-      if (!isPartitioned) {
+      if (this.hasVC) {
+        forward(this.rowWithPartAndVC, this.rowObjectInspector);
+      } else if (!isPartitioned) {
         forward(row, rowObjectInspector);
       } else {
         forward(rowWithPart, rowObjectInspector);
@@ -405,7 +470,9 @@ public class MapOperator extends Operato
       // Serialize the row and output the error message.
       String rowString;
       try {
-        if (!isPartitioned) {
+        if (this.hasVC) {
+          rowString = SerDeUtils.getJSONString(rowWithPartAndVC, rowObjectInspector);
+        } else if (!isPartitioned) {
           rowString = SerDeUtils.getJSONString(row, rowObjectInspector);
         } else {
           rowString = SerDeUtils.getJSONString(rowWithPart, rowObjectInspector);
@@ -418,6 +485,30 @@ public class MapOperator extends Operato
     }
   }
 
+  private void populateVirtualColumnValues() {
+    if (this.vcs != null) {
+      ExecMapperContext mapExecCxt = this.getExecContext();
+      IOContext ioCxt = mapExecCxt.getIoCxt();
+      for (int i = 0; i < vcs.size(); i++) {
+        VirtualColumn vc = vcs.get(i);
+        if (vc.equals(VirtualColumn.FILENAME) && mapExecCxt.inputFileChanged()) {
+          this.vcValues[i] = new Text(mapExecCxt.getCurrentInputFile());
+        } else if (vc.equals(VirtualColumn.BLOCKOFFSET)) {
+          long current = ioCxt.getCurrentBlockStart();
+          LongWritable old = (LongWritable) this.vcValues[i];
+          if (old == null) {
+            old = new LongWritable(current);
+            this.vcValues[i] = old;
+            continue;
+          }
+          if (current != old.get()) {
+            old.set(current);
+          }
+        }
+      }
+    }
+  }
+
   @Override
   public void processOp(Object row, int tag) throws HiveException {
     throw new HiveException("Hive 2 Internal error: should not be called!");

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java?rev=980659&r1=980658&r2=980659&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java Fri Jul 30 06:40:04 2010
@@ -129,6 +129,14 @@ public final class TaskFactory {
       return (ret);
     }
 
+    makeChild(ret, tasklist);
+    
+    return (ret);
+  }
+
+
+  public static  void makeChild(Task<?> ret,
+      Task<? extends Serializable>... tasklist) {
     // Add the new task as child of each of the passed in tasks
     for (Task<? extends Serializable> tsk : tasklist) {
       List<Task<? extends Serializable>> children = tsk.getChildTasks();
@@ -138,8 +146,6 @@ public final class TaskFactory {
       children.add(ret);
       tsk.setChildTasks(children);
     }
-
-    return (ret);
   }
 
   private TaskFactory() {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java?rev=980659&r1=980658&r2=980659&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java Fri Jul 30 06:40:04 2010
@@ -282,7 +282,9 @@ public class HiveHistory {
       sb.append(DELIMITER);
       String key = ent.getKey();
       String val = ent.getValue();
-      val = val.replace('\n', ' ');
+      if(val != null) {
+        val = val.replace('\n', ' ');        
+      }
       sb.append(key + "=\"" + val + "\"");
 
     }

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/AbstractIndexHandler.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/AbstractIndexHandler.java?rev=980659&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/AbstractIndexHandler.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/AbstractIndexHandler.java Fri Jul 30 06:40:04 2010
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.index;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+
+/**
+ * Abstract base class for index handlers.  This is provided as insulation
+ * so that as HiveIndexHandler evolves, default implementations of new
+ * methods can be added here in order to avoid breaking existing
+ * plugin implementations.
+ */
+public abstract class AbstractIndexHandler implements HiveIndexHandler {
+  
+  public static String getColumnNames(List<FieldSchema> fieldSchemas) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < fieldSchemas.size(); i++) {
+      if (i > 0) {
+        sb.append(",");
+      }
+      sb.append(HiveUtils.unparseIdentifier(fieldSchemas.get(i).getName()));
+    }
+    return sb.toString();
+  }
+
+}
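
getColumnNames just joins the (unparsed) column identifiers with commas; the compact handler below uses the same idea when it assembles its INSERT OVERWRITE query. A quick illustrative call (hypothetical standalone code):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hadoop.hive.ql.index.AbstractIndexHandler;

    public class ColumnNamesExample {
      public static void main(String[] args) {
        List<FieldSchema> cols = Arrays.asList(
            new FieldSchema("key", "int", ""),
            new FieldSchema("value", "string", ""));
        // Prints the two names joined by a comma, with whatever quoting
        // HiveUtils.unparseIdentifier applies.
        System.out.println(AbstractIndexHandler.getColumnNames(cols));
      }
    }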

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java?rev=980659&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java Fri Jul 30 06:40:04 2010
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Holds index-related constants.
+ */
+public class HiveIndex {
+
+  public static final Log l4j = LogFactory.getLog("HiveIndex");
+  
+  public static String INDEX_TABLE_CREATETIME = "hive.index.basetbl.dfs.lastModifiedTime";
+  
+  public static enum IndexType {
+    COMPACT_SUMMARY_TABLE("compact", "org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler");
+
+    private IndexType(String indexType, String className) {
+      indexTypeName = indexType;
+      this.handlerClsName = className;
+    }
+
+    private String indexTypeName;
+    private String handlerClsName;
+
+    public String getName() {
+      return indexTypeName;
+    }
+    
+    public String getHandlerClsName() {
+      return handlerClsName;
+    }
+  }
+  
+  public static IndexType getIndexType(String name) {
+    IndexType[] types = IndexType.values();
+    for (IndexType type : types) {
+      if(type.getName().equals(name.toLowerCase()))
+        return type;
+    }
+    return null;
+  }
+
+}
+
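
getIndexType resolves the user-visible index type name to its handler class; with only the compact type registered, "compact" maps to CompactIndexHandler. Illustrative lookup (hypothetical standalone code):

    import org.apache.hadoop.hive.ql.index.HiveIndex;
    import org.apache.hadoop.hive.ql.index.HiveIndex.IndexType;

    public class IndexTypeLookup {
      public static void main(String[] args) {
        IndexType type = HiveIndex.getIndexType("COMPACT");  // matching is case-insensitive
        System.out.println(type.getName());                  // compact
        System.out.println(type.getHandlerClsName());
        // org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler
      }
    }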

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java?rev=980659&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java Fri Jul 30 06:40:04 2010
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.index;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+
+/**
+ * HiveIndexHandler defines a pluggable interface for adding new index handlers
+ * to Hive.
+ */
+public interface HiveIndexHandler extends Configurable {
+  /**
+   * Determines whether this handler implements indexes by creating an index
+   * table.
+   * 
+   * @return true if index creation implies creation of an index table in Hive;
+   *         false if the index representation is not stored in a Hive table
+   */
+  boolean usesIndexTable();
+
+  /**
+   * Requests that the handler validate an index definition and fill in
+   * additional information about its stored representation.
+   * 
+   * @param baseTable
+   *          the definition of the table being indexed
+   * 
+   * @param index
+   *          the definition of the index being created
+   * 
+   * @param indexTable
+   *          a partial definition of the index table to be used for storing the
+   *          index representation, or null if usesIndexTable() returns false;
+   *          the handler can augment the index's storage descriptor (e.g. with
+   *          information about input/output format) and/or the index table's
+   *          definition (typically with additional columns containing the index
+   *          representation, e.g. pointers into HDFS).
+   * 
+   * @throws HiveException if the index definition is invalid with respect to
+   *        either the base table or the supplied index table definition
+   */
+  void analyzeIndexDefinition(
+      org.apache.hadoop.hive.metastore.api.Table baseTable,
+      org.apache.hadoop.hive.metastore.api.Index index,
+      org.apache.hadoop.hive.metastore.api.Table indexTable)
+      throws HiveException;
+
+  /**
+   * Requests that the handler generate a plan for building the index; the plan
+   * should read the base table and write out the index representation.
+   * 
+   * @param baseTbl
+   *          the definition of the table being indexed
+   * 
+   * @param index
+   *          the definition of the index
+   * 
+   * @param indexTblPartitions
+   *          list of index partitions
+   * 
+   * @param baseTblPartitions
+   *          list of base table partitions, each element mirroring the
+   *          corresponding one in indexTblPartitions
+   * 
+   * @param indexTbl
+   *          the definition of the index table, or null if usesIndexTable()
+   *          returns false
+   * 
+   * @param db
+   * 
+   * @return list of tasks to be executed in parallel for building the index
+   * 
+   * @throws HiveException if plan generation fails
+   */
+  List<Task<?>> generateIndexBuildTaskList(
+      org.apache.hadoop.hive.ql.metadata.Table baseTbl,
+      org.apache.hadoop.hive.metastore.api.Index index,
+      List<Partition> indexTblPartitions, List<Partition> baseTblPartitions,
+      org.apache.hadoop.hive.ql.metadata.Table indexTbl, Hive db)
+      throws HiveException;
+
+}
\ No newline at end of file
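
The handler class name is what ends up stored in the index metadata (indexHandlerClass on Index/MIndex). A plausible sketch of turning that string back into a configured handler instance via reflection (hedged; the actual instantiation path is not shown in this hunk):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
    import org.apache.hadoop.util.ReflectionUtils;

    public class IndexHandlerFactorySketch {
      public static HiveIndexHandler create(String handlerClassName, Configuration conf)
          throws ClassNotFoundException {
        Class<? extends HiveIndexHandler> cls =
            Class.forName(handlerClassName).asSubclass(HiveIndexHandler.class);
        // ReflectionUtils.newInstance also calls setConf(), since the handler
        // interface extends Configurable.
        return ReflectionUtils.newInstance(cls, conf);
      }
    }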

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java?rev=980659&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java Fri Jul 30 06:40:04 2010
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.index.compact;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.index.AbstractIndexHandler;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+
+public class CompactIndexHandler extends AbstractIndexHandler {
+  
+  private Configuration configuration;
+
+  @Override
+  public void analyzeIndexDefinition(Table baseTable, Index index,
+      Table indexTable) throws HiveException {
+    StorageDescriptor storageDesc = index.getSd();
+    if (this.usesIndexTable() && indexTable != null) {
+      StorageDescriptor indexTableSd = storageDesc.clone();
+      List<FieldSchema> indexTblCols = indexTableSd.getCols();
+      FieldSchema bucketFileName = new FieldSchema("_bucketname", "string", "");
+      indexTblCols.add(bucketFileName);
+      FieldSchema offSets = new FieldSchema("_offsets", "array<bigint>", "");
+      indexTblCols.add(offSets);
+      indexTable.setSd(indexTableSd);
+    }
+  }
+
+  @Override
+  public List<Task<?>> generateIndexBuildTaskList(
+      org.apache.hadoop.hive.ql.metadata.Table baseTbl,
+      org.apache.hadoop.hive.metastore.api.Index index,
+      List<Partition> indexTblPartitions,
+      List<Partition> baseTblPartitions,
+      org.apache.hadoop.hive.ql.metadata.Table indexTbl,
+      Hive db) throws HiveException {
+    try {
+
+      TableDesc desc = Utilities.getTableDesc(indexTbl);
+
+      List<Partition> newBaseTblPartitions = new ArrayList<Partition>();
+
+      List<Task<?>> indexBuilderTasks = new ArrayList<Task<?>>();
+
+      if (!baseTbl.isPartitioned()) {
+        // the base table is not partitioned, so build the index over the
+        // whole table
+        Task<?> indexBuilder = getIndexBuilderMapRedTask(index.getSd().getCols(), false,
+            new PartitionDesc(desc, null), indexTbl.getTableName(),
+            new PartitionDesc(Utilities.getTableDesc(baseTbl), null), 
+            baseTbl.getTableName(), db, indexTbl.getDbName());
+        indexBuilderTasks.add(indexBuilder);
+      } else {
+
+        // check whether the index table partitions still exist in the base
+        // table
+        for (int i = 0; i < indexTblPartitions.size(); i++) {
+          Partition indexPart = indexTblPartitions.get(i);
+          Partition basePart = null;
+          for (int j = 0; j < baseTblPartitions.size(); j++) {
+            if (baseTblPartitions.get(j).getName().equals(indexPart.getName())) {
+              basePart = baseTblPartitions.get(j);
+              newBaseTblPartitions.add(baseTblPartitions.get(j));
+              break;
+            }
+          }
+          if (basePart == null)
+            throw new RuntimeException(
+                "Partitions of base table and index table are inconsistent.");
+          // for each partition, spawn a map reduce task.
+          Task<?> indexBuilder = getIndexBuilderMapRedTask(index.getSd().getCols(), true,
+              new PartitionDesc(indexPart), indexTbl.getTableName(),
+              new PartitionDesc(basePart), baseTbl.getTableName(), db, indexTbl.getDbName());
+          
+          indexBuilderTasks.add(indexBuilder);
+        }
+      }
+      return indexBuilderTasks;
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+  }
+
+  private Task<?> getIndexBuilderMapRedTask(List<FieldSchema> indexField, boolean partitioned,
+      PartitionDesc indexTblPartDesc, String indexTableName,
+      PartitionDesc baseTablePartDesc, String baseTableName, Hive db, String dbName) {
+    
+    String indexCols = MetaStoreUtils.getColumnNamesFromFieldSchema(indexField);
+
+    // form a new INSERT OVERWRITE query
+    StringBuilder command = new StringBuilder();
+    LinkedHashMap<String, String> partSpec = indexTblPartDesc.getPartSpec();
+
+    command.append("INSERT OVERWRITE TABLE " + indexTableName );
+    if (partitioned && indexTblPartDesc != null) {
+      command.append(" PARTITION ( ");
+      List<String> ret = getPartKVPairStringArray(partSpec);
+      for (int i = 0; i < ret.size(); i++) {
+        String partKV = ret.get(i);
+        command.append(partKV);
+        if (i < ret.size() - 1)
+          command.append(",");
+      }
+      command.append(" ) ");
+    }
+    
+    command.append(" SELECT ");
+    command.append(indexCols);
+    command.append(",");
+
+    command.append(VirtualColumn.FILENAME.getName());
+    command.append(",");
+    command.append(" collect_set (");
+    command.append(VirtualColumn.BLOCKOFFSET.getName());
+    command.append(") ");
+    command.append(" FROM " + baseTableName );
+    LinkedHashMap<String, String> basePartSpec = baseTablePartDesc.getPartSpec();
+    if(basePartSpec != null) {
+      command.append(" WHERE ");
+      List<String> pkv = getPartKVPairStringArray(basePartSpec);
+      for (int i = 0; i < pkv.size(); i++) {
+        String partKV = pkv.get(i);
+        command.append(partKV);
+        if (i < pkv.size() - 1)
+          command.append(" AND ");
+      }
+    }
+    command.append(" GROUP BY ");
+    command.append(indexCols + ", " + VirtualColumn.FILENAME.getName());
+    command.append(" SORT BY ");
+    command.append(indexCols);
+
+    Driver driver = new Driver(db.getConf());
+    driver.compile(command.toString());
+
+    Task<?> rootTask = driver.getPlan().getRootTasks().get(0);
+    
+    IndexMetadataChangeWork indexMetaChange = new IndexMetadataChangeWork(partSpec, indexTableName, dbName);
+    IndexMetadataChangeTask indexMetaChangeTsk = new IndexMetadataChangeTask(); 
+    indexMetaChangeTsk.setWork(indexMetaChange);
+    rootTask.addDependentTask(indexMetaChangeTsk);
+
+    return rootTask;
+  }
+
+  private List<String> getPartKVPairStringArray(
+      LinkedHashMap<String, String> partSpec) {
+    List<String> ret = new ArrayList<String>(partSpec.size());
+    Iterator<Entry<String, String>> iter = partSpec.entrySet().iterator();
+    while (iter.hasNext()) {
+      StringBuilder sb = new StringBuilder();
+      Entry<String, String> p = iter.next();
+      sb.append(HiveUtils.unparseIdentifier(p.getKey()));
+      sb.append(" = ");
+      sb.append("'");
+      sb.append(p.getValue());
+      sb.append("'");
+      ret.add(sb.toString());
+    }
+    return ret;
+  }
+
+  @Override
+  public boolean usesIndexTable() {
+    return true;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return configuration;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.configuration = conf;
+  }
+
+}
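
For reference, the following is a minimal, self-contained sketch (not part of this commit) of the rebuild statement that getIndexBuilderMapRedTask assembles for a hypothetical unpartitioned base table "src" indexed on column "key". The table and index-table names are illustrative, and the virtual column names INPUT__FILE__NAME and BLOCK__OFFSET__INSIDE__FILE are assumed to be what VirtualColumn.FILENAME and VirtualColumn.BLOCKOFFSET resolve to.

    // Sketch only: reproduces the shape of the index-build statement for an
    // unpartitioned base table; every name below is an illustrative assumption.
    public class CompactIndexQuerySketch {
      public static void main(String[] args) {
        String indexCols = "key";                            // indexed column(s)
        String indexTableName = "default__src_src_index__";  // hypothetical index table
        String baseTableName = "src";                        // hypothetical base table

        StringBuilder command = new StringBuilder();
        command.append("INSERT OVERWRITE TABLE ").append(indexTableName);
        command.append(" SELECT ").append(indexCols).append(",");
        command.append("INPUT__FILE__NAME").append(",");
        command.append(" collect_set (").append("BLOCK__OFFSET__INSIDE__FILE").append(") ");
        command.append(" FROM ").append(baseTableName);
        command.append(" GROUP BY ").append(indexCols).append(", INPUT__FILE__NAME");
        command.append(" SORT BY ").append(indexCols);

        // Roughly: INSERT OVERWRITE TABLE default__src_src_index__
        //          SELECT key, INPUT__FILE__NAME, collect_set(BLOCK__OFFSET__INSIDE__FILE)
        //          FROM src GROUP BY key, INPUT__FILE__NAME SORT BY key
        System.out.println(command.toString());
      }
    }

For a partitioned base table the handler additionally emits a PARTITION (...) clause on the index table and a WHERE clause restricting the base table to the matching partition, one map-reduce task per partition.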

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexInputFormat.java?rev=980659&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexInputFormat.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexInputFormat.java Fri Jul 30 06:40:04 2010
@@ -0,0 +1,75 @@
+package org.apache.hadoop.hive.ql.index.compact;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+public class HiveCompactIndexInputFormat extends HiveInputFormat {
+
+  public static final Log l4j = LogFactory.getLog("HiveIndexInputFormat");
+
+  public HiveCompactIndexInputFormat() {
+    super();
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    String indexFileStr = job.get("hive.index.compact.file");
+    l4j.info("index_file is " + indexFileStr);
+    HiveInputSplit[] splits = (HiveInputSplit[]) super.getSplits(job,
+        numSplits);
+
+    if (indexFileStr == null) {
+      return splits;
+    }
+
+    HiveCompactIndexResult hiveIndexResult = null;
+    try {
+      hiveIndexResult = new HiveCompactIndexResult(indexFileStr, job);
+    } catch (HiveException e) {
+      // the index could not be read; fall back to using all the file splits
+      l4j.error("Unable to read index; falling back to all the file splits.", e);
+      return splits;
+    }
+
+    ArrayList<HiveInputSplit> newSplits = new ArrayList<HiveInputSplit>(
+        numSplits);
+    for (HiveInputSplit split : splits) {
+      l4j.info("split start : " + split.getStart());
+      l4j.info("split end : " + (split.getStart() + split.getLength()));
+
+      try {
+        if (hiveIndexResult.contains(split)) {
+          // we may miss a sync here
+          HiveInputSplit newSplit = split;
+          if (split.inputFormatClassName().contains("RCFile")
+              || split.inputFormatClassName().contains("SequenceFile")) {
+            if (split.getStart() > SequenceFile.SYNC_INTERVAL) {
+              newSplit = new HiveInputSplit(
+                  new FileSplit(split.getPath(),
+                      split.getStart() - SequenceFile.SYNC_INTERVAL,
+                      split.getLength() + SequenceFile.SYNC_INTERVAL,
+                      split.getLocations()), split.inputFormatClassName());
+            }
+          }
+          newSplits.add(newSplit);
+        }
+      } catch (HiveException e) {
+        throw new RuntimeException(
+            "Unable to get metadata for input table split " + split.getPath(), e);
+      }
+    }
+    InputSplit[] retA = newSplits.toArray(new FileSplit[newSplits.size()]);
+    l4j.info("Number of input splits: " + splits.length + " new input splits: "
+        + retA.length);
+    return retA;
+  }
+}
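
As a rough usage sketch (not part of this commit), a MapReduce job can be pointed at the materialized index result through the "hive.index.compact.file" property read in getSplits() above; the index-result path below is an illustrative assumption.

    // Sketch only: wiring a job to the index-aware input format so that splits
    // with no matching offsets are dropped during split computation.
    import org.apache.hadoop.hive.ql.index.compact.HiveCompactIndexInputFormat;
    import org.apache.hadoop.mapred.JobConf;

    public class CompactIndexJobSketch {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // location of the output of the index query (bucket file name + offsets);
        // the path is purely illustrative
        job.set("hive.index.compact.file", "/tmp/hive/index_result");
        job.setInputFormat(HiveCompactIndexInputFormat.class);
      }
    }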

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexResult.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexResult.java?rev=980659&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexResult.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexResult.java Fri Jul 30 06:40:04 2010
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.index.compact;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.LineRecordReader.LineReader;
+
+public class HiveCompactIndexResult {
+
+  public static final Log l4j = LogFactory.getLog("HiveCompactIndexResult");
+
+  // IndexBucket
+  static class IBucket {
+    private String name = null;
+    private SortedSet<Long> offsets = new TreeSet<Long>();
+
+    public IBucket(String n) {
+      name = n;
+    }
+
+    public void add(Long offset) {
+      offsets.add(offset);
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public SortedSet<Long> getOffsets() {
+      return offsets;
+    }
+
+    public boolean equals(Object obj) {
+      if (obj == null || obj.getClass() != this.getClass()) {
+        return false;
+      }
+      return (((IBucket) obj).name.compareToIgnoreCase(this.name) == 0);
+    }
+  }
+
+  JobConf job = null;
+  BytesRefWritable[] bytesRef = new BytesRefWritable[2];
+
+  public HiveCompactIndexResult(String indexFile, JobConf conf) throws IOException,
+      HiveException {
+    job = conf;
+
+    bytesRef[0] = new BytesRefWritable();
+    bytesRef[1] = new BytesRefWritable();
+
+    if (indexFile != null) {
+      Path indexFilePath = new Path(indexFile);
+      FileSystem fs = FileSystem.get(conf);
+      FileStatus indexStat = fs.getFileStatus(indexFilePath);
+      List<Path> paths = new ArrayList<Path>();
+      if (indexStat.isDir()) {
+        FileStatus[] fss = fs.listStatus(indexFilePath);
+        for (FileStatus f : fss) {
+          paths.add(f.getPath());
+        }
+      } else {
+        paths.add(indexFilePath);
+      }
+
+      for (Path indexFinalPath : paths) {
+        FSDataInputStream ifile = fs.open(indexFinalPath);
+        LineReader lr = new LineReader(ifile, conf);
+        Text line = new Text();
+        while (lr.readLine(line) > 0) {
+          add(line);
+        }
+        // this will close the input stream
+        lr.close();
+      }
+    }
+  }
+
+  Map<String, IBucket> buckets = new HashMap<String, IBucket>();
+
+  private void add(Text line) throws HiveException {
+    String l = line.toString();
+    byte[] bytes = l.getBytes();
+    int firstEnd = 0;
+    int i = 0;
+    for (int index = 0; index < bytes.length; index++) {
+      if (bytes[index] == LazySimpleSerDe.DefaultSeparators[0]) {
+        i++;
+        firstEnd = index;
+      }
+    }
+    if (i > 1) {
+      throw new HiveException(
+          "Bad index file row (the index file should contain only two columns:"
+              + " bucket_file_name and an offset list): " + line.toString());
+    }
+    String bucketFileName = new String(bytes, 0, firstEnd);
+    IBucket bucket = buckets.get(bucketFileName);
+    if (bucket == null) {
+      bucket = new IBucket(bucketFileName);
+      buckets.put(bucketFileName, bucket);
+    }
+
+    int currentStart = firstEnd + 1;
+    int currentEnd = firstEnd + 1;
+    for (; currentEnd < bytes.length; currentEnd++) {
+      if (bytes[currentEnd] == LazySimpleSerDe.DefaultSeparators[1]) {
+        String one_offset = new String(bytes, currentStart, currentEnd
+            - currentStart);
+        Long offset = Long.parseLong(one_offset);
+        bucket.getOffsets().add(offset);
+        currentStart = currentEnd + 1;
+      }
+    }
+    String one_offset = new String(bytes, currentStart, currentEnd
+        - currentStart);
+    bucket.getOffsets().add(Long.parseLong(one_offset));
+  }
+
+  public boolean contains(FileSplit split) throws HiveException {
+
+    if (buckets == null) {
+      return false;
+    }
+    String bucketName = split.getPath().toString();
+    IBucket bucket = buckets.get(bucketName);
+    if (bucket == null) {
+      bucketName = split.getPath().toUri().getPath();
+      bucket = buckets.get(bucketName);
+      if (bucket == null) {
+        return false;
+      }
+    }
+
+    for (Long offset : bucket.getOffsets()) {
+      if ((offset >= split.getStart())
+          && (offset <= split.getStart() + split.getLength())) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
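
To make the parsing in add() above concrete, here is a small sketch (not part of this commit) that builds one index-file row in the layout HiveCompactIndexResult expects: the bucket file name and the offset list separated by the LazySimpleSerDe field separator (Ctrl-A), with individual offsets separated by the collection separator (Ctrl-B). The path and offsets are illustrative.

    // Sketch only: constructs one row in the expected index-file layout.
    public class CompactIndexRowSketch {
      public static void main(String[] args) {
        String bucketFile = "hdfs://nn:8020/user/hive/warehouse/src/000000_0"; // illustrative
        long[] offsets = {0L, 4096L, 8192L};                                   // illustrative

        StringBuilder row = new StringBuilder();
        row.append(bucketFile).append('\001');   // field separator (Ctrl-A)
        for (int i = 0; i < offsets.length; i++) {
          row.append(offsets[i]);
          if (i < offsets.length - 1) {
            row.append('\002');                  // collection separator (Ctrl-B)
          }
        }
        // each such line is read by the LineReader in the constructor and
        // collected into an IBucket holding a sorted set of offsets
        System.out.println(row.toString());
      }
    }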

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/IndexMetadataChangeTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/IndexMetadataChangeTask.java?rev=980659&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/IndexMetadataChangeTask.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/IndexMetadataChangeTask.java Fri Jul 30 06:40:04 2010
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.index.compact;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.index.HiveIndex;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+
+public class IndexMetadataChangeTask extends Task<IndexMetadataChangeWork>{
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  protected int execute(DriverContext driverContext) {
+    
+    try {
+      Hive db = Hive.get(conf);
+      IndexMetadataChangeWork work = this.getWork();
+      String tblName = work.getIndexTbl();
+      Table tbl = db.getTable(work.getDbName(), tblName);
+      if (tbl == null) {
+        console.printError("Index table cannot be null.");
+        return 1;
+      }
+
+      if (!tbl.getTableType().equals(TableType.INDEX_TABLE)) {
+        console.printError("Table " + tbl.getTableName() + " is not an index table.");
+        return 1;
+      }
+
+      if (tbl.isPartitioned() && work.getPartSpec() == null) {
+        console.printError("Index table is partitioned, but no partition specified.");
+        return 1;
+      }
+      
+      if (work.getPartSpec() != null) {
+        Partition part = db.getPartition(tbl, work.getPartSpec(), false);
+        if (part == null) {
+          console.printError("Partition " + Warehouse.makePartName(work.getPartSpec()).toString()
+              + " does not exist.");
+          return 1;
+        }
+        
+        Path url = new Path(part.getDataLocation().toString());
+        FileSystem fs = url.getFileSystem(conf);
+        FileStatus fstat = fs.getFileStatus(url);
+        
+        part.getParameters().put(HiveIndex.INDEX_TABLE_CREATETIME, Long.toString(fstat.getModificationTime()));
+        db.alterPartition(tbl.getTableName(), part);
+      } else {
+        Path url = new Path(tbl.getDataLocation().toString());
+        FileSystem fs = url.getFileSystem(conf);
+        FileStatus fstat = fs.getFileStatus(url);
+        tbl.getParameters().put(HiveIndex.INDEX_TABLE_CREATETIME, Long.toString(fstat.getModificationTime()));
+        db.alterTable(tbl.getTableName(), tbl);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      console.printError("Error changing index table/partition metadata "
+          + e.getMessage());
+      return 1;
+    }
+    return 0;
+  }
+
+  @Override
+  public String getName() {
+    return "IndexMetadataChangeTask";
+  }
+  
+  @Override
+  public int getType() {
+    return StageType.DDL;
+  }
+
+  @Override
+  protected void localizeMRTmpFilesImpl(Context ctx) {
+  }
+
+}

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/IndexMetadataChangeWork.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/IndexMetadataChangeWork.java?rev=980659&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/IndexMetadataChangeWork.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/IndexMetadataChangeWork.java Fri Jul 30 06:40:04 2010
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.index.compact;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+public class IndexMetadataChangeWork implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  private HashMap<String, String> partSpec;
+  private String indexTbl;
+  private String dbName;
+  
+  public IndexMetadataChangeWork() {
+  }
+  
+  public IndexMetadataChangeWork(HashMap<String, String> partSpec,
+      String indexTbl, String dbName) {
+    super();
+    this.partSpec = partSpec;
+    this.indexTbl = indexTbl;
+    this.dbName = dbName;
+  }
+
+  public HashMap<String, String> getPartSpec() {
+    return partSpec;
+  }
+
+  public void setPartSpec(HashMap<String, String> partSpec) {
+    this.partSpec = partSpec;
+  }
+
+  public String getIndexTbl() {
+    return indexTbl;
+  }
+
+  public void setIndexTbl(String indexTbl) {
+    this.indexTbl = indexTbl;
+  }
+  
+  public String getDbName() {
+    return dbName;
+  }
+
+  public void setDbName(String dbName) {
+    this.dbName = dbName;
+  }
+  
+}
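
A brief sketch (not part of this commit) of how this work/task pair is chained behind the index-build query, mirroring the wiring in CompactIndexHandler.getIndexBuilderMapRedTask; the partition spec, index table, and database names are illustrative.

    // Sketch only: build the metadata-change work and attach the task to a
    // (hypothetical) root task once the INSERT OVERWRITE plan is compiled.
    import java.util.LinkedHashMap;

    import org.apache.hadoop.hive.ql.index.compact.IndexMetadataChangeTask;
    import org.apache.hadoop.hive.ql.index.compact.IndexMetadataChangeWork;

    public class IndexMetadataChangeWiringSketch {
      public static void main(String[] args) {
        LinkedHashMap<String, String> partSpec = new LinkedHashMap<String, String>();
        partSpec.put("ds", "2010-07-30");                     // illustrative partition

        IndexMetadataChangeWork work = new IndexMetadataChangeWork(
            partSpec, "default__src_src_index__", "default"); // illustrative names
        IndexMetadataChangeTask task = new IndexMetadataChangeTask();
        task.setWork(work);
        // rootTask.addDependentTask(task) would then stamp the base data's
        // modification time onto the index table/partition after the rebuild
      }
    }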

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java?rev=980659&r1=980658&r2=980659&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java Fri Jul 30 06:40:04 2010
@@ -18,54 +18,26 @@
 
 package org.apache.hadoop.hive.ql.io;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.File;
 import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.ql.exec.ExecMapper;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.hive.shims.HadoopShims.CombineFileInputFormatShim;
-import org.apache.hadoop.hive.shims.HadoopShims.InputSplitShim;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.InvalidInputException;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * BucketizedHiveInputFormat serves the similar function as hiveInputFormat but
@@ -101,8 +73,10 @@ public class BucketizedHiveInputFormat<K
 
     InputFormat inputFormat = getInputFormatFromCache(inputFormatClass,
         cloneJobConf);
-    return new BucketizedHiveRecordReader(inputFormat, hsplit, cloneJobConf,
+    BucketizedHiveRecordReader<K, V> rr = new BucketizedHiveRecordReader(inputFormat, hsplit, cloneJobConf,
         reporter);
+    rr.initIOContext(hsplit, cloneJobConf, inputFormatClass);
+    return rr;
   }
 
   protected FileStatus[] listStatus(JobConf job, Path path) throws IOException {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java?rev=980659&r1=980658&r2=980659&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveRecordReader.java Fri Jul 30 06:40:04 2010
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.io;
 
 import java.io.IOException;
 
-import org.apache.hadoop.hive.ql.exec.ExecMapper;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.InputFormat;
@@ -36,7 +35,7 @@ import org.apache.hadoop.mapred.Sequence
  * file.
  */
 public class BucketizedHiveRecordReader<K extends WritableComparable, V extends Writable>
-    implements RecordReader<K, V> {
+    extends HiveContextAwareRecordReader<K, V> {
   protected final BucketizedHiveInputSplit split;
   protected final InputFormat inputFormat;
   protected final JobConf jobConf;
@@ -55,7 +54,7 @@ public class BucketizedHiveRecordReader<
     initNextRecordReader();
   }
 
-  public void close() throws IOException {
+  public void doClose() throws IOException {
     if (curReader != null) {
       curReader.close();
       curReader = null;
@@ -86,7 +85,7 @@ public class BucketizedHiveRecordReader<
         / (float) (split.getLength()));
   }
 
-  public boolean next(K key, V value) throws IOException {
+  public boolean doNext(K key, V value) throws IOException {
     while ((curReader == null) || !curReader.next(key, value)) {
       if (!initNextRecordReader()) {
         return false;

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java?rev=980659&r1=980658&r2=980659&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java Fri Jul 30 06:40:04 2010
@@ -40,7 +40,7 @@ import org.apache.hadoop.mapred.Reporter
  * @param <V>
  */
 public class CombineHiveRecordReader<K extends WritableComparable, V extends Writable>
-    implements RecordReader<K, V> {
+    extends HiveContextAwareRecordReader<K, V> {
 
   private final RecordReader recordReader;
 
@@ -66,9 +66,10 @@ public class CombineHiveRecordReader<K e
         .getLocations());
 
     this.recordReader = inputFormat.getRecordReader(fsplit, job, reporter);
+    this.initIOContext(fsplit, job, inputFormatClass);
   }
 
-  public void close() throws IOException {
+  public void doClose() throws IOException {
     recordReader.close();
   }
 
@@ -88,7 +89,7 @@ public class CombineHiveRecordReader<K e
     return recordReader.getProgress();
   }
 
-  public boolean next(K key, V value) throws IOException {
+  public boolean doNext(K key, V value) throws IOException {
     if (ExecMapper.getDone()) {
       return false;
     }

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java?rev=980659&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java Fri Jul 30 06:40:04 2010
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+public abstract class HiveContextAwareRecordReader<K, V> implements RecordReader<K, V> {
+  
+  private boolean initDone = false;
+  
+  /** 
+   * Reads the next key/value pair from the input for processing.
+   *
+   * @param key the key to read data into
+   * @param value the value to read data into
+   * @return true if a key/value was read, false if at EOF
+   */
+  public abstract boolean doNext(K key, V value) throws IOException;
+  
+  /** 
+   * Close this {@link RecordReader} to future operations.
+   * 
+   * @throws IOException
+   */ 
+  public abstract void doClose() throws IOException;
+  
+  private IOContext ioCxtRef =  null;
+  
+  @Override
+  public void close() throws IOException {
+    doClose();
+    initDone = false;
+    ioCxtRef = null;
+  }
+  
+  @Override
+  public boolean next(K key, V value) throws IOException {
+    if (!initDone) {
+      throw new IOException("Hive IOContext is not initialized.");
+    }
+    updateIOContext();
+    return doNext(key, value);
+  }
+  
+  protected void updateIOContext()
+      throws IOException {
+    long pointerPos = this.getPos();
+    if (!ioCxtRef.isBlockPointer) {
+      ioCxtRef.currentBlockStart = pointerPos;
+      return;
+    }
+
+    if (ioCxtRef.nextBlockStart == -1) {
+      ioCxtRef.nextBlockStart = pointerPos;
+    }
+    if (pointerPos != ioCxtRef.nextBlockStart) {
+      // the reader pointer has moved to the end of the next block, or to the
+      // end of the current record.
+
+      ioCxtRef.currentBlockStart = ioCxtRef.nextBlockStart;
+      ioCxtRef.nextBlockStart = pointerPos;
+    }
+  }
+  
+  public IOContext getIOContext() {
+    return IOContext.get();
+  }
+  
+  public void initIOContext(long startPos, boolean isBlockPointer, String inputFile) {
+    ioCxtRef = this.getIOContext();
+    ioCxtRef.currentBlockStart = startPos;
+    ioCxtRef.isBlockPointer = isBlockPointer;
+    ioCxtRef.inputFile = inputFile;
+    initDone = true;
+  }
+
+  public void initIOContext(FileSplit split, JobConf job,
+      Class inputFormatClass) throws IOException {
+    boolean blockPointer = false;
+    long blockStart = -1;    
+    FileSplit fileSplit = split;
+    Path path = fileSplit.getPath();
+    FileSystem fs = path.getFileSystem(job);
+    if (inputFormatClass.getName().contains("SequenceFile")) {
+      SequenceFile.Reader in = new SequenceFile.Reader(fs, path, job);
+      blockPointer = in.isBlockCompressed();
+      in.sync(fileSplit.getStart());
+      blockStart = in.getPosition();
+      in.close();
+    } else if (inputFormatClass.getName().contains("RCFile")) {
+      RCFile.Reader in = new RCFile.Reader(fs, path, job);
+      blockPointer = true;
+      in.sync(fileSplit.getStart());
+      blockStart = in.getPosition();
+      in.close();
+    }
+    this.initIOContext(blockStart, blockPointer, split.getPath().toString());
+  }
+}
\ No newline at end of file
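
To illustrate the contract of the new base class, here is a minimal sketch (not part of this commit) of a delegating subclass: doNext()/doClose() carry the real work, and initIOContext() must run before the first next() call or an IOException is thrown. The delegate reader and key/value types are illustrative.

    // Sketch only: a delegating reader showing how doNext()/doClose() plug into
    // HiveContextAwareRecordReader; the names here are illustrative.
    package org.apache.hadoop.hive.ql.io;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;

    public class DelegatingContextAwareReaderSketch
        extends HiveContextAwareRecordReader<LongWritable, Text> {

      private final RecordReader<LongWritable, Text> delegate;

      public DelegatingContextAwareReaderSketch(RecordReader<LongWritable, Text> delegate,
          FileSplit split, JobConf job, Class inputFormatClass) throws IOException {
        this.delegate = delegate;
        // record block start, block-pointer flag, and input file in IOContext
        // before any next() call
        initIOContext(split, job, inputFormatClass);
      }

      @Override
      public boolean doNext(LongWritable key, Text value) throws IOException {
        return delegate.next(key, value);
      }

      @Override
      public void doClose() throws IOException {
        delegate.close();
      }

      public LongWritable createKey() { return delegate.createKey(); }
      public Text createValue() { return delegate.createValue(); }
      public long getPos() throws IOException { return delegate.getPos(); }
      public float getProgress() throws IOException { return delegate.getProgress(); }
    }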


