hive-commits mailing list archives

From ser...@apache.org
Subject [07/23] hive git commit: HIVE-11553 : use basic file metadata cache in ETLSplitStrategy-related paths (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Date Mon, 05 Oct 2015 19:41:26 GMT
HIVE-11553 : use basic file metadata cache in ETLSplitStrategy-related paths (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bc1c434f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bc1c434f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bc1c434f

Branch: refs/heads/llap
Commit: bc1c434f1ffc70dc2d52f34839c6f54b0b0c8d10
Parents: 77f30d4
Author: Sergey Shelukhin <sershe@apache.org>
Authored: Fri Oct 2 11:11:36 2015 -0700
Committer: Sergey Shelukhin <sershe@apache.org>
Committed: Fri Oct 2 13:11:28 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   9 +-
 .../hadoop/hive/metastore/HiveMetaStore.java    |  20 +-
 .../hive/metastore/HiveMetaStoreClient.java     |  94 +++++
 .../hadoop/hive/metastore/IMetaStoreClient.java |  20 ++
 .../hive/metastore/hbase/HBaseReadWrite.java    |   9 +-
 ql/pom.xml                                      |   3 +
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |   2 +-
 .../hive/ql/exec/tez/HiveSplitGenerator.java    |   4 +-
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   | 360 +++++++++++++++----
 .../apache/hadoop/hive/ql/io/orc/OrcSplit.java  |   2 +-
 .../hadoop/hive/ql/io/orc/ReaderImpl.java       |   3 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |  44 ++-
 12 files changed, 484 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/bc1c434f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e7ed07e..77ca613 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -153,7 +153,7 @@ public class HiveConf extends Configuration {
       HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL,
       HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS,
       HiveConf.ConfVars.METASTORE_PART_INHERIT_TBL_PROPS,
-      HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_TABLE_PARTITION_MAX,
+      HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_OBJECTS_MAX,
       HiveConf.ConfVars.METASTORE_INIT_HOOKS,
       HiveConf.ConfVars.METASTORE_PRE_EVENT_LISTENERS,
       HiveConf.ConfVars.HMSHANDLERATTEMPTS,
@@ -567,9 +567,9 @@ public class HiveConf extends Configuration {
         "Maximum number of objects (tables/partitions) can be retrieved from metastore in
one batch. \n" +
         "The higher the number, the less the number of round trips is needed to the Hive
metastore server, \n" +
         "but it may also cause higher memory requirement at the client side."),
-    METASTORE_BATCH_RETRIEVE_TABLE_PARTITION_MAX(
+    METASTORE_BATCH_RETRIEVE_OBJECTS_MAX(
         "hive.metastore.batch.retrieve.table.partition.max", 1000,
-        "Maximum number of table partitions that metastore internally retrieves in one batch."),
+        "Maximum number of objects that metastore internally retrieves in one batch."),
 
     METASTORE_INIT_HOOKS("hive.metastore.init.hooks", "",
         "A comma separated list of hooks to be invoked at the beginning of HMSHandler initialization.
\n" +
@@ -1077,6 +1077,9 @@ public class HiveConf extends Configuration {
         " (split generation reads and caches file footers). HYBRID chooses between the above
strategies" +
         " based on heuristics."),
 
+    HIVE_ORC_MS_FOOTER_CACHE_ENABLED("hive.orc.splits.ms.footer.cache.enabled", false,
+        "Whether to enable using file metadata cache in metastore for ORC file footers."),
+
     HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false,
         "If turned on splits generated by orc will include metadata about the stripes in
the file. This\n" +
         "data is read remotely (from the client or HS2 machine) and sent to all the tasks."),

http://git-wip-us.apache.org/repos/asf/hive/blob/bc1c434f/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 815f499..8cd1f52 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -208,6 +208,7 @@ import org.apache.thrift.transport.TTransportFactory;
 import javax.jdo.JDOException;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
@@ -5718,12 +5719,29 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       if (metadatas != null) {
         assert metadatas.length == fileIds.size();
         for (int i = 0; i < metadatas.length; ++i) {
-          result.putToMetadata(fileIds.get(i), metadatas[i]);
+          ByteBuffer bb = metadatas[i];
+          if (bb == null) continue;
+          bb = handleReadOnlyBufferForThrift(bb);
+          result.putToMetadata(fileIds.get(i), bb);
         }
       }
+      if (!result.isSetMetadata()) {
+        result.setMetadata(new HashMap<Long, ByteBuffer>()); // Set the required field.
+      }
       return result;
     }
 
+    private ByteBuffer handleReadOnlyBufferForThrift(ByteBuffer bb) {
+      if (!bb.isReadOnly()) return bb;
+      // Thrift cannot write read-only buffers... oh well.
+      // TODO: actually thrift never writes to the buffer, so we could use reflection to
+      //       unset the unnecessary read-only flag if allocation/copy perf becomes a problem.
+      ByteBuffer copy = ByteBuffer.allocate(bb.capacity());
+      copy.put(bb);
+      copy.flip();
+      return copy;
+    }
+
     @Override
     public PutFileMetadataResult put_file_metadata(PutFileMetadataRequest req) throws TException {
       getMS().putFileMetadata(req.getFileIds(), req.getMetadata());
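
As an aside, a self-contained sketch (plain JDK, nothing Hive-specific) of the problem handleReadOnlyBufferForThrift works around: a read-only ByteBuffer exposes no accessible backing array, so the handler copies the bytes into a regular heap buffer before Thrift serializes them.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ReadOnlyBufferCopySketch {
  // Mirrors handleReadOnlyBufferForThrift above (duplicate() keeps the source position untouched).
  static ByteBuffer copyIfReadOnly(ByteBuffer bb) {
    if (!bb.isReadOnly()) return bb;
    ByteBuffer copy = ByteBuffer.allocate(bb.capacity());
    copy.put(bb.duplicate());
    copy.flip();
    return copy;
  }

  public static void main(String[] args) {
    ByteBuffer ro = ByteBuffer.wrap("footer bytes".getBytes(StandardCharsets.UTF_8)).asReadOnlyBuffer();
    System.out.println(ro.hasArray());                  // false - no accessible backing array
    System.out.println(copyIfReadOnly(ro).hasArray());  // true  - safe to hand to Thrift
  }
}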

http://git-wip-us.apache.org/repos/asf/hive/blob/bc1c434f/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 6f15fd0..8e32966 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -35,9 +35,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -61,6 +64,7 @@ import org.apache.hadoop.hive.metastore.api.AddPartitionsResult;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
+import org.apache.hadoop.hive.metastore.api.ClearFileMetadataRequest;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
@@ -76,6 +80,8 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.FireEventRequest;
 import org.apache.hadoop.hive.metastore.api.FireEventResponse;
 import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataRequest;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataResult;
 import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest;
@@ -116,6 +122,7 @@ import org.apache.hadoop.hive.metastore.api.PartitionsStatsRequest;
 import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.PrivilegeBag;
+import org.apache.hadoop.hive.metastore.api.PutFileMetadataRequest;
 import org.apache.hadoop.hive.metastore.api.RequestPartsSpec;
 import org.apache.hadoop.hive.metastore.api.Role;
 import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
@@ -170,6 +177,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient {
   private String tokenStrForm;
   private final boolean localMetaStore;
   private final MetaStoreFilterHook filterHook;
+  private final int fileMetadataBatchSize;
 
   private Map<String, String> currentMetaVars;
 
@@ -195,6 +203,8 @@ public class HiveMetaStoreClient implements IMetaStoreClient {
     }
     this.conf = conf;
     filterHook = loadFilterHooks();
+    fileMetadataBatchSize = HiveConf.getIntVar(
+        conf, HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_OBJECTS_MAX);
 
     String msUri = conf.getVar(HiveConf.ConfVars.METASTOREURIS);
     localMetaStore = HiveConfUtil.isEmbeddedMetaStore(msUri);
@@ -2108,4 +2118,88 @@ public class HiveMetaStoreClient implements IMetaStoreClient {
     PartitionsStatsRequest req = new PartitionsStatsRequest(dbName, tblName, colNames, partNames);
     return client.get_aggr_stats_for(req);
   }
+
+  @Override
+  public Iterable<Entry<Long, ByteBuffer>> getFileMetadata(
+      final List<Long> fileIds) throws TException {
+    return new MetastoreMapIterable<Long, ByteBuffer>() {
+      private int listIndex = 0;
+      @Override
+      protected Map<Long, ByteBuffer> fetchNextBatch() throws TException {
+        if (listIndex == fileIds.size()) return null;
+        int endIndex = Math.min(listIndex + fileMetadataBatchSize, fileIds.size());
+        List<Long> subList = fileIds.subList(listIndex, endIndex);
+        GetFileMetadataRequest req = new GetFileMetadataRequest();
+        req.setFileIds(subList);
+        GetFileMetadataResult resp = client.get_file_metadata(req);
+        listIndex = endIndex;
+        return resp.getMetadata();
+      }
+    };
+  }
+
+  public static abstract class MetastoreMapIterable<K, V>
+    implements Iterable<Entry<K, V>>, Iterator<Entry<K, V>> {
+    private Iterator<Entry<K, V>> currentIter;
+
+    protected abstract Map<K, V> fetchNextBatch() throws TException;
+
+    @Override
+    public Iterator<Entry<K, V>> iterator() {
+      return this;
+    }
+
+    @Override
+    public boolean hasNext() {
+      ensureCurrentBatch();
+      return currentIter != null;
+    }
+
+    private void ensureCurrentBatch() {
+      if (currentIter != null && currentIter.hasNext()) return;
+      currentIter = null;
+      Map<K, V> currentBatch;
+      do {
+        try {
+          currentBatch = fetchNextBatch();
+        } catch (TException ex) {
+          throw new RuntimeException(ex);
+        }
+        if (currentBatch == null) return; // No more data.
+      } while (currentBatch.isEmpty());
+      currentIter = currentBatch.entrySet().iterator();
+    }
+
+    @Override
+    public Entry<K, V> next() {
+      ensureCurrentBatch();
+      if (currentIter == null) throw new NoSuchElementException();
+      return currentIter.next();
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  @Override
+  public void clearFileMetadata(List<Long> fileIds) throws TException {
+    ClearFileMetadataRequest req = new ClearFileMetadataRequest();
+    req.setFileIds(fileIds);
+    client.clear_file_metadata(req);
+  }
+
+  @Override
+  public void putFileMetadata(List<Long> fileIds, List<ByteBuffer> metadata) throws TException {
+    PutFileMetadataRequest req = new PutFileMetadataRequest();
+    req.setFileIds(fileIds);
+    req.setMetadata(metadata);
+    client.put_file_metadata(req);
+  }
+
+  @Override
+  public boolean isSameConfObj(HiveConf c) {
+    return conf == c;
+  }
 }
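
A hedged usage sketch for the new client call (the file IDs and setup here are made up; only getFileMetadata itself comes from the patch). The returned Iterable is lazy: MetastoreMapIterable issues one get_file_metadata request per batch of METASTORE_BATCH_RETRIEVE_OBJECTS_MAX IDs, and IDs with no cached footer are simply absent from the result.

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map.Entry;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

public class FileMetadataLookupSketch {
  public static void main(String[] args) throws Exception {
    HiveMetaStoreClient msc = new HiveMetaStoreClient(new HiveConf());
    List<Long> fileIds = Arrays.asList(101L, 102L, 103L); // hypothetical HDFS file IDs
    for (Entry<Long, ByteBuffer> e : msc.getFileMetadata(fileIds)) {
      System.out.println("file " + e.getKey() + ": " + e.getValue().remaining() + " cached footer bytes");
    }
    msc.close();
  }
}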

http://git-wip-us.apache.org/repos/asf/hive/blob/bc1c434f/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index e4a6cdb..77820ae 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -79,8 +79,10 @@ import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.thrift.TException;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
 /**
  * Wrapper around hive metastore thrift api
@@ -1459,4 +1461,22 @@ public interface IMetaStoreClient {
    * flush statistics objects.  This should be called at the beginning of each query.
    */
   void flushCache();
+
+  /**
+   * Gets file metadata, as cached by metastore, for respective file IDs.
+   * The metadata that is not cached in metastore may be missing.
+   */
+  Iterable<Entry<Long, ByteBuffer>> getFileMetadata(List<Long> fileIds) throws TException;
+
+  /**
+   * Clears the file metadata cache for respective file IDs.
+   */
+  void clearFileMetadata(List<Long> fileIds) throws TException;
+
+  /**
+   * Adds file metadata for respective file IDs to metadata cache in metastore.
+   */
+  void putFileMetadata(List<Long> fileIds, List<ByteBuffer> metadata) throws TException;
+
+  boolean isSameConfObj(HiveConf c);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bc1c434f/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
index d38c561..f69b4c7 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
@@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -1959,7 +1960,13 @@ public class HBaseReadWrite {
     Result[] results = htab.get(gets);
     for (int i = 0; i < results.length; ++i) {
       Result r = results[i];
-      resultDest[i] = (r.isEmpty() ? null : r.getValueAsByteBuffer(colFam, colName));
+      if (r.isEmpty()) {
+        resultDest[i] = null;
+      } else {
+        Cell cell = r.getColumnLatestCell(colFam, colName);
+        resultDest[i] = ByteBuffer.wrap(
+            cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+      }
     }
   }
 

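The replacement above wraps the cell's backing array directly instead of going through Result.getValueAsByteBuffer, which appears to hand back a read-only view; a plain-JDK sketch of the offset/length wrap pattern, with a byte[] standing in for Cell#getValueArray():

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class WrapValueRegionSketch {
  public static void main(String[] args) {
    // The cell value lives inside a larger backing array at some offset.
    byte[] backing = "....FOOTER....".getBytes(StandardCharsets.UTF_8);
    int valueOffset = 4, valueLength = 6;
    ByteBuffer bb = ByteBuffer.wrap(backing, valueOffset, valueLength);
    byte[] value = new byte[bb.remaining()];
    bb.duplicate().get(value);
    System.out.println(new String(value, StandardCharsets.UTF_8)); // FOOTER
    System.out.println(bb.hasArray());                             // true: array-backed view
  }
}
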
http://git-wip-us.apache.org/repos/asf/hive/blob/bc1c434f/ql/pom.xml
----------------------------------------------------------------------
diff --git a/ql/pom.xml b/ql/pom.xml
index 36b3433..587e2ee 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -704,9 +704,12 @@
                   <include>org.apache.hive:hive-common</include>
                   <include>org.apache.hive:hive-exec</include>
                   <include>org.apache.hive:hive-serde</include>
+                  <include>org.apache.hive:hive-metastore</include>
                   <include>com.esotericsoftware.kryo:kryo</include>
                   <include>org.apache.parquet:parquet-hadoop-bundle</include>
                   <include>org.apache.thrift:libthrift</include>
+                  <include>org.apache.thrift:libfb303</include>
+                  <include>javax.jdo:jdo-api</include>
                   <include>commons-lang:commons-lang</include>
                   <include>org.apache.commons:commons-lang3</include>
                   <include>org.jodd:jodd-core</include>

http://git-wip-us.apache.org/repos/asf/hive/blob/bc1c434f/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 3511e73..74007e7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -3635,7 +3635,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     }
 
     int partitionBatchSize = HiveConf.getIntVar(conf,
-        ConfVars.METASTORE_BATCH_RETRIEVE_TABLE_PARTITION_MAX);
+        ConfVars.METASTORE_BATCH_RETRIEVE_OBJECTS_MAX);
 
     // drop the table
     db.dropTable(dropTbl.getTableName(), dropTbl.getIfPurge());

http://git-wip-us.apache.org/repos/asf/hive/blob/bc1c434f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index 87881b6..0a43c15 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -91,10 +91,10 @@ public class HiveSplitGenerator extends InputInitializer {
     userPayloadProto =
         MRInputHelpers.parseMRInputPayload(initializerContext.getInputUserPayload());
 
-    this.conf =
-        TezUtils.createConfFromByteString(userPayloadProto.getConfigurationBytes());
+    this.conf = TezUtils.createConfFromByteString(userPayloadProto.getConfigurationBytes());
 
     this.jobConf = new JobConf(conf);
+
     // Read all credentials into the credentials instance stored in JobConf.
     ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/bc1c434f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 57bde3e..ef62a23 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -19,11 +19,14 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
@@ -34,6 +37,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -57,10 +61,13 @@ import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterVersion;
+import org.apache.hadoop.hive.ql.io.orc.ReaderImpl.FileMetaInfo;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeStats;
@@ -110,7 +117,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   InputFormatChecker, VectorizedInputFormatInterface,
     AcidInputFormat<NullWritable, OrcStruct>, CombineHiveInputFormat.AvoidSplitCombination {
 
-  static enum SplitStrategyKind{
+  static enum SplitStrategyKind {
     HYBRID,
     BI,
     ETL
@@ -441,7 +448,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
    */
   static class Context {
     private final Configuration conf;
-    private static Cache<Path, FileInfo> footerCache;
+
+    // We store all caches in variables to change the main one based on config.
+    // This is not thread safe between different split generations (and wasn't anyway).
+    private static FooterCache footerCache;
+    private static LocalCache localCache;
+    private static MetastoreCache metaCache;
     private static ExecutorService threadPool = null;
     private final int numBuckets;
     private final long maxSize;
@@ -490,13 +502,26 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
                   .setNameFormat("ORC_GET_SPLITS #%d").build());
         }
 
-        if (footerCache == null && cacheStripeDetails) {
-          footerCache = CacheBuilder.newBuilder()
-              .concurrencyLevel(numThreads)
-              .initialCapacity(cacheStripeDetailsSize)
-              .maximumSize(cacheStripeDetailsSize)
-              .softValues()
-              .build();
+        // TODO: local cache is created once, so the configs for future queries will not be honored.
+        if (cacheStripeDetails) {
+          // Note that there's no FS check here; we implicitly only use metastore cache for
+          // HDFS, because only HDFS would return fileIds for us. If fileId is extended using
+          // size/mod time/etc. for other FSes, we might need to check FSes explicitly because
+          // using such an aggregate fileId cache is not bulletproof and should be disable-able.
+          boolean useMetastoreCache = HiveConf.getBoolVar(
+              conf, HiveConf.ConfVars.HIVE_ORC_MS_FOOTER_CACHE_ENABLED);
+          if (localCache == null) {
+            localCache = new LocalCache(numThreads, cacheStripeDetailsSize);
+          }
+          if (useMetastoreCache) {
+            if (metaCache == null) {
+              metaCache = new MetastoreCache(localCache);
+            }
+            assert conf instanceof HiveConf;
+            metaCache.configure((HiveConf)conf);
+          }
+          // Set footer cache for current split generation. See field comment - not thread safe.
+          footerCache = useMetastoreCache ? metaCache : localCache;
         }
       }
       String value = conf.get(ValidTxnList.VALID_TXNS_KEY,
@@ -591,49 +616,37 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       this.covered = covered;
     }
 
-    private FileInfo verifyCachedFileInfo(FileStatus file) {
-      FileInfo fileInfo = Context.footerCache.getIfPresent(file.getPath());
-      if (fileInfo != null) {
-        if (isDebugEnabled) {
-          LOG.debug("Info cached for path: " + file.getPath());
-        }
-        if (fileInfo.modificationTime == file.getModificationTime() &&
-            fileInfo.size == file.getLen()) {
-          // Cached copy is valid
-          context.cacheHitCounter.incrementAndGet();
-          return fileInfo;
-        } else {
-          // Invalidate
-          Context.footerCache.invalidate(file.getPath());
-          if (isDebugEnabled) {
-            LOG.debug("Meta-Info for : " + file.getPath() +
-                " changed. CachedModificationTime: "
-                + fileInfo.modificationTime + ", CurrentModificationTime: "
-                + file.getModificationTime()
-                + ", CachedLength: " + fileInfo.size + ", CurrentLength: " +
-                file.getLen());
-          }
-        }
-      } else {
-        if (isDebugEnabled) {
-          LOG.debug("Info not cached for path: " + file.getPath());
-        }
-      }
-      return null;
-    }
-
     @Override
     public List<SplitInfo> getSplits() throws IOException {
-      List<SplitInfo> result = Lists.newArrayList();
-      for (HdfsFileStatusWithId file : files) {
-        FileInfo info = null;
-        if (context.cacheStripeDetails) {
-          info = verifyCachedFileInfo(file.getFileStatus());
+      List<SplitInfo> result = new ArrayList<>(files.size());
+      // TODO: Right now, we do the metastore call here, so there will be a metastore call per
+      //       partition. If we had a sync point after getting file lists, we could make just one
+      //       call; this might be too much sync for many partitions and also cause issues with the
+      //       huge metastore call result that cannot be handled with in-API batching. To have an
+      //       optimal number of metastore calls, we should wait for batch-size number of files (a
+      //       few hundreds) to become available, then call metastore.
+      if (context.cacheStripeDetails) {
+        FileInfo[] infos = Context.footerCache.getAndValidate(files);
+        for (int i = 0; i < files.size(); ++i) {
+          FileInfo info = infos[i];
+          if (info != null) {
+            // Cached copy is valid
+            context.cacheHitCounter.incrementAndGet();
+          }
+          HdfsFileStatusWithId file = files.get(i);
+          // ignore files of 0 length
+          if (file.getFileStatus().getLen() > 0) {
+            result.add(new SplitInfo(
+                context, fs, file, info, isOriginal, deltas, true, dir, covered));
+          }
         }
-        // ignore files of 0 length
-        if (file.getFileStatus().getLen() > 0) {
-          result.add(new SplitInfo(
-              context, fs, file, info, isOriginal, deltas, true, dir, covered));
+      } else {
+        for (HdfsFileStatusWithId file : files) {
+          // ignore files of 0 length
+          if (file.getFileStatus().getLen() > 0) {
+            result.add(new SplitInfo(
+                context, fs, file, null, isOriginal, deltas, true, dir, covered));
+          }
         }
       }
       return result;
@@ -808,7 +821,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       this.file = this.fileWithId.getFileStatus();
       this.blockSize = this.file.getBlockSize();
       this.fileInfo = splitInfo.fileInfo;
-      locations = SHIMS.getLocationsWithOffset(fs, file);
+      locations = SHIMS.getLocationsWithOffset(fs, file); // TODO: potential DFS call
       this.isOriginal = splitInfo.isOriginal;
       this.deltas = splitInfo.deltas;
       this.hasBase = splitInfo.hasBase;
@@ -990,41 +1003,51 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     }
 
     private void populateAndCacheStripeDetails() throws IOException {
-      Reader orcReader = OrcFile.createReader(file.getPath(),
-          OrcFile.readerOptions(context.conf).filesystem(fs));
+      // Only create OrcReader if we are missing some information.
+      OrcProto.Footer footer;
       if (fileInfo != null) {
         stripes = fileInfo.stripeInfos;
         fileMetaInfo = fileInfo.fileMetaInfo;
         metadata = fileInfo.metadata;
-        types = fileInfo.types;
+        types = fileInfo.footer.getTypesList();
         writerVersion = fileInfo.writerVersion;
+        footer = fileInfo.footer;
         // For multiple runs, in case sendSplitsInFooter changes
         if (fileMetaInfo == null && context.footerInSplits) {
+          Reader orcReader = createOrcReader();
           fileInfo.fileMetaInfo = ((ReaderImpl) orcReader).getFileMetaInfo();
-          fileInfo.metadata = orcReader.getMetadata();
-          fileInfo.types = orcReader.getTypes();
-          fileInfo.writerVersion = orcReader.getWriterVersion();
+          assert fileInfo.metadata != null && fileInfo.footer != null
+              && fileInfo.writerVersion != null;
+          footer = fileInfo.footer;
+          // We assume that if we needed to create a reader, we need to cache it to meta cache.
+          // TODO: This will also needlessly overwrite it in local cache for now.
+          Context.footerCache.put(fileWithId.getFileId(), file, fileInfo.fileMetaInfo, orcReader);
         }
       } else {
+        Reader orcReader = createOrcReader();
         stripes = orcReader.getStripes();
         metadata = orcReader.getMetadata();
         types = orcReader.getTypes();
         writerVersion = orcReader.getWriterVersion();
         fileMetaInfo = context.footerInSplits ?
             ((ReaderImpl) orcReader).getFileMetaInfo() : null;
+        footer = orcReader.getFooter();
         if (context.cacheStripeDetails) {
-          // Populate into cache.
-          Context.footerCache.put(file.getPath(),
-              new FileInfo(file.getModificationTime(), file.getLen(), stripes,
-                  metadata, types, fileMetaInfo, writerVersion));
+          Long fileId = fileWithId.getFileId();
+          Context.footerCache.put(fileId, file, fileMetaInfo, orcReader);
         }
       }
       includedCols = genIncludedColumns(types, context.conf, isOriginal);
-      projColsUncompressedSize = computeProjectionSize(orcReader, includedCols, isOriginal);
+      projColsUncompressedSize = computeProjectionSize(footer, includedCols, isOriginal);
+    }
+
+    private Reader createOrcReader() throws IOException {
+      return OrcFile.createReader(file.getPath(),
+          OrcFile.readerOptions(context.conf).filesystem(fs));
     }
 
-    private long computeProjectionSize(final Reader orcReader, final boolean[] includedCols,
-        final boolean isOriginal) {
+    private long computeProjectionSize(
+        OrcProto.Footer footer, final boolean[] includedCols, final boolean isOriginal) {
       final int rootIdx = getRootColumn(isOriginal);
       List<Integer> internalColIds = Lists.newArrayList();
       if (includedCols != null) {
@@ -1034,7 +1057,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
           }
         }
       }
-      return orcReader.getRawDataSizeFromColIndices(internalColIds);
+      return ReaderImpl.getRawDataSizeFromColIndices(internalColIds, footer);
     }
   }
 
@@ -1045,7 +1068,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
   static List<OrcSplit> generateSplitsInfo(Configuration conf, int numSplits)
       throws IOException {
-    // use threads to resolve directories into splits
+    // Use threads to resolve directories into splits.
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_MS_FOOTER_CACHE_ENABLED)) {
+      // Create HiveConf once, since this is expensive.
+      conf = new HiveConf(conf, OrcInputFormat.class);
+    }
     Context context = new Context(conf, numSplits);
     List<OrcSplit> splits = Lists.newArrayList();
     List<Future<AcidDirInfo>> pathFutures = Lists.newArrayList();
@@ -1072,6 +1099,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
           LOG.debug(splitStrategy);
         }
 
+        // Hack note - different split strategies return differently typed lists, yay Java.
+        // This works purely by magic, because we know which strategy produces which type.
         if (splitStrategy instanceof ETLSplitStrategy) {
           List<SplitInfo> splitInfos = ((ETLSplitStrategy)splitStrategy).getSplits();
           for (SplitInfo splitInfo : splitInfos) {
@@ -1134,26 +1163,28 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
    *
    */
   private static class FileInfo {
-    long modificationTime;
-    long size;
-    List<StripeInformation> stripeInfos;
-    ReaderImpl.FileMetaInfo fileMetaInfo;
-    Metadata metadata;
-    List<OrcProto.Type> types;
+    private final long modificationTime;
+    private final long size;
+    private final Long fileId;
+    private final List<StripeInformation> stripeInfos;
+    private ReaderImpl.FileMetaInfo fileMetaInfo;
+    private Metadata metadata;
+    private OrcProto.Footer footer;
     private OrcFile.WriterVersion writerVersion;
 
 
     FileInfo(long modificationTime, long size,
              List<StripeInformation> stripeInfos,
-             Metadata metadata, List<OrcProto.Type> types,
+             Metadata metadata, OrcProto.Footer footer,
              ReaderImpl.FileMetaInfo fileMetaInfo,
-             OrcFile.WriterVersion writerVersion) {
+             OrcFile.WriterVersion writerVersion, Long fileId) {
       this.modificationTime = modificationTime;
       this.size = size;
+      this.fileId = fileId;
       this.stripeInfos = stripeInfos;
       this.fileMetaInfo = fileMetaInfo;
       this.metadata = metadata;
-      this.types = types;
+      this.footer = footer;
       this.writerVersion = writerVersion;
     }
   }
@@ -1513,5 +1544,186 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         bucket, validTxnList, new Reader.Options(), deltaDirectory);
   }
 
+  /**
+   * Represents footer cache.
+   */
+  public interface FooterCache {
+    FileInfo[] getAndValidate(List<HdfsFileStatusWithId> files) throws IOException;
+    void put(Long fileId, FileStatus file, FileMetaInfo fileMetaInfo, Reader orcReader)
+        throws IOException;
+  }
+
+  /** Local footer cache using Guava. Stores convoluted Java objects. */
+  private static class LocalCache implements FooterCache {
+    private Cache<Path, FileInfo> cache;
+
+    public LocalCache(int numThreads, int cacheStripeDetailsSize) {
+      cache = CacheBuilder.newBuilder()
+        .concurrencyLevel(numThreads)
+        .initialCapacity(cacheStripeDetailsSize)
+        .maximumSize(cacheStripeDetailsSize)
+        .softValues()
+        .build();
+    }
+
+    @Override
+    public FileInfo[] getAndValidate(List<HdfsFileStatusWithId> files) {
+      // TODO: should local cache also be by fileId? Preserve the original logic for now.
+      FileInfo[] result = new FileInfo[files.size()];
+      int i = -1;
+      for (HdfsFileStatusWithId fileWithId : files) {
+        ++i;
+        FileStatus file = fileWithId.getFileStatus();
+        Path path = file.getPath();
+        Long fileId = fileWithId.getFileId();
+        FileInfo fileInfo = cache.getIfPresent(path);
+        if (isDebugEnabled) {
+          LOG.debug("Info " + (fileInfo == null ? "not " : "") + "cached for path: " + path);
+        }
+        if (fileInfo == null) continue;
+        if ((fileId != null && fileInfo.fileId != null && fileId == fileInfo.fileId)
+            || (fileInfo.modificationTime == file.getModificationTime() &&
+            fileInfo.size == file.getLen())) {
+          result[i] = fileInfo;
+          continue;
+        }
+        // Invalidate
+        cache.invalidate(path);
+        if (isDebugEnabled) {
+          LOG.debug("Meta-Info for : " + path + " changed. CachedModificationTime: "
+              + fileInfo.modificationTime + ", CurrentModificationTime: "
+              + file.getModificationTime() + ", CachedLength: " + fileInfo.size
+              + ", CurrentLength: " + file.getLen());
+        }
+      }
+      return result;
+    }
+
+    public void put(Path path, FileInfo fileInfo) {
+      cache.put(path, fileInfo);
+    }
+
+    @Override
+    public void put(Long fileId, FileStatus file, FileMetaInfo fileMetaInfo, Reader orcReader)
+        throws IOException {
+      cache.put(file.getPath(), new FileInfo(file.getModificationTime(), file.getLen(),
+          orcReader.getStripes(), orcReader.getMetadata(), orcReader.getFooter(), fileMetaInfo,
+          orcReader.getWriterVersion(), fileId));
+    }
+  }
+
+  /** Metastore-based footer cache storing serialized footers. Also has a local cache. */
+  public static class MetastoreCache implements FooterCache {
+    private final LocalCache localCache;
+    private boolean isWarnLogged = false;
+    private HiveConf conf;
+
+    public MetastoreCache(LocalCache lc) {
+      localCache = lc;
+    }
 
+    @Override
+    public FileInfo[] getAndValidate(List<HdfsFileStatusWithId> files) throws IOException {
+      // First, check the local cache.
+      FileInfo[] result = localCache.getAndValidate(files);
+      assert result.length == files.size();
+      // This is an unfortunate consequence of batching/iterating thru MS results.
+      // TODO: maybe have a direct map call for small lists if this becomes a perf issue.
+      HashMap<Long, Integer> posMap = new HashMap<>(files.size());
+      for (int i = 0; i < result.length; ++i) {
+        if (result[i] != null) continue;
+        HdfsFileStatusWithId file = files.get(i);
+        Long fileId = file.getFileId();
+        if (fileId == null) {
+          if (!isWarnLogged || isDebugEnabled) {
+            LOG.warn("Not using metastore cache because fileId is missing: "
+                + file.getFileStatus().getPath());
+            isWarnLogged = true;
+          }
+          continue;
+        }
+        posMap.put(fileId, i);
+      }
+      Iterator<Entry<Long, ByteBuffer>> iter = null;
+      Hive hive;
+      try {
+        hive = getHive();
+        iter = hive.getFileMetadata(Lists.newArrayList(posMap.keySet()), conf).iterator();
+      } catch (HiveException ex) {
+        throw new IOException(ex);
+      }
+      List<Long> corruptIds = null;
+      while (iter.hasNext()) {
+        Entry<Long, ByteBuffer> e = iter.next();
+        int ix = posMap.get(e.getKey());
+        assert result[ix] == null;
+        HdfsFileStatusWithId file = files.get(ix);
+        assert file.getFileId() == e.getKey();
+        result[ix] = createFileInfoFromMs(file, e.getValue());
+        if (result[ix] == null) {
+          if (corruptIds == null) {
+            corruptIds = new ArrayList<>();
+          }
+          corruptIds.add(file.getFileId());
+        } else {
+          localCache.put(file.getFileStatus().getPath(), result[ix]);
+        }
+      }
+      if (corruptIds != null) {
+        try {
+          hive.clearFileMetadata(corruptIds);
+        } catch (HiveException ex) {
+          LOG.error("Failed to clear corrupt cache data", ex);
+        }
+      }
+      return result;
+    }
+
+    private Hive getHive() throws HiveException {
+      // TODO: we wish we could cache the Hive object, but it's not thread safe, and each
+      //       threadlocal we "cache" would need to be reinitialized for every query. This
is
+      //       a huge PITA. Hive object will be cached internally, but the compat check will
be
+      //       done every time inside get().
+      return Hive.getWithFastCheck(conf);
+    }
+
+    private static FileInfo createFileInfoFromMs(
+        HdfsFileStatusWithId file, ByteBuffer bb) throws IOException {
+      FileStatus fs = file.getFileStatus();
+      ReaderImpl.FooterInfo fi = null;
+      ByteBuffer original = bb.duplicate();
+      try {
+        fi = ReaderImpl.extractMetaInfoFromFooter(bb, fs.getPath());
+      } catch (Exception ex) {
+        byte[] data = new byte[original.remaining()];
+        System.arraycopy(original.array(), original.arrayOffset() + original.position(),
+            data, 0, data.length);
+        String msg = "Failed to parse the footer stored in cache for file ID "
+            + file.getFileId() + " " + original + " [ " + Hex.encodeHexString(data) + " ]";
+        LOG.error(msg, ex);
+        return null;
+      }
+      return new FileInfo(fs.getModificationTime(), fs.getLen(), fi.getStripes(),
+          fi.getMetadata(), fi.getFooter(), fi.getFileMetaInfo(),
+          fi.getFileMetaInfo().writerVersion, file.getFileId());
+    }
+
+    @Override
+    public void put(Long fileId, FileStatus file, FileMetaInfo fileMetaInfo, Reader orcReader)
+        throws IOException {
+      localCache.put(fileId, file, fileMetaInfo, orcReader);
+      if (fileId != null) {
+        try {
+          getHive().putFileMetadata(Lists.newArrayList(fileId),
+              Lists.newArrayList(((ReaderImpl)orcReader).getSerializedFileFooter()));
+        } catch (HiveException e) {
+          throw new IOException(e);
+        }
+      }
+    }
+
+    public void configure(HiveConf queryConfig) {
+      this.conf = queryConfig;
+    }
+  }
 }
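
To summarize the lookup flow that MetastoreCache.getAndValidate implements above, here is a compressed, dependency-free sketch (plain maps stand in for the Guava-backed LocalCache and for the batched metastore call; FileInfo is reduced to a String):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TwoTierFooterLookupSketch {
  static Map<Long, String> local = new HashMap<>();      // stands in for LocalCache
  static Map<Long, String> metastore = new HashMap<>();  // stands in for get_file_metadata

  static String[] getAndValidate(List<Long> fileIds) {
    String[] result = new String[fileIds.size()];
    Map<Long, Integer> posMap = new HashMap<>();
    for (int i = 0; i < fileIds.size(); ++i) {
      result[i] = local.get(fileIds.get(i));             // 1) local cache first
      if (result[i] == null) posMap.put(fileIds.get(i), i);
    }
    for (Map.Entry<Long, Integer> e : posMap.entrySet()) {
      String footer = metastore.get(e.getKey());         // 2) one (batched) remote lookup for the misses
      if (footer == null) continue;                      //    not cached remotely either
      local.put(e.getKey(), footer);                     // 3) backfill the local cache
      result[e.getValue()] = footer;
    }
    return result;
  }

  public static void main(String[] args) {
    local.put(1L, "footer-1");
    metastore.put(2L, "footer-2");
    System.out.println(Arrays.toString(getAndValidate(Arrays.asList(1L, 2L, 3L))));
    // -> [footer-1, footer-2, null]; file 2 is now also in the local map.
  }
}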

http://git-wip-us.apache.org/repos/asf/hive/blob/bc1c434f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index cc03df7..8bcda75 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -153,7 +153,7 @@ public class OrcSplit extends FileSplit {
     }
   }
 
-  ReaderImpl.FileMetaInfo getFileMetaInfo(){
+  public ReaderImpl.FileMetaInfo getFileMetaInfo() {
     return fileMetaInfo;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/bc1c434f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index 3bac48a..a36027e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -233,8 +233,7 @@ public class ReaderImpl implements Reader {
       throw new FileFormatException("Malformed ORC file " + path +
           ". Invalid postscript length " + psLen);
     }
-    int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - 1
-        - len;
+    int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - 1 - len;
     byte[] array = buffer.array();
     // now look for the magic string at the end of the postscript.
     if (!Text.decode(array, offset, len).equals(OrcFile.MAGIC)) {

http://git-wip-us.apache.org/repos/asf/hive/blob/bc1c434f/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 8efbb05..3ea8e25 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -103,6 +103,7 @@ import org.apache.thrift.TException;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -241,15 +242,32 @@ public class Hive {
    *
    */
   public static Hive get(HiveConf c) throws HiveException {
+    return getInternal(c, false);
+  }
+
+  /**
+   * Same as {@link #get(HiveConf)}, except that it checks only the object identity of existing
+   * MS client, assuming the relevant settings would be unchanged within the same conf object.
+   */
+  public static Hive getWithFastCheck(HiveConf c) throws HiveException {
+    return getInternal(c, true);
+  }
+
+  private static Hive getInternal(HiveConf c, boolean isFastCheck) throws HiveException {
     Hive db = hiveDB.get();
     if (db == null || !db.isCurrentUserOwner() ||
-        (db.metaStoreClient != null && !db.metaStoreClient.isCompatibleWith(c))) {
+        (db.metaStoreClient != null && !isCompatible(db, c, isFastCheck))) {
       return get(c, true);
     }
     db.conf = c;
     return db;
   }
 
+  private static boolean isCompatible(Hive db, HiveConf c, boolean isFastCheck) {
+    return isFastCheck
+        ? db.metaStoreClient.isSameConfObj(c) : db.metaStoreClient.isCompatibleWith(c);
+  }
+
   /**
    * get a connection to metastore. see get(HiveConf) function for comments
    *
@@ -3339,4 +3357,28 @@ private void constructOneLBLocationMap(FileStatus fSta,
     return true;
   }
 
+  public Iterable<Map.Entry<Long, ByteBuffer>> getFileMetadata(
+      List<Long> fileIds, Configuration conf) throws HiveException {
+    try {
+      return getMSC().getFileMetadata(fileIds);
+    } catch (TException e) {
+      throw new HiveException(e);
+    }
+  }
+
+  public void clearFileMetadata(List<Long> fileIds) throws HiveException {
+    try {
+      getMSC().clearFileMetadata(fileIds);
+    } catch (TException e) {
+      throw new HiveException(e);
+    }
+  }
+
+  public void putFileMetadata(List<Long> fileIds, List<ByteBuffer> metadata) throws HiveException {
+    try {
+      getMSC().putFileMetadata(fileIds, metadata);
+    } catch (TException e) {
+      throw new HiveException(e);
+    }
+  }
 };
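
Finally, a small sketch of the intended call pattern for the new getWithFastCheck (everything except the method itself is illustrative): the split-generation path builds one HiveConf per query (see generateSplitsInfo above) and keeps passing that same object, so the cheap identity check in isCompatible is enough to reuse the thread-local Hive and its metastore client.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;

public class FastCheckSketch {
  public static void main(String[] args) throws HiveException {
    HiveConf conf = new HiveConf();            // created once per query, as in generateSplitsInfo
    Hive h1 = Hive.getWithFastCheck(conf);     // first call on this thread does the full get()
    Hive h2 = Hive.getWithFastCheck(conf);     // same conf object -> identity check only
    System.out.println(h1 == h2);              // expected true within one thread for the same conf
  }
}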

