phoenix-commits mailing list archives

From maryann...@apache.org
Subject phoenix git commit: Implement UPSERT SELECT from client side (part of PHOENIX-2197 Support DML in Phoenix/Calcite integration)
Date Tue, 03 May 2016 19:35:27 GMT
Repository: phoenix
Updated Branches:
  refs/heads/calcite 3425347e3 -> 49c53e07f


Implement UPSERT SELECT from client side (part of PHOENIX-2197 Support DML in Phoenix/Calcite integration)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/49c53e07
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/49c53e07
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/49c53e07

Branch: refs/heads/calcite
Commit: 49c53e07f2cd9ce7eac7259bed1fbe5acaeeb040
Parents: 3425347
Author: maryannxue <maryann.xue@gmail.com>
Authored: Tue May 3 15:35:18 2016 -0400
Committer: maryannxue <maryann.xue@gmail.com>
Committed: Tue May 3 15:35:18 2016 -0400

----------------------------------------------------------------------
 .../apache/phoenix/calcite/CalciteDMLIT.java    |   5 +-
 .../calcite/jdbc/PhoenixCalciteFactory.java     |  11 ++
 .../apache/phoenix/calcite/CalciteRuntime.java  |  64 ++++++--
 .../phoenix/calcite/rel/PhoenixTableModify.java | 157 +++++++++++++++----
 .../rel/PhoenixToEnumerableConverter.java       |   2 +-
 .../calcite/rules/PhoenixConverterRules.java    |   6 +-
 6 files changed, 198 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
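
For context, UPSERT SELECT is Phoenix's DML form that writes the rows produced by a SELECT back into a table; this commit makes that path work through the Calcite integration on the client side. A minimal sketch of issuing such a statement over JDBC follows; the connection URL and the TARGET_TABLE name are illustrative assumptions, not part of this commit (ATABLE is the table used by the test below). With auto-commit off, the mutations stay buffered in the connection until commit(), which the new PhoenixCalciteFactory.commit() below forwards to the underlying PhoenixConnection.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class UpsertSelectSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical connection string and target table, for illustration only.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
             Statement stmt = conn.createStatement()) {
            // Rows produced by the inner SELECT are written back through the
            // client-side mutation path added in this commit.
            int rows = stmt.executeUpdate(
                "UPSERT INTO TARGET_TABLE (ORGANIZATION_ID, ENTITY_ID) " +
                "SELECT ORGANIZATION_ID, ENTITY_ID FROM ATABLE");
            conn.commit(); // mutations are buffered until commit when auto-commit is off
            System.out.println("updated rows: " + rows);
        }
    }
}
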


http://git-wip-us.apache.org/repos/asf/phoenix/blob/49c53e07/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java
index eb18a45..72e6ca3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java
@@ -23,7 +23,10 @@ public class CalciteDMLIT extends BaseCalciteIT {
                        "  PhoenixTableModify(table=[[phoenix, ATABLE]], operation=[INSERT],
updateColumnList=[[]], flattened=[false])\n" +
                        "    PhoenixClientProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[null],
B_STRING=[null], A_INTEGER=[null], A_DATE=[null], A_TIME=[null], A_TIMESTAMP=[null], X_DECIMAL=[null],
X_LONG=[null], X_INTEGER=[null], Y_INTEGER=[null], A_BYTE=[null], A_SHORT=[null], A_FLOAT=[null],
A_DOUBLE=[null], A_UNSIGNED_FLOAT=[null], A_UNSIGNED_DOUBLE=[null])\n" +
                        "      PhoenixValues(tuples=[[{ '1              ', '1            
 ' }]])\n")
-            //.executeUpdate()
+            .executeUpdate()
             .close();
+//        start(false, 1L).sql("select organization_id, entity_id from aTable")
+//            .resultIs(new Object[][] {{"1              ", "1              "}})
+//            .close();
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/49c53e07/phoenix-core/src/main/java/org/apache/calcite/jdbc/PhoenixCalciteFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/calcite/jdbc/PhoenixCalciteFactory.java b/phoenix-core/src/main/java/org/apache/calcite/jdbc/PhoenixCalciteFactory.java
index 5b9fd1d..5f86303 100644
--- a/phoenix-core/src/main/java/org/apache/calcite/jdbc/PhoenixCalciteFactory.java
+++ b/phoenix-core/src/main/java/org/apache/calcite/jdbc/PhoenixCalciteFactory.java
@@ -97,6 +97,17 @@ public class PhoenixCalciteFactory extends CalciteFactory {
                     CalciteSchema.createRootSchema(true, false), typeFactory);
         }
         
+        public void commit() throws SQLException {
+            for (String subSchemaName : getRootSchema().getSubSchemaNames()) {
+                try {
+                    PhoenixSchema phoenixSchema = getRootSchema()
+                            .getSubSchema(subSchemaName).unwrap(PhoenixSchema.class);
+                    phoenixSchema.pc.commit();
+                } catch (ClassCastException e) {
+                }
+            }
+        }
+        
         public void close() throws SQLException {
             for (String subSchemaName : getRootSchema().getSubSchemaNames()) {
                 try {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/49c53e07/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteRuntime.java
index 000a0c6..159df6e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteRuntime.java
@@ -5,8 +5,10 @@ import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.linq4j.Enumerator;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.compile.ColumnProjector;
+import org.apache.phoenix.compile.MutationPlan;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementPlan;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PDataType;
@@ -36,24 +38,30 @@ import java.sql.Timestamp;
  * Methods used by code generated by Calcite.
  */
 public class CalciteRuntime {
-    public static Enumerable<Object> toEnumerable2(final ResultIterator iterator, final RowProjector rowProjector) {
+    public static Enumerable<Object> toEnumerable(final StatementPlan plan) {
         return new AbstractEnumerable<Object>() {
             @Override
             public Enumerator<Object> enumerator() {
-                return toEnumerator(iterator, rowProjector);
+                try {
+                    if (plan instanceof QueryPlan) {
+                        return toEnumerator((QueryPlan) plan);
+                    }
+                    
+                    if (plan instanceof MutationPlan) {
+                        return toEnumerator((MutationPlan) plan);
+                    }
+                    
+                    throw new RuntimeException("Unexpected StatementPlan type: " + plan);
+                } catch (SQLException e) {
+                    throw new RuntimeException(e);
+                }
             }
         };
     }
 
-    public static Enumerable<Object> toEnumerable(final QueryPlan plan) {
-        try {
-            return toEnumerable2(plan.iterator(), plan.getProjector());
-        } catch (SQLException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public static Enumerator<Object> toEnumerator(final ResultIterator iterator, final RowProjector rowProjector) {
+    public static Enumerator<Object> toEnumerator(QueryPlan plan) throws SQLException {
+        final ResultIterator iterator = plan.iterator();
+        final RowProjector rowProjector = plan.getProjector();
         final int count = rowProjector.getColumnCount();
         return new Enumerator<Object>() {
             Object current;
@@ -153,4 +161,38 @@ public class CalciteRuntime {
             }
         };
     }
+    
+    public static Enumerator<Object> toEnumerator(final MutationPlan plan) {
+        return new Enumerator<Object>() {
+            Long updateCount = null;
+
+            @Override
+            public Object current() {
+                return updateCount;
+            }
+
+            @Override
+            public boolean moveNext() {
+                if (updateCount != null) {
+                    return false;
+                }
+
+                try {
+                    updateCount = plan.execute().getUpdateCount();
+                } catch (SQLException e) {
+                    throw new RuntimeException(e);
+                }
+                return true;
+            }
+
+            @Override
+            public void reset() {
+                throw new UnsupportedOperationException();                            
+            }
+
+            @Override
+            public void close() {
+            }
+        };
+    }
 }
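
A rough sketch of how the new toEnumerable(StatementPlan) overload can be consumed for DML (assumed usage; the helper class and method names are made up for illustration). For a MutationPlan the enumerator yields exactly one row, carrying the update count.

import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.phoenix.calcite.CalciteRuntime;
import org.apache.phoenix.compile.StatementPlan;

// Sketch only: the StatementPlan is assumed to come from the generated code path
// (see the stash() change in PhoenixToEnumerableConverter below).
class UpdateCountReader {
    static long readUpdateCount(StatementPlan plan) {
        Enumerable<Object> enumerable = CalciteRuntime.toEnumerable(plan);
        try (Enumerator<Object> e = enumerable.enumerator()) {
            // For a MutationPlan, moveNext() returns true once and current() is the count.
            return e.moveNext() ? (Long) e.current() : 0L;
        }
    }
}
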

http://git-wip-us.apache.org/repos/asf/phoenix/blob/49c53e07/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
index d5e4836..23262cf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
@@ -1,8 +1,10 @@
 package org.apache.phoenix.calcite.rel;
 
 import java.sql.ParameterMetaData;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.calcite.plan.RelOptCluster;
@@ -11,22 +13,39 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.prepare.Prepare.CatalogReader;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableModify;
-import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.calcite.PhoenixTable;
+import org.apache.phoenix.calcite.TableMapping;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.MutationPlan;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.StatementPlan;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.execute.MutationState.RowMutationState;
+import org.apache.phoenix.execute.MutationState.RowTimestampColInfo;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.iterate.ResultIterator;
-import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.IllegalDataException;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.SchemaUtil;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 public class PhoenixTableModify extends TableModify implements PhoenixRel {
 
@@ -55,7 +74,22 @@ public class PhoenixTableModify extends TableModify implements PhoenixRel {
         }
         
         final QueryPlan queryPlan = implementor.visitInput(0, (PhoenixQueryRel) input);
-        final TableRef targetTableRef = getTable().unwrap(PhoenixTable.class).tableMapping.getTableRef();
+        final RowProjector projector = implementor.getTableMapping().createRowProjector();
+        final TableMapping tableMapping = getTable().unwrap(PhoenixTable.class).tableMapping;
+        final TableRef targetTableRef = tableMapping.getTableRef();
+        final List<PColumn> mappedColumns = tableMapping.getMappedColumns();
+        final int[] columnIndexes = new int[mappedColumns.size()];
+        final int[] pkSlotIndexes = new int[mappedColumns.size()];
+        for (int i = 0; i < columnIndexes.length; i++) {
+            PColumn column = mappedColumns.get(i);
+            if (SchemaUtil.isPKColumn(column)) {
+                pkSlotIndexes[i] = column.getPosition();
+            }
+            columnIndexes[i] = column.getPosition();
+        }
+        // TODO
+        final boolean useServerTimestamp = false;
+        
         return new MutationPlan() {
             @Override
             public ParameterMetaData getParameterMetaData() {
@@ -85,35 +119,58 @@ public class PhoenixTableModify extends TableModify implements PhoenixRel {
 
             @Override
             public MutationState execute() throws SQLException {
-//                ResultIterator iterator = queryPlan.iterator();
-//                if (parallelIteratorFactory == null) {
-//                    return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp);
-//                }
-//                try {
-//                    parallelIteratorFactory.setRowProjector(projector);
-//                    parallelIteratorFactory.setColumnIndexes(columnIndexes);
-//                    parallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
-//                    Tuple tuple;
-//                    long totalRowCount = 0;
-//                    StatementContext context = queryPlan.getContext();
-//                    while ((tuple=iterator.next()) != null) {// Runs query
-//                        Cell kv = tuple.getValue(0);
-//                        totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
-//                    }
-//                    // Return total number of rows that have been updated. In the case of auto commit being off
-//                    // the mutations will all be in the mutation state of the current connection.
-//                    MutationState mutationState = new MutationState(maxSize, statement.getConnection(), totalRowCount);
-//                    /*
-//                     *  All the metrics collected for measuring the reads done by the parallel
mutating iterators
-//                     *  is included in the ReadMetricHolder of the statement context. Include
these metrics in the
-//                     *  returned mutation state so they can be published on commit. 
-//                     */
-//                    mutationState.setReadMetricQueue(context.getReadMetricsQueue());
-//                    return mutationState; 
-//                } finally {
-//                    iterator.close();
-//                }
-                return null;
+                ResultIterator iterator = queryPlan.iterator();
+                // simplest version, no run-on-server, no pipelined update
+                StatementContext childContext = queryPlan.getContext();
+                PhoenixStatement statement = childContext.getStatement();
+                PhoenixConnection connection = statement.getConnection();
+                ConnectionQueryServices services = connection.getQueryServices();
+                int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
+                        QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+                int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
+                boolean isAutoCommit = connection.getAutoCommit();
+                byte[][] values = new byte[columnIndexes.length][];
+                int rowCount = 0;
+                Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
+                PTable table = targetTableRef.getTable();
+                try (ResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
+                    ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                    while (rs.next()) {
+                        for (int i = 0; i < values.length; i++) {
+                            PColumn column = table.getColumns().get(columnIndexes[i]);
+                            byte[] bytes = rs.getBytes(i + 1);
+                            ptr.set(bytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : bytes);
+                            Object value = rs.getObject(i + 1);
+                            int rsPrecision = rs.getMetaData().getPrecision(i + 1);
+                            Integer precision = rsPrecision == 0 ? null : rsPrecision;
+                            int rsScale = rs.getMetaData().getScale(i + 1);
+                            Integer scale = rsScale == 0 ? null : rsScale;
+                            // We are guaranteed that the two column will have compatible types,
+                            // as we checked that before.
+                            if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), precision, scale,
+                                    column.getMaxLength(), column.getScale())) { throw new SQLExceptionInfo.Builder(
+                                    SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
+                                    .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null)).build()
+                                    .buildException(); }
+                            column.getDataType().coerceBytes(ptr, value, column.getDataType(),
+                                    precision, scale, SortOrder.getDefault(), 
+                                    column.getMaxLength(), column.getScale(), column.getSortOrder(),
+                                    table.rowKeyOrderOptimizable());
+                            values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                        }
+                        setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp);
+                        rowCount++;
+                        // Commit a batch if auto commit is true and we're at our batch size
+                        if (isAutoCommit && rowCount % batchSize == 0) {
+                            MutationState state = new MutationState(targetTableRef, mutation, 0, maxSize, connection);
+                            connection.getMutationState().join(state);
+                            connection.getMutationState().send();
+                            mutation.clear();
+                        }
+                    }
+                    // If auto commit is true, this last batch will be committed upon return
+                    return new MutationState(targetTableRef, mutation, rowCount / batchSize * batchSize, maxSize, connection);
+                }
             }
 
             @Override
@@ -127,5 +184,39 @@ public class PhoenixTableModify extends TableModify implements PhoenixRel {
             
         };
     }
+    
+    private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixStatement statement, boolean useServerTimestamp) {
+        Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
+        byte[][] pkValues = new byte[table.getPKColumns().size()][];
+        // If the table uses salting, the first byte is the salting byte, set to an empty array
+        // here and we will fill in the byte later in PRowImpl.
+        if (table.getBucketNum() != null) {
+            pkValues[0] = new byte[] {0};
+        }
+        Long rowTimestamp = null; // case when the table doesn't have a row timestamp column
+        RowTimestampColInfo rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
+        for (int i = 0; i < values.length; i++) {
+            byte[] value = values[i];
+            PColumn column = table.getColumns().get(columnIndexes[i]);
+            if (SchemaUtil.isPKColumn(column)) {
+                pkValues[pkSlotIndex[i]] = value;
+                if (SchemaUtil.getPKPosition(table, column) == table.getRowTimestampColPos()) {
+                    if (!useServerTimestamp) {
+                        PColumn rowTimestampCol = table.getPKColumns().get(table.getRowTimestampColPos());
+                        rowTimestamp = PLong.INSTANCE.getCodec().decodeLong(value, 0, rowTimestampCol.getSortOrder());
+                        if (rowTimestamp < 0) {
+                            throw new IllegalDataException("Value of a column designated as ROW_TIMESTAMP cannot be less than zero");
+                        }
+                        rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
+                    } 
+                }
+            } else {
+                columnValues.put(column, value);
+            }
+        }
+        ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+        table.newKey(ptr, pkValues);
+        mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo));
+    }
 
 }
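
One detail of the execute() implementation above worth spelling out: with auto-commit on, a batch is flushed whenever rowCount is a multiple of batchSize, and the final MutationState is built with a size offset of rowCount / batchSize * batchSize, i.e. the number of rows already sent, so only the remainder is still pending. A small arithmetic sketch with assumed values (not taken from the commit):

// Illustrative arithmetic only; batchSize and rowCount are assumed values.
public class BatchMathSketch {
    public static void main(String[] args) {
        int batchSize = 1000;
        int rowCount = 2500;                                // rows read from the SELECT
        int alreadySent = rowCount / batchSize * batchSize; // 2000: flushed by the in-loop commits
        int stillPending = rowCount - alreadySent;          // 500: carried in the returned MutationState
        System.out.println(alreadySent + " rows sent, " + stillPending + " rows pending");
    }
}
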

http://git-wip-us.apache.org/repos/asf/phoenix/blob/49c53e07/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
index 5e750b1..bffdf2f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
@@ -74,7 +74,7 @@ public class PhoenixToEnumerableConverter extends ConverterImpl implements Enume
         //   return CalciteRuntime.toEnumerable(iterator);
         final BlockBuilder list = new BlockBuilder();
         StatementPlan plan = makePlan((PhoenixRel)getInput());
-        Expression var = stash(implementor, plan, QueryPlan.class);
+        Expression var = stash(implementor, plan, StatementPlan.class);
         final RelDataType rowType = getRowType();
         final PhysType physType =
             PhysTypeImpl.of(

http://git-wip-us.apache.org/repos/asf/phoenix/blob/49c53e07/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
index c42df99..119dad1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
@@ -75,6 +75,7 @@ public class PhoenixConverterRules {
         PhoenixToEnumerableConverterRule.SERVER,
         PhoenixToEnumerableConverterRule.SERVERJOIN,
         PhoenixToEnumerableConverterRule.CLIENT,
+        PhoenixToEnumerableConverterRule.MUTATION,
         PhoenixClientSortRule.INSTANCE,
         PhoenixServerSortRule.SERVER,
         PhoenixServerSortRule.SERVERJOIN,
@@ -100,6 +101,7 @@ public class PhoenixConverterRules {
         PhoenixToEnumerableConverterRule.SERVER,
         PhoenixToEnumerableConverterRule.SERVERJOIN,
         PhoenixToEnumerableConverterRule.CLIENT,
+        PhoenixToEnumerableConverterRule.MUTATION,
         PhoenixClientSortRule.INSTANCE,
         PhoenixServerSortRule.SERVER,
         PhoenixServerSortRule.SERVERJOIN,
@@ -842,7 +844,7 @@ public class PhoenixConverterRules {
             
             return new PhoenixTableModify(
                     modify.getCluster(),
-                    modify.getTraitSet().replace(PhoenixConvention.CLIENT),
+                    modify.getTraitSet().replace(PhoenixConvention.MUTATION),
                     modify.getTable(),
                     modify.getCatalogReader(),
                     convert(
@@ -866,6 +868,8 @@ public class PhoenixConverterRules {
                 new PhoenixToEnumerableConverterRule(PhoenixConvention.SERVERJOIN);
         public static final ConverterRule CLIENT =
                 new PhoenixToEnumerableConverterRule(PhoenixConvention.CLIENT);
+        public static final ConverterRule MUTATION =
+                new PhoenixToEnumerableConverterRule(PhoenixConvention.MUTATION);
 
         private PhoenixToEnumerableConverterRule(Convention inputConvention) {
             super(RelNode.class, inputConvention, EnumerableConvention.INSTANCE,

