incubator-hcatalog-commits mailing list archives

From tra...@apache.org
Subject svn commit: r1383152 [10/27] - in /incubator/hcatalog/trunk: ./ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/ hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/ ...
Date Mon, 10 Sep 2012 23:29:03 GMT
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java Mon Sep 10 23:28:55 2012
@@ -75,299 +75,299 @@ import org.slf4j.LoggerFactory;
  */
 public class NotificationListener extends MetaStoreEventListener {
 
-  private static final Logger LOG = LoggerFactory.getLogger(NotificationListener.class);
-  protected Session session;
-  protected Connection conn;
-
-  /**
-   * Create message bus connection and session in constructor.
-   */
-  public NotificationListener(final Configuration conf) {
-
-    super(conf);
-    createConnection();
-  }
-
-  private static String getTopicName(Partition partition,
-      ListenerEvent partitionEvent) throws MetaException {
-    try {
-      return partitionEvent.getHandler()
-          .get_table(partition.getDbName(), partition.getTableName())
-          .getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
-    } catch (NoSuchObjectException e) {
-      throw new MetaException(e.toString());
-    }
-  }
-
-  @Override
-  public void onAddPartition(AddPartitionEvent partitionEvent)
-      throws MetaException {
-    // Subscribers can get notification of a newly added partition in a
-    // particular table by listening on a topic named "dbName.tableName"
-    // with the message selector "HCAT_EVENT = HCAT_ADD_PARTITION".
-    if (partitionEvent.getStatus()) {
-
-      Partition partition = partitionEvent.getPartition();
-      String topicName = getTopicName(partition, partitionEvent);
-      if (topicName != null && !topicName.equals("")) {
-        send(partition, topicName, HCatConstants.HCAT_ADD_PARTITION_EVENT);
-      } else {
-        LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for "
-            + partition.getDbName()
-            + "."
-            + partition.getTableName()
-            + ". To enable notifications for this table, please do alter table set properties ("
-            + HCatConstants.HCAT_MSGBUS_TOPIC_NAME
-            + "=<dbname>.<tablename>) or whatever you want the topic name to be.");
-      }
-    }
-
-  }
-
-  @Override
-  public void onDropPartition(DropPartitionEvent partitionEvent)
-      throws MetaException {
-    // Subscribers can get notification of a dropped partition in a
-    // particular table by listening on a topic named "dbName.tableName"
-    // with the message selector "HCAT_EVENT = HCAT_DROP_PARTITION".
-
-    // DataNucleus throws an NPE when we try to serialize a partition object
-    // retrieved from the metastore. To work around that, we reset the following objects.
-
-    if (partitionEvent.getStatus()) {
-      Partition partition = partitionEvent.getPartition();
-      StorageDescriptor sd = partition.getSd();
-      sd.setBucketCols(new ArrayList<String>());
-      sd.setSortCols(new ArrayList<Order>());
-      sd.setParameters(new HashMap<String, String>());
-      sd.getSerdeInfo().setParameters(new HashMap<String, String>());
-      String topicName = getTopicName(partition, partitionEvent);
-      if (topicName != null && !topicName.equals("")) {
-        send(partition, topicName, HCatConstants.HCAT_DROP_PARTITION_EVENT);
-      } else {
-        LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for "
-            + partition.getDbName()
-            + "."
-            + partition.getTableName()
-            + ". To enable notifications for this table, please do alter table set properties ("
-            + HCatConstants.HCAT_MSGBUS_TOPIC_NAME
-            + "=<dbname>.<tablename>) or whatever you want the topic name to be.");
-      }
-    }
-  }
-
-  @Override
-  public void onCreateDatabase(CreateDatabaseEvent dbEvent)
-      throws MetaException {
-    // Subscribers can get notification about the addition of a database in HCAT
-    // by listening on a topic named "HCAT" with the message selector
-    // "HCAT_EVENT = HCAT_ADD_DATABASE".
-    if (dbEvent.getStatus())
-      send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler()
-          .getHiveConf()), HCatConstants.HCAT_ADD_DATABASE_EVENT);
-  }
-
-  @Override
-  public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
-    // Subscribers can get notification about the drop of a database in HCAT
-    // by listening on a topic named "HCAT" with the message selector
-    // "HCAT_EVENT = HCAT_DROP_DATABASE".
-    if (dbEvent.getStatus())
-      send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler()
-          .getHiveConf()), HCatConstants.HCAT_DROP_DATABASE_EVENT);
-  }
-
-  @Override
-  public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
-    // Subscribers can get notification about the addition of a table in HCAT
-    // by listening on a topic named "HCAT" with the message selector
-    // "HCAT_EVENT = HCAT_ADD_TABLE".
-    if (tableEvent.getStatus()) {
-      Table tbl = tableEvent.getTable();
-      HMSHandler handler = tableEvent.getHandler();
-      HiveConf conf = handler.getHiveConf();
-      Table newTbl;
-      try {
-        newTbl = handler.get_table(tbl.getDbName(), tbl.getTableName())
-            .deepCopy();
-        newTbl.getParameters().put(
-            HCatConstants.HCAT_MSGBUS_TOPIC_NAME,
-            getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase() + "."
-                + newTbl.getTableName().toLowerCase());
-        handler.alter_table(newTbl.getDbName(), newTbl.getTableName(), newTbl);
-      } catch (InvalidOperationException e) {
-        MetaException me = new MetaException(e.toString());
-        me.initCause(e);
-        throw me;
-      } catch (NoSuchObjectException e) {
-        MetaException me = new MetaException(e.toString());
-        me.initCause(e);
-        throw me;
-      }
-      send(newTbl, getTopicPrefix(conf) + "."
-          + newTbl.getDbName().toLowerCase(),
-          HCatConstants.HCAT_ADD_TABLE_EVENT);
-    }
-  }
-
-  private String getTopicPrefix(HiveConf conf) {
-    return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX,
-        HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
-  }
-
-  @Override
-  public void onDropTable(DropTableEvent tableEvent) throws MetaException {
-    // Subscribers can get notification about the drop of a table in HCAT
-    // by listening on a topic named "HCAT" with the message selector
-    // "HCAT_EVENT = HCAT_DROP_TABLE".
-
-    // DataNucleus throws an NPE when we try to serialize a table object
-    // retrieved from the metastore. To work around that, we reset the following objects.
-
-    if (tableEvent.getStatus()) {
-      Table table = tableEvent.getTable();
-      StorageDescriptor sd = table.getSd();
-      sd.setBucketCols(new ArrayList<String>());
-      sd.setSortCols(new ArrayList<Order>());
-      sd.setParameters(new HashMap<String, String>());
-      sd.getSerdeInfo().setParameters(new HashMap<String, String>());
-      send(table, getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "."
-          + table.getDbName().toLowerCase(),
-          HCatConstants.HCAT_DROP_TABLE_EVENT);
-    }
-  }
-
-  /**
-   * @param msgBody
-   *          is the metastore object. It is sent in full so that if the
-   *          subscriber is really interested in the details, it can
-   *          reconstruct it fully. In the case of a finalize_partition
-   *          message this will be the string specification of the partition.
-   * @param topicName
-   *          is the name of the topic on the message broker to which the
-   *          message is sent.
-   * @param event
-   *          is the value of the HCAT_EVENT property in the message. It can
-   *          be used to select messages on the client side.
-   */
-  protected void send(Object msgBody, String topicName, String event) {
-
-    try {
-
-      Destination topic = null;
-      if (null == session) {
-        // This will happen if we were never able to establish a connection.
+    private static final Logger LOG = LoggerFactory.getLogger(NotificationListener.class);
+    protected Session session;
+    protected Connection conn;
+
+    /**
+     * Create message bus connection and session in constructor.
+     */
+    public NotificationListener(final Configuration conf) {
+
+        super(conf);
         createConnection();
-        if (null == session) {
-          // Still not successful, return from here.
-          LOG.error("Invalid session. Failed to send message on topic: "
-              + topicName + " event: " + event);
-          return;
+    }
+
+    private static String getTopicName(Partition partition,
+                                       ListenerEvent partitionEvent) throws MetaException {
+        try {
+            return partitionEvent.getHandler()
+                .get_table(partition.getDbName(), partition.getTableName())
+                .getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
+        } catch (NoSuchObjectException e) {
+            throw new MetaException(e.toString());
         }
-      }
-      try {
-        // Topics are created on demand. If a topic doesn't exist on the broker,
-        // it will be created when the broker receives this message.
-        topic = session.createTopic(topicName);
-      } catch (IllegalStateException ise) {
-        // This will happen if we were able to establish a connection once, but
-        // it is no longer valid; an IllegalStateException is thrown, so catch
-        // it and retry.
-        LOG.error("Seems like connection is lost. Retrying", ise);
-        createConnection();
-        topic = session.createTopic(topicName);
-      }
-      if (null == topic) {
-        // Still not successful, return from here.
-        LOG.error("Invalid session. Failed to send message on topic: "
-            + topicName + " event: " + event);
-        return;
-      }
-      MessageProducer producer = session.createProducer(topic);
-      Message msg;
-      if (msgBody instanceof Map) {
-        MapMessage mapMsg = session.createMapMessage();
-        Map<String, String> incomingMap = (Map<String, String>) msgBody;
-        for (Entry<String, String> partCol : incomingMap.entrySet()) {
-          mapMsg.setString(partCol.getKey(), partCol.getValue());
+    }
+
+    @Override
+    public void onAddPartition(AddPartitionEvent partitionEvent)
+        throws MetaException {
+        // Subscribers can get notification of a newly added partition in a
+        // particular table by listening on a topic named "dbName.tableName"
+        // with the message selector "HCAT_EVENT = HCAT_ADD_PARTITION".
+        if (partitionEvent.getStatus()) {
+
+            Partition partition = partitionEvent.getPartition();
+            String topicName = getTopicName(partition, partitionEvent);
+            if (topicName != null && !topicName.equals("")) {
+                send(partition, topicName, HCatConstants.HCAT_ADD_PARTITION_EVENT);
+            } else {
+                LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for "
+                    + partition.getDbName()
+                    + "."
+                    + partition.getTableName()
+                    + ". To enable notifications for this table, please do alter table set properties ("
+                    + HCatConstants.HCAT_MSGBUS_TOPIC_NAME
+                    + "=<dbname>.<tablename>) or whatever you want the topic name to be.");
+            }
+        }
+
+    }
+
+    @Override
+    public void onDropPartition(DropPartitionEvent partitionEvent)
+        throws MetaException {
+        // Subscribers can get notification of a dropped partition in a
+        // particular table by listening on a topic named "dbName.tableName"
+        // with the message selector "HCAT_EVENT = HCAT_DROP_PARTITION".
+
+        // DataNucleus throws an NPE when we try to serialize a partition object
+        // retrieved from the metastore. To work around that, we reset the following objects.
+
+        if (partitionEvent.getStatus()) {
+            Partition partition = partitionEvent.getPartition();
+            StorageDescriptor sd = partition.getSd();
+            sd.setBucketCols(new ArrayList<String>());
+            sd.setSortCols(new ArrayList<Order>());
+            sd.setParameters(new HashMap<String, String>());
+            sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+            String topicName = getTopicName(partition, partitionEvent);
+            if (topicName != null && !topicName.equals("")) {
+                send(partition, topicName, HCatConstants.HCAT_DROP_PARTITION_EVENT);
+            } else {
+                LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for "
+                    + partition.getDbName()
+                    + "."
+                    + partition.getTableName()
+                    + ". To enable notifications for this table, please do alter table set properties ("
+                    + HCatConstants.HCAT_MSGBUS_TOPIC_NAME
+                    + "=<dbname>.<tablename>) or whatever you want the topic name to be.");
+            }
         }
-        msg = mapMsg;
-      } else {
-        msg = session.createObjectMessage((Serializable) msgBody);
-      }
-
-      msg.setStringProperty(HCatConstants.HCAT_EVENT, event);
-      producer.send(msg);
-      // Message must be transacted before we return.
-      session.commit();
-    } catch (Exception e) {
-      // Gobble up the exception. Message delivery is best effort.
-      LOG.error("Failed to send message on topic: " + topicName + " event: "
-          + event, e);
-    }
-  }
-
-  protected void createConnection() {
-
-    Context jndiCntxt;
-    try {
-      jndiCntxt = new InitialContext();
-      ConnectionFactory connFac = (ConnectionFactory) jndiCntxt
-          .lookup("ConnectionFactory");
-      Connection conn = connFac.createConnection();
-      conn.start();
-      conn.setExceptionListener(new ExceptionListener() {
-        @Override
-        public void onException(JMSException jmse) {
-          LOG.error(jmse.toString());
+    }
+
+    @Override
+    public void onCreateDatabase(CreateDatabaseEvent dbEvent)
+        throws MetaException {
+        // Subscribers can get notification about the addition of a database in HCAT
+        // by listening on a topic named "HCAT" with the message selector
+        // "HCAT_EVENT = HCAT_ADD_DATABASE".
+        if (dbEvent.getStatus())
+            send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler()
+                .getHiveConf()), HCatConstants.HCAT_ADD_DATABASE_EVENT);
+    }
+
+    @Override
+    public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
+        // Subscribers can get notification about the drop of a database in HCAT
+        // by listening on a topic named "HCAT" with the message selector
+        // "HCAT_EVENT = HCAT_DROP_DATABASE".
+        if (dbEvent.getStatus())
+            send(dbEvent.getDatabase(), getTopicPrefix(dbEvent.getHandler()
+                .getHiveConf()), HCatConstants.HCAT_DROP_DATABASE_EVENT);
+    }
+
+    @Override
+    public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
+        // Subscribers can get notification about the addition of a table in HCAT
+        // by listening on a topic named "HCAT" with the message selector
+        // "HCAT_EVENT = HCAT_ADD_TABLE".
+        if (tableEvent.getStatus()) {
+            Table tbl = tableEvent.getTable();
+            HMSHandler handler = tableEvent.getHandler();
+            HiveConf conf = handler.getHiveConf();
+            Table newTbl;
+            try {
+                newTbl = handler.get_table(tbl.getDbName(), tbl.getTableName())
+                    .deepCopy();
+                newTbl.getParameters().put(
+                    HCatConstants.HCAT_MSGBUS_TOPIC_NAME,
+                    getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase() + "."
+                        + newTbl.getTableName().toLowerCase());
+                handler.alter_table(newTbl.getDbName(), newTbl.getTableName(), newTbl);
+            } catch (InvalidOperationException e) {
+                MetaException me = new MetaException(e.toString());
+                me.initCause(e);
+                throw me;
+            } catch (NoSuchObjectException e) {
+                MetaException me = new MetaException(e.toString());
+                me.initCause(e);
+                throw me;
+            }
+            send(newTbl, getTopicPrefix(conf) + "."
+                + newTbl.getDbName().toLowerCase(),
+                HCatConstants.HCAT_ADD_TABLE_EVENT);
         }
-      });
-      // We want the message to be sent when the session commits, so we run in
-      // transacted mode.
-      session = conn.createSession(true, Session.SESSION_TRANSACTED);
-    } catch (NamingException e) {
-      LOG.error("JNDI error while setting up Message Bus connection. "
-          + "Please make sure file named 'jndi.properties' is in "
-          + "classpath and contains appropriate key-value pairs.", e);
-    } catch (JMSException e) {
-      LOG.error("Failed to initialize connection to message bus", e);
-    } catch (Throwable t) {
-      LOG.error("Unable to connect to JMS provider", t);
-    }
-  }
-
-  @Override
-  protected void finalize() throws Throwable {
-    // Close the connection before dying.
-    try {
-      if (null != session)
-        session.close();
-      if (conn != null) {
-        conn.close();
-      }
-
-    } catch (Exception ignore) {
-      LOG.info("Failed to close message bus connection.", ignore);
-    }
-  }
-
-  @Override
-  public void onLoadPartitionDone(LoadPartitionDoneEvent lpde)
-      throws MetaException {
-    if (lpde.getStatus())
-      send(
-          lpde.getPartitionName(),
-          lpde.getTable().getParameters()
-              .get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME),
-          HCatConstants.HCAT_PARTITION_DONE_EVENT);
-  }
-
-  @Override
-  public void onAlterPartition(AlterPartitionEvent ape) throws MetaException {
-    // no-op
-  }
-
-  @Override
-  public void onAlterTable(AlterTableEvent ate) throws MetaException {
-    // no-op
-  }
+    }
+
+    private String getTopicPrefix(HiveConf conf) {
+        return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX,
+            HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
+    }
+
+    @Override
+    public void onDropTable(DropTableEvent tableEvent) throws MetaException {
+        // Subscribers can get notification about the drop of a table in HCAT
+        // by listening on a topic named "HCAT" with the message selector
+        // "HCAT_EVENT = HCAT_DROP_TABLE".
+
+        // DataNucleus throws an NPE when we try to serialize a table object
+        // retrieved from the metastore. To work around that, we reset the following objects.
+
+        if (tableEvent.getStatus()) {
+            Table table = tableEvent.getTable();
+            StorageDescriptor sd = table.getSd();
+            sd.setBucketCols(new ArrayList<String>());
+            sd.setSortCols(new ArrayList<Order>());
+            sd.setParameters(new HashMap<String, String>());
+            sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+            send(table, getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "."
+                + table.getDbName().toLowerCase(),
+                HCatConstants.HCAT_DROP_TABLE_EVENT);
+        }
+    }
+
+    /**
+     * @param msgBody
+     *          is the metastore object. It is sent in full so that if the
+     *          subscriber is really interested in the details, it can
+     *          reconstruct it fully. In the case of a finalize_partition
+     *          message this will be the string specification of the partition.
+     * @param topicName
+     *          is the name of the topic on the message broker to which the
+     *          message is sent.
+     * @param event
+     *          is the value of the HCAT_EVENT property in the message. It can
+     *          be used to select messages on the client side.
+     */
+    protected void send(Object msgBody, String topicName, String event) {
+
+        try {
+
+            Destination topic = null;
+            if (null == session) {
+                // This will happen if we were never able to establish a connection.
+                createConnection();
+                if (null == session) {
+                    // Still not successful, return from here.
+                    LOG.error("Invalid session. Failed to send message on topic: "
+                        + topicName + " event: " + event);
+                    return;
+                }
+            }
+            try {
+                // Topics are created on demand. If a topic doesn't exist on the broker,
+                // it will be created when the broker receives this message.
+                topic = session.createTopic(topicName);
+            } catch (IllegalStateException ise) {
+                // This will happen if we were able to establish a connection once, but
+                // it is no longer valid; an IllegalStateException is thrown, so catch
+                // it and retry.
+                LOG.error("Seems like connection is lost. Retrying", ise);
+                createConnection();
+                topic = session.createTopic(topicName);
+            }
+            if (null == topic) {
+                // Still not successful, return from here.
+                LOG.error("Invalid session. Failed to send message on topic: "
+                    + topicName + " event: " + event);
+                return;
+            }
+            MessageProducer producer = session.createProducer(topic);
+            Message msg;
+            if (msgBody instanceof Map) {
+                MapMessage mapMsg = session.createMapMessage();
+                Map<String, String> incomingMap = (Map<String, String>) msgBody;
+                for (Entry<String, String> partCol : incomingMap.entrySet()) {
+                    mapMsg.setString(partCol.getKey(), partCol.getValue());
+                }
+                msg = mapMsg;
+            } else {
+                msg = session.createObjectMessage((Serializable) msgBody);
+            }
+
+            msg.setStringProperty(HCatConstants.HCAT_EVENT, event);
+            producer.send(msg);
+            // Message must be transacted before we return.
+            session.commit();
+        } catch (Exception e) {
+            // Gobble up the exception. Message delivery is best effort.
+            LOG.error("Failed to send message on topic: " + topicName + " event: "
+                + event, e);
+        }
+    }
+
+    protected void createConnection() {
+
+        Context jndiCntxt;
+        try {
+            jndiCntxt = new InitialContext();
+            ConnectionFactory connFac = (ConnectionFactory) jndiCntxt
+                .lookup("ConnectionFactory");
+            Connection conn = connFac.createConnection();
+            conn.start();
+            conn.setExceptionListener(new ExceptionListener() {
+                @Override
+                public void onException(JMSException jmse) {
+                    LOG.error(jmse.toString());
+                }
+            });
+            // We want the message to be sent when the session commits, so we run in
+            // transacted mode.
+            session = conn.createSession(true, Session.SESSION_TRANSACTED);
+        } catch (NamingException e) {
+            LOG.error("JNDI error while setting up Message Bus connection. "
+                + "Please make sure file named 'jndi.properties' is in "
+                + "classpath and contains appropriate key-value pairs.", e);
+        } catch (JMSException e) {
+            LOG.error("Failed to initialize connection to message bus", e);
+        } catch (Throwable t) {
+            LOG.error("Unable to connect to JMS provider", t);
+        }
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        // Close the connection before dying.
+        try {
+            if (null != session)
+                session.close();
+            if (conn != null) {
+                conn.close();
+            }
+
+        } catch (Exception ignore) {
+            LOG.info("Failed to close message bus connection.", ignore);
+        }
+    }
+
+    @Override
+    public void onLoadPartitionDone(LoadPartitionDoneEvent lpde)
+        throws MetaException {
+        if (lpde.getStatus())
+            send(
+                lpde.getPartitionName(),
+                lpde.getTable().getParameters()
+                    .get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME),
+                HCatConstants.HCAT_PARTITION_DONE_EVENT);
+    }
+
+    @Override
+    public void onAlterPartition(AlterPartitionEvent ape) throws MetaException {
+        // no-op
+    }
+
+    @Override
+    public void onAlterTable(AlterTableEvent ate) throws MetaException {
+        // no-op
+    }
 }
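For context on how the topic-per-table scheme in NotificationListener is meant to be consumed: the sketch below is illustrative only and not part of this commit. It assumes a jndi.properties on the classpath that binds "ConnectionFactory" (the same name NotificationListener looks up) and a hypothetical topic "mydb.mytable"; the selector value comes straight from the comments above, with the single quotes that JMS selector syntax requires around string literals.

// Illustrative subscriber sketch; topic name and broker settings are assumptions.
//
// A typical jndi.properties for an ActiveMQ-style broker (provider-specific,
// shown only as an assumption):
//   java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
//   java.naming.provider.url    = tcp://localhost:61616
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.naming.InitialContext;

public class PartitionEventSubscriber {
    public static void main(String[] args) throws Exception {
        InitialContext jndi = new InitialContext();
        ConnectionFactory factory = (ConnectionFactory) jndi.lookup("ConnectionFactory");
        Connection conn = factory.createConnection();
        conn.start();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // One topic per table, created on demand by the broker ("mydb.mytable" is hypothetical).
        Topic topic = session.createTopic("mydb.mytable");
        // Message selector from the comments above; JMS quotes string literals.
        MessageConsumer consumer = session.createConsumer(topic, "HCAT_EVENT = 'HCAT_ADD_PARTITION'");
        Message msg = consumer.receive();
        System.out.println("Received event: " + msg.getStringProperty("HCAT_EVENT"));
        conn.close();
    }
}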

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java Mon Sep 10 23:28:55 2012
@@ -45,7 +45,7 @@ class DefaultOutputCommitterContainer ex
      * @throws IOException
      */
     public DefaultOutputCommitterContainer(JobContext context, org.apache.hadoop.mapred.OutputCommitter baseCommitter) throws IOException {
-        super(context,baseCommitter);
+        super(context, baseCommitter);
     }
 
     @Override
@@ -95,8 +95,8 @@ class DefaultOutputCommitterContainer ex
             HiveConf hiveConf = HCatUtil.getHiveConf(context.getConfiguration());
             client = HCatUtil.getHiveClient(hiveConf);
             String tokenStrForm = client.getTokenStrForm();
-            if(tokenStrForm != null && context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
-              client.cancelDelegationToken(tokenStrForm);
+            if (tokenStrForm != null && context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+                client.cancelDelegationToken(tokenStrForm);
             }
         } catch (Exception e) {
             LOG.warn("Failed to cancel delegation token", e);

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java Mon Sep 10 23:28:55 2012
@@ -42,8 +42,8 @@ class DefaultOutputFormatContainer exten
     private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
 
     static {
-      NUMBER_FORMAT.setMinimumIntegerDigits(5);
-      NUMBER_FORMAT.setGroupingUsed(false);
+        NUMBER_FORMAT.setMinimumIntegerDigits(5);
+        NUMBER_FORMAT.setGroupingUsed(false);
     }
 
     public DefaultOutputFormatContainer(org.apache.hadoop.mapred.OutputFormat<WritableComparable<?>, Writable> of) {
@@ -52,7 +52,7 @@ class DefaultOutputFormatContainer exten
 
     static synchronized String getOutputName(int partition) {
         return "part-" + NUMBER_FORMAT.format(partition);
-      }
+    }
 
     /**
      * Get the record writer for the job. Uses the storagehandler's OutputFormat
@@ -66,7 +66,7 @@ class DefaultOutputFormatContainer exten
     getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
         String name = getOutputName(context.getTaskAttemptID().getTaskID().getId());
         return new DefaultRecordWriterContainer(context,
-                getBaseOutputFormat().getRecordWriter(null, new JobConf(context.getConfiguration()), name, InternalUtil.createReporter(context)));
+            getBaseOutputFormat().getRecordWriter(null, new JobConf(context.getConfiguration()), name, InternalUtil.createReporter(context)));
     }
 
 
@@ -80,7 +80,7 @@ class DefaultOutputFormatContainer exten
      */
     @Override
     public OutputCommitter getOutputCommitter(TaskAttemptContext context)
-            throws IOException, InterruptedException {
+        throws IOException, InterruptedException {
         return new DefaultOutputCommitterContainer(context, new JobConf(context.getConfiguration()).getOutputCommitter());
     }
 
@@ -93,7 +93,7 @@ class DefaultOutputFormatContainer exten
     public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
         org.apache.hadoop.mapred.OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat = getBaseOutputFormat();
         JobConf jobConf = new JobConf(context.getConfiguration());
-        outputFormat.checkOutputSpecs(null,jobConf);
+        outputFormat.checkOutputSpecs(null, jobConf);
         HCatUtil.copyConf(jobConf, context.getConfiguration());
     }
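A side note on the NUMBER_FORMAT block reformatted above: a minimum of five integer digits with grouping disabled is what gives getOutputName() the familiar "part-00000" style names. A tiny standalone sketch of the same formatting, for illustration only:

import java.text.NumberFormat;

public class OutputNameDemo {
    public static void main(String[] args) {
        NumberFormat nf = NumberFormat.getInstance();
        nf.setMinimumIntegerDigits(5); // pad to at least five digits
        nf.setGroupingUsed(false);     // suppress "1,234" style separators
        System.out.println("part-" + nf.format(3));    // part-00003
        System.out.println("part-" + nf.format(1234)); // part-01234
    }
}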
 

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java Mon Sep 10 23:28:55 2012
@@ -49,32 +49,32 @@ class DefaultRecordWriterContainer exten
      */
     public DefaultRecordWriterContainer(TaskAttemptContext context,
                                         org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> baseRecordWriter) throws IOException, InterruptedException {
-        super(context,baseRecordWriter);
+        super(context, baseRecordWriter);
         jobInfo = HCatOutputFormat.getJobInfo(context);
         storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo());
         HCatOutputFormat.configureOutputStorageHandler(context);
-        serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(),context.getConfiguration());
+        serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), context.getConfiguration());
         hcatRecordOI = InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema());
         try {
             InternalUtil.initializeOutputSerDe(serDe, context.getConfiguration(), jobInfo);
         } catch (SerDeException e) {
-            throw new IOException("Failed to initialize SerDe",e);
+            throw new IOException("Failed to initialize SerDe", e);
         }
     }
 
     @Override
     public void close(TaskAttemptContext context) throws IOException,
-            InterruptedException {
+        InterruptedException {
         getBaseRecordWriter().close(InternalUtil.createReporter(context));
     }
 
     @Override
     public void write(WritableComparable<?> key, HCatRecord value) throws IOException,
-            InterruptedException {
+        InterruptedException {
         try {
             getBaseRecordWriter().write(null, serDe.serialize(value.getAll(), hcatRecordOI));
         } catch (SerDeException e) {
-            throw new IOException("Failed to serialize object",e);
+            throw new IOException("Failed to serialize object", e);
         }
     }
 

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java Mon Sep 10 23:28:55 2012
@@ -87,34 +87,34 @@ class FileOutputCommitterContainer exten
      * @throws IOException
      */
     public FileOutputCommitterContainer(JobContext context,
-                                                          org.apache.hadoop.mapred.OutputCommitter baseCommitter) throws IOException {
+                                        org.apache.hadoop.mapred.OutputCommitter baseCommitter) throws IOException {
         super(context, baseCommitter);
         jobInfo = HCatOutputFormat.getJobInfo(context);
         dynamicPartitioningUsed = jobInfo.isDynamicPartitioningUsed();
 
         this.partitionsDiscovered = !dynamicPartitioningUsed;
-        cachedStorageHandler = HCatUtil.getStorageHandler(context.getConfiguration(),jobInfo.getTableInfo().getStorerInfo());
+        cachedStorageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo());
     }
 
     @Override
     public void abortTask(TaskAttemptContext context) throws IOException {
-        if (!dynamicPartitioningUsed){
+        if (!dynamicPartitioningUsed) {
             getBaseOutputCommitter().abortTask(HCatMapRedUtil.createTaskAttemptContext(context));
         }
     }
 
     @Override
     public void commitTask(TaskAttemptContext context) throws IOException {
-        if (!dynamicPartitioningUsed){
+        if (!dynamicPartitioningUsed) {
             getBaseOutputCommitter().commitTask(HCatMapRedUtil.createTaskAttemptContext(context));
         }
     }
 
     @Override
     public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
-        if (!dynamicPartitioningUsed){
+        if (!dynamicPartitioningUsed) {
             return getBaseOutputCommitter().needsTaskCommit(HCatMapRedUtil.createTaskAttemptContext(context));
-        }else{
+        } else {
             // called explicitly through FileRecordWriterContainer.close() if dynamic - return false by default
             return false;
         }
@@ -122,7 +122,7 @@ class FileOutputCommitterContainer exten
 
     @Override
     public void setupJob(JobContext context) throws IOException {
-        if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
+        if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
             getBaseOutputCommitter().setupJob(HCatMapRedUtil.createJobContext(context));
         }
         // in the dynamic use case, called through FileRecordWriterContainer
@@ -130,7 +130,7 @@ class FileOutputCommitterContainer exten
 
     @Override
     public void setupTask(TaskAttemptContext context) throws IOException {
-        if (!dynamicPartitioningUsed){
+        if (!dynamicPartitioningUsed) {
             getBaseOutputCommitter().setupTask(HCatMapRedUtil.createTaskAttemptContext(context));
         }
     }
@@ -138,16 +138,15 @@ class FileOutputCommitterContainer exten
     @Override
     public void abortJob(JobContext jobContext, State state) throws IOException {
         org.apache.hadoop.mapred.JobContext
-                mapRedJobContext = HCatMapRedUtil.createJobContext(jobContext);
-        if (dynamicPartitioningUsed){
+            mapRedJobContext = HCatMapRedUtil.createJobContext(jobContext);
+        if (dynamicPartitioningUsed) {
             discoverPartitions(jobContext);
         }
 
-        if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
+        if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
             getBaseOutputCommitter().abortJob(mapRedJobContext, state);
-        }
-        else if (dynamicPartitioningUsed){
-            for(JobContext currContext : contextDiscoveredByPath.values()){
+        } else if (dynamicPartitioningUsed) {
+            for (JobContext currContext : contextDiscoveredByPath.values()) {
                 try {
                     new JobConf(currContext.getConfiguration()).getOutputCommitter().abortJob(currContext, state);
                 } catch (Exception e) {
@@ -166,12 +165,12 @@ class FileOutputCommitterContainer exten
             // In the latter case the HCAT_KEY_TOKEN_SIGNATURE property in
             // the conf will not be set
             String tokenStrForm = client.getTokenStrForm();
-            if(tokenStrForm != null && jobContext.getConfiguration().get
-                    (HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+            if (tokenStrForm != null && jobContext.getConfiguration().get
+                (HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
                 client.cancelDelegationToken(tokenStrForm);
             }
-        } catch(Exception e) {
-            if( e instanceof HCatException ) {
+        } catch (Exception e) {
+            if (e instanceof HCatException) {
                 throw (HCatException) e;
             } else {
                 throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
@@ -182,10 +181,10 @@ class FileOutputCommitterContainer exten
 
         Path src;
         OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
-        if (dynamicPartitioningUsed){
+        if (dynamicPartitioningUsed) {
             src = new Path(getPartitionRootLocation(jobInfo.getLocation(),
                 jobInfo.getTableInfo().getTable().getPartitionKeysSize()));
-        }else{
+        } else {
             src = new Path(jobInfo.getLocation());
         }
         FileSystem fs = src.getFileSystem(jobContext.getConfiguration());
@@ -195,31 +194,31 @@ class FileOutputCommitterContainer exten
 
     public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
     static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
-            "mapreduce.fileoutputcommitter.marksuccessfuljobs";
+        "mapreduce.fileoutputcommitter.marksuccessfuljobs";
 
     private static boolean getOutputDirMarking(Configuration conf) {
         return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
-                false);
+            false);
     }
 
     @Override
     public void commitJob(JobContext jobContext) throws IOException {
-        if (dynamicPartitioningUsed){
+        if (dynamicPartitioningUsed) {
             discoverPartitions(jobContext);
         }
-        if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
+        if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
             getBaseOutputCommitter().commitJob(HCatMapRedUtil.createJobContext(jobContext));
         }
         // create _SUCCESS FILE if so requested.
         OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
-        if(getOutputDirMarking(jobContext.getConfiguration())) {
+        if (getOutputDirMarking(jobContext.getConfiguration())) {
             Path outputPath = new Path(jobInfo.getLocation());
             if (outputPath != null) {
                 FileSystem fileSys = outputPath.getFileSystem(jobContext.getConfiguration());
                 // create a file in the folder to mark it
                 if (fileSys.exists(outputPath)) {
                     Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
-                    if(!fileSys.exists(filePath)) { // may have been created by baseCommitter.commitJob()
+                    if (!fileSys.exists(filePath)) { // may have been created by baseCommitter.commitJob()
                         fileSys.create(filePath).close();
                     }
                 }
@@ -231,7 +230,7 @@ class FileOutputCommitterContainer exten
     @Override
     public void cleanupJob(JobContext context) throws IOException {
 
-        if (dynamicPartitioningUsed){
+        if (dynamicPartitioningUsed) {
             discoverPartitions(context);
         }
 
@@ -242,13 +241,12 @@ class FileOutputCommitterContainer exten
         Path tblPath = new Path(table.getTTable().getSd().getLocation());
         FileSystem fs = tblPath.getFileSystem(conf);
 
-        if( table.getPartitionKeys().size() == 0 ) {
+        if (table.getPartitionKeys().size() == 0) {
             //non partitioned table
-            if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
-               getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context));
-            }
-            else if (dynamicPartitioningUsed){
-                for(JobContext currContext : contextDiscoveredByPath.values()){
+            if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
+                getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context));
+            } else if (dynamicPartitioningUsed) {
+                for (JobContext currContext : contextDiscoveredByPath.values()) {
                     try {
                         JobConf jobConf = new JobConf(currContext.getConfiguration());
                         jobConf.getOutputCommitter().cleanupJob(currContext);
@@ -285,35 +283,35 @@ class FileOutputCommitterContainer exten
             FsPermission perms = tblStat.getPermission();
 
             List<Partition> partitionsToAdd = new ArrayList<Partition>();
-            if (!dynamicPartitioningUsed){
+            if (!dynamicPartitioningUsed) {
                 partitionsToAdd.add(
-                        constructPartition(
-                                context,
-                                tblPath.toString(), jobInfo.getPartitionValues()
-                                ,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
-                                ,table, fs
-                                ,grpName,perms));
-            }else{
-                for (Entry<String,Map<String,String>> entry : partitionsDiscoveredByPath.entrySet()){
+                    constructPartition(
+                        context,
+                        tblPath.toString(), jobInfo.getPartitionValues()
+                        , jobInfo.getOutputSchema(), getStorerParameterMap(storer)
+                        , table, fs
+                        , grpName, perms));
+            } else {
+                for (Entry<String, Map<String, String>> entry : partitionsDiscoveredByPath.entrySet()) {
                     partitionsToAdd.add(
-                            constructPartition(
-                                    context,
-                                    getPartitionRootLocation(entry.getKey(),entry.getValue().size()), entry.getValue()
-                                    ,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
-                                    ,table, fs
-                                    ,grpName,perms));
+                        constructPartition(
+                            context,
+                            getPartitionRootLocation(entry.getKey(), entry.getValue().size()), entry.getValue()
+                            , jobInfo.getOutputSchema(), getStorerParameterMap(storer)
+                            , table, fs
+                            , grpName, perms));
                 }
             }
 
             //Publish the new partition(s)
-            if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())){
+            if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())) {
 
                 Path src = new Path(ptnRootLocation);
 
                 // check here for each dir we're copying out, to see if it already exists, error out if so
-                moveTaskOutputs(fs, src, src, tblPath,true);
+                moveTaskOutputs(fs, src, src, tblPath, true);
 
-                moveTaskOutputs(fs, src, src, tblPath,false);
+                moveTaskOutputs(fs, src, src, tblPath, false);
                 fs.delete(src, true);
 
 
@@ -326,18 +324,18 @@ class FileOutputCommitterContainer exten
                 try {
                     client.add_partitions(partitionsToAdd);
                     partitionsAdded = partitionsToAdd;
-                } catch (Exception e){
+                } catch (Exception e) {
                     // There was an error adding partitions : rollback fs copy and rethrow
-                    for (Partition p : partitionsToAdd){
+                    for (Partition p : partitionsToAdd) {
                         Path ptnPath = new Path(harProcessor.getParentFSPath(new Path(p.getSd().getLocation())));
-                        if (fs.exists(ptnPath)){
-                            fs.delete(ptnPath,true);
+                        if (fs.exists(ptnPath)) {
+                            fs.delete(ptnPath, true);
                         }
                     }
                     throw e;
                 }
 
-            }else{
+            } else {
                 // no harProcessor, regular operation
 
                 // No duplicate partition publish case to worry about because we'll
@@ -346,37 +344,37 @@ class FileOutputCommitterContainer exten
                 client.add_partitions(partitionsToAdd);
                 partitionsAdded = partitionsToAdd;
 
-                if (dynamicPartitioningUsed && (partitionsAdded.size()>0)){
+                if (dynamicPartitioningUsed && (partitionsAdded.size() > 0)) {
                     Path src = new Path(ptnRootLocation);
-                    moveTaskOutputs(fs, src, src, tblPath,false);
+                    moveTaskOutputs(fs, src, src, tblPath, false);
                     fs.delete(src, true);
                 }
 
             }
 
-            if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
+            if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
                 getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context));
             }
 
-            if(Security.getInstance().isSecurityEnabled()) {
+            if (Security.getInstance().isSecurityEnabled()) {
                 Security.getInstance().cancelToken(client, context);
             }
         } catch (Exception e) {
 
-            if( partitionsAdded.size() > 0 ) {
+            if (partitionsAdded.size() > 0) {
                 try {
                     //baseCommitter.cleanupJob failed, try to clean up the metastore
-                    for (Partition p : partitionsAdded){
+                    for (Partition p : partitionsAdded) {
                         client.dropPartition(tableInfo.getDatabaseName(),
-                                tableInfo.getTableName(), p.getValues());
+                            tableInfo.getTableName(), p.getValues());
                     }
-                } catch(Exception te) {
+                } catch (Exception te) {
                     //Keep cause as the original exception
                     throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
                 }
             }
 
-            if( e instanceof HCatException ) {
+            if (e instanceof HCatException) {
                 throw (HCatException) e;
             } else {
                 throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
@@ -386,11 +384,11 @@ class FileOutputCommitterContainer exten
         }
     }
 
-    private String getPartitionRootLocation(String ptnLocn,int numPtnKeys) {
-        if (ptnRootLocation  == null){
+    private String getPartitionRootLocation(String ptnLocn, int numPtnKeys) {
+        if (ptnRootLocation == null) {
             // we only need to calculate it once, it'll be the same for other partitions in this job.
             Path ptnRoot = new Path(ptnLocn);
-            for (int i = 0; i < numPtnKeys; i++){
+            for (int i = 0; i < numPtnKeys; i++) {
 //          LOG.info("Getting parent of "+ptnRoot.getName());
                 ptnRoot = ptnRoot.getParent();
             }
@@ -416,11 +414,11 @@ class FileOutputCommitterContainer exten
      */
 
     private Partition constructPartition(
-            JobContext context,
-            String partLocnRoot, Map<String,String> partKVs,
-            HCatSchema outputSchema, Map<String, String> params,
-            Table table, FileSystem fs,
-            String grpName, FsPermission perms) throws IOException {
+        JobContext context,
+        String partLocnRoot, Map<String, String> partKVs,
+        HCatSchema outputSchema, Map<String, String> params,
+        Table table, FileSystem fs,
+        String grpName, FsPermission perms) throws IOException {
 
         Partition partition = new Partition();
         partition.setDbName(table.getDbName());
@@ -428,7 +426,7 @@ class FileOutputCommitterContainer exten
         partition.setSd(new StorageDescriptor(table.getTTable().getSd()));
 
         List<FieldSchema> fields = new ArrayList<FieldSchema>();
-        for(HCatFieldSchema fieldSchema : outputSchema.getFields()) {
+        for (HCatFieldSchema fieldSchema : outputSchema.getFields()) {
             fields.add(HCatSchemaUtils.getFieldSchema(fieldSchema));
         }
 
@@ -450,16 +448,16 @@ class FileOutputCommitterContainer exten
         }
         // Apply the group and permissions to the leaf partition and files.
         applyGroupAndPerms(fs, partPath, perms, grpName, true);
-        if (dynamicPartitioningUsed){
-            String dynamicPartitionDestination = getFinalDynamicPartitionDestination(table,partKVs);
-            if (harProcessor.isEnabled()){
+        if (dynamicPartitioningUsed) {
+            String dynamicPartitionDestination = getFinalDynamicPartitionDestination(table, partKVs);
+            if (harProcessor.isEnabled()) {
                 harProcessor.exec(context, partition, partPath);
                 partition.getSd().setLocation(
-                        harProcessor.getProcessedLocation(new Path(dynamicPartitionDestination)));
-            }else{
+                    harProcessor.getProcessedLocation(new Path(dynamicPartitionDestination)));
+            } else {
                 partition.getSd().setLocation(dynamicPartitionDestination);
             }
-        }else{
+        } else {
             partition.getSd().setLocation(partPath.toString());
         }
 
@@ -467,8 +465,8 @@ class FileOutputCommitterContainer exten
     }
 
     private void applyGroupAndPerms(FileSystem fs, Path dir, FsPermission permission,
-            String group, boolean recursive)
-            throws IOException {
+                                    String group, boolean recursive)
+        throws IOException {
         fs.setPermission(dir, permission);
         try {
             fs.setOwner(dir, null, group);
@@ -491,11 +489,11 @@ class FileOutputCommitterContainer exten
         }
     }
 
-    private String getFinalDynamicPartitionDestination(Table table, Map<String,String> partKVs) {
+    private String getFinalDynamicPartitionDestination(Table table, Map<String, String> partKVs) {
         // file:///tmp/hcat_junit_warehouse/employee/_DYN0.7770480401313761/emp_country=IN/emp_state=KA  ->
         // file:///tmp/hcat_junit_warehouse/employee/emp_country=IN/emp_state=KA
         Path partPath = new Path(table.getTTable().getSd().getLocation());
-        for(FieldSchema partKey : table.getPartitionKeys()){
+        for (FieldSchema partKey : table.getPartitionKeys()) {
             partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
         }
         return partPath.toString();
@@ -505,13 +503,13 @@ class FileOutputCommitterContainer exten
         Map<String, String> params = new HashMap<String, String>();
 
         //Copy table level hcat.* keys to the partition
-        for(Entry<Object, Object> entry : storer.getProperties().entrySet()) {
+        for (Entry<Object, Object> entry : storer.getProperties().entrySet()) {
             params.put(entry.getKey().toString(), entry.getValue().toString());
         }
         return params;
     }
 
-    private Path constructPartialPartPath(Path partialPath, String partKey, Map<String,String> partKVs){
+    private Path constructPartialPartPath(Path partialPath, String partKey, Map<String, String> partKVs) {
 
         StringBuilder sb = new StringBuilder(FileUtils.escapePathName(partKey));
         sb.append("=");
@@ -534,7 +532,7 @@ class FileOutputCommitterContainer exten
 
         List<FieldSchema> newColumns = HCatUtil.validatePartitionSchema(table, partitionSchema);
 
-        if( newColumns.size() != 0 ) {
+        if (newColumns.size() != 0) {
             List<FieldSchema> tableColumns = new ArrayList<FieldSchema>(table.getTTable().getSd().getCols());
             tableColumns.addAll(newColumns);
 
@@ -561,12 +559,12 @@ class FileOutputCommitterContainer exten
         if (fs.isFile(file)) {
             Path finalOutputPath = getFinalPath(file, src, dest);
 
-            if (dryRun){
+            if (dryRun) {
 //        LOG.info("Testing if moving ["+file+"] to ["+finalOutputPath+"] would cause a problem");
-                if (fs.exists(finalOutputPath)){
+                if (fs.exists(finalOutputPath)) {
                     throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath + ", duplicate publish possible.");
                 }
-            }else{
+            } else {
 //        LOG.info("Moving ["+file+"] to ["+finalOutputPath+"]");
                 if (!fs.rename(file, finalOutputPath)) {
                     if (!fs.delete(finalOutputPath, true)) {
@@ -577,15 +575,15 @@ class FileOutputCommitterContainer exten
                     }
                 }
             }
-        } else if(fs.getFileStatus(file).isDir()) {
+        } else if (fs.getFileStatus(file).isDir()) {
             FileStatus[] paths = fs.listStatus(file);
             Path finalOutputPath = getFinalPath(file, src, dest);
-            if (!dryRun){
+            if (!dryRun) {
                 fs.mkdirs(finalOutputPath);
             }
             if (paths != null) {
                 for (FileStatus path : paths) {
-                    moveTaskOutputs(fs, path.getPath(), src, dest,dryRun);
+                    moveTaskOutputs(fs, path.getPath(), src, dest, dryRun);
                 }
             }
         }
@@ -606,7 +604,7 @@ class FileOutputCommitterContainer exten
         URI relativePath = src.toUri().relativize(taskOutputUri);
         if (taskOutputUri == relativePath) {
             throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Can not get the relative path: base = " +
-                    src + " child = " + file);
+                src + " child = " + file);
         }
         if (relativePath.getPath().length() > 0) {
             return new Path(dest, relativePath.getPath());
@@ -619,7 +617,7 @@ class FileOutputCommitterContainer exten
      * Run to discover dynamic partitions available
      */
     private void discoverPartitions(JobContext context) throws IOException {
-        if (!partitionsDiscovered){
+        if (!partitionsDiscovered) {
             //      LOG.info("discover ptns called");
             OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
 
@@ -639,33 +637,33 @@ class FileOutputCommitterContainer exten
             Path pathPattern = new Path(dynPathSpec);
             FileStatus[] status = fs.globStatus(pathPattern);
 
-            partitionsDiscoveredByPath = new LinkedHashMap<String,Map<String, String>>();
-            contextDiscoveredByPath = new LinkedHashMap<String,JobContext>();
+            partitionsDiscoveredByPath = new LinkedHashMap<String, Map<String, String>>();
+            contextDiscoveredByPath = new LinkedHashMap<String, JobContext>();
 
 
             if (status.length == 0) {
                 //        LOG.warn("No partition found generated by dynamic partitioning in ["
                 //            +loadPath+"] with depth["+jobInfo.getTable().getPartitionKeysSize()
                 //            +"], dynSpec["+dynPathSpec+"]");
-            }else{
-                if ((maxDynamicPartitions != -1) && (status.length > maxDynamicPartitions)){
+            } else {
+                if ((maxDynamicPartitions != -1) && (status.length > maxDynamicPartitions)) {
                     this.partitionsDiscovered = true;
                     throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS,
-                            "Number of dynamic partitions being created "
-                                    + "exceeds configured max allowable partitions["
-                                    + maxDynamicPartitions
-                                    + "], increase parameter ["
-                                    + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
-                                    + "] if needed.");
+                        "Number of dynamic partitions being created "
+                            + "exceeds configured max allowable partitions["
+                            + maxDynamicPartitions
+                            + "], increase parameter ["
+                            + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
+                            + "] if needed.");
                 }
 
-                for (FileStatus st : status){
+                for (FileStatus st : status) {
                     LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>();
                     Warehouse.makeSpecFromName(fullPartSpec, st.getPath());
-                    partitionsDiscoveredByPath.put(st.getPath().toString(),fullPartSpec);
-                    JobContext currContext = HCatHadoopShims.Instance.get().createJobContext(context.getConfiguration(),context.getJobID());
+                    partitionsDiscoveredByPath.put(st.getPath().toString(), fullPartSpec);
+                    JobContext currContext = HCatHadoopShims.Instance.get().createJobContext(context.getConfiguration(), context.getJobID());
                     HCatOutputFormat.configureOutputStorageHandler(context, jobInfo, fullPartSpec);
-                    contextDiscoveredByPath.put(st.getPath().toString(),currContext);
+                    contextDiscoveredByPath.put(st.getPath().toString(), currContext);
                 }
             }
 

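For orientation, the discoverPartitions() hunk above boils down to globbing the job's scratch directory down to the table's partition depth and turning each matched leaf directory back into an ordered partition spec. Below is a self-contained sketch of that idea; the scratch path, the depth value, and the manual key=value parsing are illustrative stand-ins (the patch itself delegates to Warehouse.makeSpecFromName and handles escaping):

    import java.util.LinkedHashMap;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class DynPartitionDiscoverySketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Path scratchDir = new Path("/tmp/hcat_dyn_scratch");   // illustrative location
            int partitionDepth = 2;                                // e.g. ds=.../region=...

            // Build a glob with one "*" per partition key, mirroring the dynPathSpec idea.
            StringBuilder glob = new StringBuilder(scratchDir.toString());
            for (int i = 0; i < partitionDepth; i++) {
                glob.append(Path.SEPARATOR).append("*");
            }

            FileSystem fs = scratchDir.getFileSystem(conf);
            FileStatus[] leaves = fs.globStatus(new Path(glob.toString()));
            if (leaves == null || leaves.length == 0) {
                return; // nothing written yet
            }
            for (FileStatus leaf : leaves) {
                // Each matched leaf path ends in partitionDepth "key=value" segments.
                Map<String, String> partSpec = new LinkedHashMap<String, String>();
                Path p = leaf.getPath();
                String[] segments = new String[partitionDepth];
                for (int i = partitionDepth - 1; i >= 0; i--) {
                    segments[i] = p.getName();
                    p = p.getParent();
                }
                for (String segment : segments) {
                    int eq = segment.indexOf('=');
                    partSpec.put(segment.substring(0, eq), segment.substring(eq + 1));
                }
                System.out.println(leaf.getPath() + " -> " + partSpec);
            }
        }
    }
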
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java Mon Sep 10 23:28:55 2012
@@ -58,11 +58,11 @@ import java.util.Map;
  */
 class FileOutputFormatContainer extends OutputFormatContainer {
 
-    private static final PathFilter hiddenFileFilter = new PathFilter(){
-      public boolean accept(Path p){
-        String name = p.getName();
-        return !name.startsWith("_") && !name.startsWith(".");
-      }
+    private static final PathFilter hiddenFileFilter = new PathFilter() {
+        public boolean accept(Path p) {
+            String name = p.getName();
+            return !name.startsWith("_") && !name.startsWith(".");
+        }
     };
 
     /**
@@ -80,32 +80,32 @@ class FileOutputFormatContainer extends 
         //Configure the output key and value classes.
         // This is required for writing null as key for file based tables.
         context.getConfiguration().set("mapred.output.key.class",
-                NullWritable.class.getName());
+            NullWritable.class.getName());
         String jobInfoString = context.getConfiguration().get(
-                HCatConstants.HCAT_KEY_OUTPUT_INFO);
+            HCatConstants.HCAT_KEY_OUTPUT_INFO);
         OutputJobInfo jobInfo = (OutputJobInfo) HCatUtil
-                .deserialize(jobInfoString);
+            .deserialize(jobInfoString);
         StorerInfo storeInfo = jobInfo.getTableInfo().getStorerInfo();
         HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(
-                context.getConfiguration(), storeInfo);
+            context.getConfiguration(), storeInfo);
         Class<? extends SerDe> serde = storageHandler.getSerDeClass();
         SerDe sd = (SerDe) ReflectionUtils.newInstance(serde,
-                context.getConfiguration());
+            context.getConfiguration());
         context.getConfiguration().set("mapred.output.value.class",
-                sd.getSerializedClass().getName());
+            sd.getSerializedClass().getName());
 
         // When Dynamic partitioning is used, the RecordWriter instance initialized here isn't used. Can use null.
         // (That's because records can't be written until the values of the dynamic partitions are deduced.
         // By that time, a new local instance of RecordWriter, with the correct output-path, will be constructed.)
         RecordWriter<WritableComparable<?>, HCatRecord> rw =
             new FileRecordWriterContainer(
-                HCatBaseOutputFormat.getJobInfo(context).isDynamicPartitioningUsed()?
-                    null:
+                HCatBaseOutputFormat.getJobInfo(context).isDynamicPartitioningUsed() ?
+                    null :
                     getBaseOutputFormat()
-                            .getRecordWriter(null,
-                                                     new JobConf(context.getConfiguration()),
-                                                                         FileOutputFormat.getUniqueName(new JobConf(context.getConfiguration()), "part"),
-                                                                         InternalUtil.createReporter(context)),
+                        .getRecordWriter(null,
+                            new JobConf(context.getConfiguration()),
+                            FileOutputFormat.getUniqueName(new JobConf(context.getConfiguration()), "part"),
+                            InternalUtil.createReporter(context)),
                 context);
         return rw;
     }
@@ -118,9 +118,9 @@ class FileOutputFormatContainer extends 
             HiveConf hiveConf = HCatUtil.getHiveConf(context.getConfiguration());
             client = HCatUtil.getHiveClient(hiveConf);
             handleDuplicatePublish(context,
-                    jobInfo,
-                    client,
-                    new Table(jobInfo.getTableInfo().getTable()));
+                jobInfo,
+                client,
+                new Table(jobInfo.getTableInfo().getTable()));
         } catch (MetaException e) {
             throw new IOException(e);
         } catch (TException e) {
@@ -131,11 +131,11 @@ class FileOutputFormatContainer extends 
             HCatUtil.closeHiveClientQuietly(client);
         }
 
-        if(!jobInfo.isDynamicPartitioningUsed()) {
+        if (!jobInfo.isDynamicPartitioningUsed()) {
             JobConf jobConf = new JobConf(context.getConfiguration());
             getBaseOutputFormat().checkOutputSpecs(null, jobConf);
             //checkOutputSpecs might've set some properties; we need the context to reflect that
-            HCatUtil.copyConf(jobConf,context.getConfiguration());
+            HCatUtil.copyConf(jobConf, context.getConfiguration());
         }
     }
 
@@ -144,9 +144,9 @@ class FileOutputFormatContainer extends 
         //this needs to be manually set, under normal circumstances MR Task does this
         setWorkOutputPath(context);
         return new FileOutputCommitterContainer(context,
-               HCatBaseOutputFormat.getJobInfo(context).isDynamicPartitioningUsed()?
-                       null:
-                       new JobConf(context.getConfiguration()).getOutputCommitter());
+            HCatBaseOutputFormat.getJobInfo(context).isDynamicPartitioningUsed() ?
+                null :
+                new JobConf(context.getConfiguration()).getOutputCommitter());
     }
 
     /**
@@ -162,7 +162,7 @@ class FileOutputFormatContainer extends 
      * @throws org.apache.thrift.TException
      */
     private static void handleDuplicatePublish(JobContext context, OutputJobInfo outputInfo,
-      HiveMetaStoreClient client, Table table) throws IOException, MetaException, TException, NoSuchObjectException {
+                                               HiveMetaStoreClient client, Table table) throws IOException, MetaException, TException, NoSuchObjectException {
 
         /*
         * For fully specified ptn, follow strict checks for existence of partitions in metadata
@@ -173,32 +173,32 @@ class FileOutputFormatContainer extends 
         *    there are a large number of partitions that match the partial specifications
         */
 
-        if( table.getPartitionKeys().size() > 0 ) {
-            if (!outputInfo.isDynamicPartitioningUsed()){
+        if (table.getPartitionKeys().size() > 0) {
+            if (!outputInfo.isDynamicPartitioningUsed()) {
                 List<String> partitionValues = getPartitionValueList(
-                        table, outputInfo.getPartitionValues());
+                    table, outputInfo.getPartitionValues());
                 // fully-specified partition
                 List<String> currentParts = client.listPartitionNames(outputInfo.getDatabaseName(),
-                        outputInfo.getTableName(), partitionValues, (short) 1);
+                    outputInfo.getTableName(), partitionValues, (short) 1);
 
-                if( currentParts.size() > 0 ) {
+                if (currentParts.size() > 0) {
                     throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION);
                 }
             }
         } else {
             List<String> partitionValues = getPartitionValueList(
-                    table, outputInfo.getPartitionValues());
+                table, outputInfo.getPartitionValues());
             // non-partitioned table
 
             Path tablePath = new Path(table.getTTable().getSd().getLocation());
             FileSystem fs = tablePath.getFileSystem(context.getConfiguration());
 
-            if ( fs.exists(tablePath) ) {
+            if (fs.exists(tablePath)) {
                 FileStatus[] status = fs.globStatus(new Path(tablePath, "*"), hiddenFileFilter);
 
-                if( status.length > 0 ) {
+                if (status.length > 0) {
                     throw new HCatException(ErrorType.ERROR_NON_EMPTY_TABLE,
-                            table.getDbName() + "." + table.getTableName());
+                        table.getDbName() + "." + table.getTableName());
                 }
             }
         }
@@ -213,22 +213,22 @@ class FileOutputFormatContainer extends 
      */
     static List<String> getPartitionValueList(Table table, Map<String, String> valueMap) throws IOException {
 
-        if( valueMap.size() != table.getPartitionKeys().size() ) {
+        if (valueMap.size() != table.getPartitionKeys().size()) {
             throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,
-                    "Table "
-                            + table.getTableName() + " has " +
-                            table.getPartitionKeys().size() + " partition keys, got "+
-                            valueMap.size());
+                "Table "
+                    + table.getTableName() + " has " +
+                    table.getPartitionKeys().size() + " partition keys, got " +
+                    valueMap.size());
         }
 
         List<String> values = new ArrayList<String>();
 
-        for(FieldSchema schema : table.getPartitionKeys()) {
+        for (FieldSchema schema : table.getPartitionKeys()) {
             String value = valueMap.get(schema.getName().toLowerCase());
 
-            if( value == null ) {
+            if (value == null) {
                 throw new HCatException(ErrorType.ERROR_MISSING_PARTITION_KEY,
-                        "Key " + schema.getName() + " of table " + table.getTableName());
+                    "Key " + schema.getName() + " of table " + table.getTableName());
             }
 
             values.add(value);
@@ -241,8 +241,8 @@ class FileOutputFormatContainer extends 
         String outputPath = context.getConfiguration().get("mapred.output.dir");
         //we need to do this to get the task path and set it for mapred implementation
         //since it can't be done automatically because of mapreduce->mapred abstraction
-        if(outputPath != null)
+        if (outputPath != null)
             context.getConfiguration().set("mapred.work.output.dir",
-                    new FileOutputCommitter(new Path(outputPath), context).getWorkPath().toString());
+                new FileOutputCommitter(new Path(outputPath), context).getWorkPath().toString());
     }
 }

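The duplicate-publish guard touched above has two branches: for partitioned tables it asks the metastore whether the target partition already exists, while for non-partitioned tables it simply looks for any non-hidden file under the table directory. Here is a minimal sketch of the second branch, reusing the same hidden-file convention (leading underscore or dot); the table path and exception message are illustrative, not taken from the patch:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    public class NonEmptyTableCheckSketch {

        // Same convention as the patch: names starting with "_" or "." are bookkeeping files.
        private static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
            public boolean accept(Path p) {
                String name = p.getName();
                return !name.startsWith("_") && !name.startsWith(".");
            }
        };

        static void failIfTableNotEmpty(FileSystem fs, Path tablePath) throws IOException {
            if (!fs.exists(tablePath)) {
                return; // nothing published yet
            }
            FileStatus[] visible = fs.globStatus(new Path(tablePath, "*"), HIDDEN_FILE_FILTER);
            if (visible != null && visible.length > 0) {
                throw new IOException("Data already exists at " + tablePath
                    + "; publishing again to a non-partitioned table would be a duplicate.");
            }
        }

        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();
            Path tablePath = new Path("/user/hive/warehouse/example_table");  // illustrative
            failIfTableNotEmpty(tablePath.getFileSystem(conf), tablePath);
        }
    }
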
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java Mon Sep 10 23:28:55 2012
@@ -80,17 +80,17 @@ class FileRecordWriterContainer extends 
      */
     public FileRecordWriterContainer(org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter,
                                      TaskAttemptContext context) throws IOException, InterruptedException {
-        super(context,baseWriter);
+        super(context, baseWriter);
         this.context = context;
         jobInfo = HCatOutputFormat.getJobInfo(context);
 
         storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo());
-        serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(),context.getConfiguration());
+        serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), context.getConfiguration());
         objectInspector = InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema());
         try {
             InternalUtil.initializeOutputSerDe(serDe, context.getConfiguration(), jobInfo);
         } catch (SerDeException e) {
-            throw new IOException("Failed to initialize SerDe",e);
+            throw new IOException("Failed to initialize SerDe", e);
         }
 
         // If partition columns occur in data, we want to remove them.
@@ -99,9 +99,9 @@ class FileRecordWriterContainer extends 
         dynamicPartCols = jobInfo.getPosOfDynPartCols();
         maxDynamicPartitions = jobInfo.getMaxDynamicPartitions();
 
-        if((partColsToDel == null) || (dynamicPartitioningUsed && (dynamicPartCols == null))){
+        if ((partColsToDel == null) || (dynamicPartitioningUsed && (dynamicPartCols == null))) {
             throw new HCatException("It seems that setSchema() is not called on " +
-                    "HCatOutputFormat. Please make sure that method is called.");
+                "HCatOutputFormat. Please make sure that method is called.");
         }
 
 
@@ -112,10 +112,9 @@ class FileRecordWriterContainer extends 
             this.dynamicContexts = null;
             this.dynamicObjectInspectors = null;
             this.dynamicOutputJobInfo = null;
-        }
-        else {
-            this.baseDynamicSerDe = new HashMap<String,SerDe>();
-            this.baseDynamicWriters = new HashMap<String,org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable>>();
+        } else {
+            this.baseDynamicSerDe = new HashMap<String, SerDe>();
+            this.baseDynamicWriters = new HashMap<String, org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable>>();
             this.baseDynamicCommitters = new HashMap<String, org.apache.hadoop.mapred.OutputCommitter>();
             this.dynamicContexts = new HashMap<String, org.apache.hadoop.mapred.TaskAttemptContext>();
             this.dynamicObjectInspectors = new HashMap<String, ObjectInspector>();
@@ -132,17 +131,17 @@ class FileRecordWriterContainer extends 
 
     @Override
     public void close(TaskAttemptContext context) throws IOException,
-            InterruptedException {
+        InterruptedException {
         Reporter reporter = InternalUtil.createReporter(context);
-        if (dynamicPartitioningUsed){
-            for (org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> bwriter : baseDynamicWriters.values()){
+        if (dynamicPartitioningUsed) {
+            for (org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> bwriter : baseDynamicWriters.values()) {
                 //We are in RecordWriter.close(), so it makes sense that the context would be TaskInputOutput
                 bwriter.close(reporter);
             }
-            for(Map.Entry<String,org.apache.hadoop.mapred.OutputCommitter>entry : baseDynamicCommitters.entrySet()) {
+            for (Map.Entry<String, org.apache.hadoop.mapred.OutputCommitter> entry : baseDynamicCommitters.entrySet()) {
                 org.apache.hadoop.mapred.TaskAttemptContext currContext = dynamicContexts.get(entry.getKey());
                 OutputCommitter baseOutputCommitter = entry.getValue();
-                if (baseOutputCommitter.needsTaskCommit(currContext)){
+                if (baseOutputCommitter.needsTaskCommit(currContext)) {
                     baseOutputCommitter.commitTask(currContext);
                 }
                 org.apache.hadoop.mapred.JobContext currJobContext = HCatMapRedUtil.createJobContext(currContext);
@@ -155,93 +154,92 @@ class FileRecordWriterContainer extends 
 
     @Override
     public void write(WritableComparable<?> key, HCatRecord value) throws IOException,
-            InterruptedException {
+        InterruptedException {
 
         org.apache.hadoop.mapred.RecordWriter localWriter;
         ObjectInspector localObjectInspector;
         SerDe localSerDe;
         OutputJobInfo localJobInfo = null;
 
-        if (dynamicPartitioningUsed){
+        if (dynamicPartitioningUsed) {
             // calculate which writer to use from the remaining values - this needs to be done before we delete cols
             List<String> dynamicPartValues = new ArrayList<String>();
-            for (Integer colToAppend :  dynamicPartCols){
+            for (Integer colToAppend : dynamicPartCols) {
                 dynamicPartValues.add(value.get(colToAppend).toString());
             }
 
             String dynKey = dynamicPartValues.toString();
-            if (!baseDynamicWriters.containsKey(dynKey)){
-                if ((maxDynamicPartitions != -1) && (baseDynamicWriters.size() > maxDynamicPartitions)){
+            if (!baseDynamicWriters.containsKey(dynKey)) {
+                if ((maxDynamicPartitions != -1) && (baseDynamicWriters.size() > maxDynamicPartitions)) {
                     throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS,
-                            "Number of dynamic partitions being created "
-                                    + "exceeds configured max allowable partitions["
-                                    + maxDynamicPartitions
-                                    + "], increase parameter ["
-                                    + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
-                                    + "] if needed.");
+                        "Number of dynamic partitions being created "
+                            + "exceeds configured max allowable partitions["
+                            + maxDynamicPartitions
+                            + "], increase parameter ["
+                            + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
+                            + "] if needed.");
                 }
 
                 org.apache.hadoop.mapred.TaskAttemptContext currTaskContext = HCatMapRedUtil.createTaskAttemptContext(context);
                 configureDynamicStorageHandler(currTaskContext, dynamicPartValues);
-                localJobInfo= HCatBaseOutputFormat.getJobInfo(currTaskContext);
+                localJobInfo = HCatBaseOutputFormat.getJobInfo(currTaskContext);
 
                 //setup serDe
                 SerDe currSerDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), currTaskContext.getJobConf());
                 try {
                     InternalUtil.initializeOutputSerDe(currSerDe, currTaskContext.getConfiguration(), localJobInfo);
                 } catch (SerDeException e) {
-                    throw new IOException("Failed to initialize SerDe",e);
+                    throw new IOException("Failed to initialize SerDe", e);
                 }
 
                 //create base OutputFormat
                 org.apache.hadoop.mapred.OutputFormat baseOF =
-                        ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(), currTaskContext.getJobConf());
+                    ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(), currTaskContext.getJobConf());
                 //check outputSpecs
-                baseOF.checkOutputSpecs(null,currTaskContext.getJobConf());
+                baseOF.checkOutputSpecs(null, currTaskContext.getJobConf());
                 //get Output Committer
-                org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter =  currTaskContext.getJobConf().getOutputCommitter();
+                org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter = currTaskContext.getJobConf().getOutputCommitter();
                 //create currJobContext the latest so it gets all the config changes
                 org.apache.hadoop.mapred.JobContext currJobContext = HCatMapRedUtil.createJobContext(currTaskContext);
                 //setupJob()
                 baseOutputCommitter.setupJob(currJobContext);
                 //recreate to refresh jobConf of currTask context
                 currTaskContext =
-                        HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(),
-                                                                                        currTaskContext.getTaskAttemptID(),
-                                                                                        currTaskContext.getProgressible());
+                    HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(),
+                        currTaskContext.getTaskAttemptID(),
+                        currTaskContext.getProgressible());
                 //set temp location
                 currTaskContext.getConfiguration().set("mapred.work.output.dir",
-                                new FileOutputCommitter(new Path(localJobInfo.getLocation()),currTaskContext).getWorkPath().toString());
+                    new FileOutputCommitter(new Path(localJobInfo.getLocation()), currTaskContext).getWorkPath().toString());
                 //setupTask()
                 baseOutputCommitter.setupTask(currTaskContext);
 
                 org.apache.hadoop.mapred.RecordWriter baseRecordWriter =
-                        baseOF.getRecordWriter(null,
-                                                            currTaskContext.getJobConf(),
-                                                            FileOutputFormat.getUniqueFile(currTaskContext, "part", ""),
-                                                            InternalUtil.createReporter(currTaskContext));
+                    baseOF.getRecordWriter(null,
+                        currTaskContext.getJobConf(),
+                        FileOutputFormat.getUniqueFile(currTaskContext, "part", ""),
+                        InternalUtil.createReporter(currTaskContext));
 
                 baseDynamicWriters.put(dynKey, baseRecordWriter);
-                baseDynamicSerDe.put(dynKey,currSerDe);
-                baseDynamicCommitters.put(dynKey,baseOutputCommitter);
-                dynamicContexts.put(dynKey,currTaskContext);
-                dynamicObjectInspectors.put(dynKey,InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema()));
+                baseDynamicSerDe.put(dynKey, currSerDe);
+                baseDynamicCommitters.put(dynKey, baseOutputCommitter);
+                dynamicContexts.put(dynKey, currTaskContext);
+                dynamicObjectInspectors.put(dynKey, InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema()));
                 dynamicOutputJobInfo.put(dynKey, HCatOutputFormat.getJobInfo(dynamicContexts.get(dynKey)));
             }
-            
+
             localJobInfo = dynamicOutputJobInfo.get(dynKey);
             localWriter = baseDynamicWriters.get(dynKey);
             localSerDe = baseDynamicSerDe.get(dynKey);
             localObjectInspector = dynamicObjectInspectors.get(dynKey);
-        }
-        else{
+        } else {
             localJobInfo = jobInfo;
             localWriter = getBaseRecordWriter();
             localSerDe = serDe;
             localObjectInspector = objectInspector;
         }
 
-        for(Integer colToDel : partColsToDel){
+        for (Integer colToDel : partColsToDel) {
             value.remove(colToDel);
         }
 
@@ -250,7 +248,7 @@ class FileRecordWriterContainer extends 
         try {
             localWriter.write(NullWritable.get(), localSerDe.serialize(value.getAll(), localObjectInspector));
         } catch (SerDeException e) {
-            throw new IOException("Failed to serialize object",e);
+            throw new IOException("Failed to serialize object", e);
         }
     }
 

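The write() path reformatted above keys a lazily-grown map of per-partition writers on the string form of the record's dynamic-partition values, and refuses to grow past the configured maximum. Stripped of the Hive, SerDe and committer plumbing, the caching pattern looks roughly like the sketch below; the Writer interface, the record representation, and the limit check are placeholders rather than the patch's classes:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class DynPartitionWriterCacheSketch {

        interface Writer {                       // stand-in for the per-partition RecordWriter
            void write(List<String> record) throws IOException;
        }

        private final Map<String, Writer> writers = new HashMap<String, Writer>();
        private final List<Integer> dynamicPartCols;   // positions of the dynamic partition columns
        private final int maxDynamicPartitions;        // -1 means unlimited

        DynPartitionWriterCacheSketch(List<Integer> dynamicPartCols, int maxDynamicPartitions) {
            this.dynamicPartCols = dynamicPartCols;
            this.maxDynamicPartitions = maxDynamicPartitions;
        }

        void write(List<String> record) throws IOException {
            // Compute the cache key from the dynamic-partition columns before touching the record.
            List<String> dynValues = new ArrayList<String>();
            for (Integer col : dynamicPartCols) {
                dynValues.add(record.get(col));
            }
            String dynKey = dynValues.toString();

            Writer writer = writers.get(dynKey);
            if (writer == null) {
                if (maxDynamicPartitions != -1 && writers.size() >= maxDynamicPartitions) {
                    throw new IOException("Too many dynamic partitions: " + writers.size());
                }
                writer = openWriterFor(dynValues);   // in the real code: committer, SerDe, paths, etc.
                writers.put(dynKey, writer);
            }
            writer.write(record);
        }

        private Writer openWriterFor(final List<String> dynValues) {
            return new Writer() {
                public void write(List<String> record) {
                    System.out.println(dynValues + " <- " + record);
                }
            };
        }
    }
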
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java Mon Sep 10 23:28:55 2012
@@ -52,18 +52,19 @@ public class FosterStorageHandler extend
     /** The directory under which data is initially written for a non partitioned table */
     protected static final String TEMP_DIR_NAME = "_TEMP";
 
-   private Class<? extends InputFormat> ifClass;
-   private Class<? extends OutputFormat> ofClass;
-   private Class<? extends SerDe> serDeClass;
+    private Class<? extends InputFormat> ifClass;
+    private Class<? extends OutputFormat> ofClass;
+    private Class<? extends SerDe> serDeClass;
 
     public FosterStorageHandler(String ifName, String ofName, String serdeName) throws ClassNotFoundException {
         this((Class<? extends InputFormat>) Class.forName(ifName),
-                (Class<? extends OutputFormat>) Class.forName(ofName),
-                (Class<? extends SerDe>) Class.forName(serdeName));
+            (Class<? extends OutputFormat>) Class.forName(ofName),
+            (Class<? extends SerDe>) Class.forName(serdeName));
     }
+
     public FosterStorageHandler(Class<? extends InputFormat> ifClass,
-                                               Class<? extends OutputFormat> ofClass,
-                                               Class<? extends SerDe> serDeClass) {
+                                Class<? extends OutputFormat> ofClass,
+                                Class<? extends SerDe> serDeClass) {
         this.ifClass = ifClass;
         this.ofClass = ofClass;
         this.serDeClass = serDeClass;
@@ -97,36 +98,35 @@ public class FosterStorageHandler extend
 
     @Override
     public void configureOutputJobProperties(TableDesc tableDesc,
-                                      Map<String, String> jobProperties) {
+                                             Map<String, String> jobProperties) {
         try {
             OutputJobInfo jobInfo = (OutputJobInfo)
-              HCatUtil.deserialize(tableDesc.getJobProperties().get(
-                                      HCatConstants.HCAT_KEY_OUTPUT_INFO));
+                HCatUtil.deserialize(tableDesc.getJobProperties().get(
+                    HCatConstants.HCAT_KEY_OUTPUT_INFO));
             String parentPath = jobInfo.getTableInfo().getTableLocation();
             String dynHash = tableDesc.getJobProperties().get(
-                                      HCatConstants.HCAT_DYNAMIC_PTN_JOBID);
+                HCatConstants.HCAT_DYNAMIC_PTN_JOBID);
 
             // For dynamic partitioned writes without all keyvalues specified,
             // we create a temp dir for the associated write job
-            if (dynHash != null){
+            if (dynHash != null) {
                 parentPath = new Path(parentPath,
-                                      DYNTEMP_DIR_NAME+dynHash).toString();
+                    DYNTEMP_DIR_NAME + dynHash).toString();
             }
 
             String outputLocation;
 
             // For non-partitioned tables, we send them to the temp dir
-            if(dynHash == null && jobInfo.getPartitionValues().size() == 0) {
+            if (dynHash == null && jobInfo.getPartitionValues().size() == 0) {
                 outputLocation = TEMP_DIR_NAME;
-            }
-            else {
+            } else {
                 List<String> cols = new ArrayList<String>();
                 List<String> values = new ArrayList<String>();
 
                 //Get the output location in the order partition keys are defined for the table.
-                for(String name:
+                for (String name :
                     jobInfo.getTableInfo().
-                    getPartitionColumns().getFieldNames()) {
+                        getPartitionColumns().getFieldNames()) {
                     String value = jobInfo.getPartitionValues().get(name);
                     cols.add(name);
                     values.add(value);
@@ -134,29 +134,29 @@ public class FosterStorageHandler extend
                 outputLocation = FileUtils.makePartName(cols, values);
             }
 
-            jobInfo.setLocation(new Path(parentPath,outputLocation).toString());
+            jobInfo.setLocation(new Path(parentPath, outputLocation).toString());
 
             //only set output dir if partition is fully materialized
-            if(jobInfo.getPartitionValues().size()
+            if (jobInfo.getPartitionValues().size()
                 == jobInfo.getTableInfo().getPartitionColumns().size()) {
                 jobProperties.put("mapred.output.dir", jobInfo.getLocation());
             }
 
             //TODO find a better home for this, RCFile specific
             jobProperties.put(RCFile.COLUMN_NUMBER_CONF_STR,
-                              Integer.toOctalString(
-                                jobInfo.getOutputSchema().getFields().size()));
+                Integer.toOctalString(
+                    jobInfo.getOutputSchema().getFields().size()));
             jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO,
-                              HCatUtil.serialize(jobInfo));
+                HCatUtil.serialize(jobInfo));
         } catch (IOException e) {
-            throw new IllegalStateException("Failed to set output path",e);
+            throw new IllegalStateException("Failed to set output path", e);
         }
 
     }
 
     @Override
     OutputFormatContainer getOutputFormatContainer(
-              org.apache.hadoop.mapred.OutputFormat outputFormat) {
+        org.apache.hadoop.mapred.OutputFormat outputFormat) {
         return new FileOutputFormatContainer(outputFormat);
     }
 
@@ -172,7 +172,7 @@ public class FosterStorageHandler extend
 
     @Override
     public HiveAuthorizationProvider getAuthorizationProvider()
-      throws HiveException {
+        throws HiveException {
         return new DefaultHiveAuthorizationProvider();
     }
 

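Finally, the FosterStorageHandler change above settles the write location by joining the table's partition keys, in declaration order, with the values supplied for the job; the real code delegates to FileUtils.makePartName, which also escapes special characters. A rough sketch of that layout rule, with made-up keys, values, and table location:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class PartitionPathSketch {
        /** Join key=value pairs into the directory layout Hive expects, one level per key. */
        static String makePartName(Map<String, String> partSpec) {
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<String, String> e : partSpec.entrySet()) {
                if (sb.length() > 0) {
                    sb.append('/');
                }
                sb.append(e.getKey()).append('=').append(e.getValue());
            }
            return sb.toString();
        }

        public static void main(String[] args) {
            // Partition keys in table-declaration order, as the patch iterates them.
            Map<String, String> partSpec = new LinkedHashMap<String, String>();
            partSpec.put("ds", "2012-09-10");
            partSpec.put("region", "us");
            String tableLocation = "/user/hive/warehouse/example_table";   // illustrative
            System.out.println(tableLocation + "/" + makePartName(partSpec));
            // -> /user/hive/warehouse/example_table/ds=2012-09-10/region=us
        }
    }
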

