flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/2] flink git commit: [FLINK-5727] [table] Unify API of batch and stream TableEnvironments.
Date Tue, 07 Feb 2017 23:59:12 GMT
Repository: flink
Updated Branches:
  refs/heads/master cba85db64 -> e24a866bf


[FLINK-5727] [table] Unify API of batch and stream TableEnvironments.

This closes #3281.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d0cb135
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d0cb135
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d0cb135

Branch: refs/heads/master
Commit: 2d0cb135972301926d62366dd3bb8b888d6d972d
Parents: cba85db
Author: Kurt Young <ykt836@gmail.com>
Authored: Tue Feb 7 14:15:02 2017 +0800
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Tue Feb 7 23:08:31 2017 +0100

----------------------------------------------------------------------
 .../flink/table/api/BatchTableEnvironment.scala | 57 ++++---------------
 .../table/api/StreamTableEnvironment.scala      | 60 ++++----------------
 .../flink/table/api/TableEnvironment.scala      | 44 +++++++++++++-
 .../table/api/java/BatchTableEnvironment.scala  |  4 +-
 .../table/api/java/StreamTableEnvironment.scala |  8 +--
 .../api/scala/StreamTableEnvironment.scala      |  4 +-
 .../flink/table/TableEnvironmentTest.scala      |  3 +-
 .../table/api/scala/batch/TableSourceTest.scala |  4 +-
 .../api/scala/stream/TableSourceITCase.scala    |  2 +-
 9 files changed, 76 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2d0cb135/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 4b9936d..dd0487a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -29,15 +29,13 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
-import org.apache.flink.table.calcite.FlinkPlannerImpl
 import org.apache.flink.table.explain.PlanJsonParser
 import org.apache.flink.table.expressions.Expression
-import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
 import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetRel}
 import org.apache.flink.table.plan.rules.FlinkRuleSets
 import org.apache.flink.table.plan.schema.{DataSetTable, TableSourceTable}
 import org.apache.flink.table.sinks.{BatchTableSink, TableSink}
-import org.apache.flink.table.sources.BatchTableSource
+import org.apache.flink.table.sources.{BatchTableSource, TableSource}
 import org.apache.flink.types.Row
 
 /**
@@ -85,55 +83,22 @@ abstract class BatchTableEnvironment(
   protected def createUniqueTableName(): String = "_DataSetTable_" + nameCntr.getAndIncrement()
 
   /**
-    * Scans a registered table and returns the resulting [[Table]].
-    *
-    * The table to scan must be registered in the [[TableEnvironment]]'s catalog.
-    *
-    * @param tableName The name of the table to scan.
-    * @throws ValidationException if no table is registered under the given name.
-    * @return The scanned table.
-    */
-  @throws[ValidationException]
-  def scan(tableName: String): Table = {
-    if (isRegistered(tableName)) {
-      new Table(this, CatalogNode(tableName, getRowType(tableName)))
-    } else {
-      throw new TableException(s"Table \'$tableName\' was not found in the registry.")
-    }
-  }
-
-  /**
     * Registers an external [[BatchTableSource]] in this [[TableEnvironment]]'s catalog.
     * Registered tables can be referenced in SQL queries.
     *
-    * @param name The name under which the [[BatchTableSource]] is registered.
-    * @param tableSource The [[BatchTableSource]] to register.
+    * @param name        The name under which the [[TableSource]] is registered.
+    * @param tableSource The [[TableSource]] to register.
     */
-  def registerTableSource(name: String, tableSource: BatchTableSource[_]): Unit = {
-
+  override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = {
     checkValidTableName(name)
-    registerTableInternal(name, new TableSourceTable(tableSource))
-  }
 
-  /**
-    * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
-    *
-    * All tables referenced by the query must be registered in the TableEnvironment.
-    *
-    * @param query The SQL query to evaluate.
-    * @return The result of the query as Table.
-    */
-  override def sql(query: String): Table = {
-
-    val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
-    // parse the sql query
-    val parsed = planner.parse(query)
-    // validate the sql query
-    val validated = planner.validate(parsed)
-    // transform to a relational tree
-    val relational = planner.rel(validated)
-
-    new Table(this, LogicalRelNode(relational.rel))
+    tableSource match {
+      case batchTableSource: BatchTableSource[_] =>
+        registerTableInternal(name, new TableSourceTable(batchTableSource))
+      case _ =>
+        throw new TableException("Only BatchTableSource can be registered in " +
+            "BatchTableEnvironment")
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2d0cb135/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index c08b502..81e884d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -29,15 +29,13 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.calcite.FlinkPlannerImpl
 import org.apache.flink.table.explain.PlanJsonParser
 import org.apache.flink.table.expressions.Expression
-import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
 import org.apache.flink.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel}
 import org.apache.flink.table.plan.rules.FlinkRuleSets
 import org.apache.flink.table.plan.schema.{DataStreamTable, TableSourceTable}
 import org.apache.flink.table.sinks.{StreamTableSink, TableSink}
-import org.apache.flink.table.sources.StreamTableSource
+import org.apache.flink.table.sources.{StreamTableSource, TableSource}
 import org.apache.flink.types.Row
 
 /**
@@ -126,59 +124,23 @@ abstract class StreamTableEnvironment(
   }
 
   /**
-    * Ingests a registered table and returns the resulting [[Table]].
-    *
-    * The table to ingest must be registered in the [[TableEnvironment]]'s catalog.
-    *
-    * @param tableName The name of the table to ingest.
-    * @throws ValidationException if no table is registered under the given name.
-    * @return The ingested table.
-    */
-  @throws[ValidationException]
-  def ingest(tableName: String): Table = {
-
-    if (isRegistered(tableName)) {
-      new Table(this, CatalogNode(tableName, getRowType(tableName)))
-    }
-    else {
-      throw new ValidationException(s"Table \'$tableName\' was not found in the registry.")
-    }
-  }
-
-  /**
     * Registers an external [[StreamTableSource]] in this [[TableEnvironment]]'s catalog.
     * Registered tables can be referenced in SQL queries.
     *
-    * @param name        The name under which the [[StreamTableSource]] is registered.
-    * @param tableSource The [[org.apache.flink.table.sources.StreamTableSource]] to register.
+    * @param name        The name under which the [[TableSource]] is registered.
+    * @param tableSource The [[TableSource]] to register.
     */
-  def registerTableSource(name: String, tableSource: StreamTableSource[_]): Unit = {
-
+  override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = {
     checkValidTableName(name)
-    registerTableInternal(name, new TableSourceTable(tableSource))
-  }
-
-  /**
-    * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
-    *
-    * All tables referenced by the query must be registered in the TableEnvironment.
-    *
-    * @param query The SQL query to evaluate.
-    * @return The result of the query as Table.
-    */
-  override def sql(query: String): Table = {
-
-    val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
-    // parse the sql query
-    val parsed = planner.parse(query)
-    // validate the sql query
-    val validated = planner.validate(parsed)
-    // transform to a relational tree
-    val relational = planner.rel(validated)
 
-    new Table(this, LogicalRelNode(relational.rel))
+    tableSource match {
+      case streamTableSource: StreamTableSource[_] =>
+        registerTableInternal(name, new TableSourceTable(streamTableSource))
+      case _ =>
+        throw new TableException("Only StreamTableSource can be registered in " +
+            "StreamTableEnvironment")
+    }
   }
-
   /**
     * Writes a [[Table]] to a [[TableSink]].
     *

http://git-wip-us.apache.org/repos/asf/flink/blob/2d0cb135/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 2dcfc95..bcff1fb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -41,13 +41,14 @@ import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment =>
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv}
 import java.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
 import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
-import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
+import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.codegen.ExpressionReducer
 import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions}
 import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
 import org.apache.flink.table.plan.cost.DataSetCostFactory
-import org.apache.flink.table.plan.schema.RelTable
+import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
+import org.apache.flink.table.plan.schema.{RelTable, TableSourceTable}
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.{DefinedFieldNames, TableSource}
 import org.apache.flink.table.validate.FunctionCatalog
@@ -219,6 +220,15 @@ abstract class TableEnvironment(val config: TableConfig) {
   }
 
   /**
+    * Registers an external [[TableSource]] in this [[TableEnvironment]]'s catalog.
+    * Registered tables can be referenced in SQL queries.
+    *
+    * @param name        The name under which the [[TableSource]] is registered.
+    * @param tableSource The [[TableSource]] to register.
+    */
+  def registerTableSource(name: String, tableSource: TableSource[_]): Unit
+
+  /**
     * Unregisters a [[Table]] in the TableEnvironment's catalog.
     * Unregistered tables cannot be referenced in SQL queries anymore.
     *
@@ -252,6 +262,24 @@ abstract class TableEnvironment(val config: TableConfig) {
   }
 
   /**
+    * Scans a registered table and returns the resulting [[Table]].
+    *
+    * The table to scan must be registered in the [[TableEnvironment]]'s catalog.
+    *
+    * @param tableName The name of the table to scan.
+    * @throws ValidationException if no table is registered under the given name.
+    * @return The scanned table.
+    */
+  @throws[ValidationException]
+  def scan(tableName: String): Table = {
+    if (isRegistered(tableName)) {
+      new Table(this, CatalogNode(tableName, getRowType(tableName)))
+    } else {
+      throw new TableException(s"Table \'$tableName\' was not found in the registry.")
+    }
+  }
+
+  /**
     * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
     *
     * All tables referenced by the query must be registered in the TableEnvironment.
@@ -259,7 +287,17 @@ abstract class TableEnvironment(val config: TableConfig) {
     * @param query The SQL query to evaluate.
     * @return The result of the query as Table.
     */
-  def sql(query: String): Table
+  def sql(query: String): Table = {
+    val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
+    // parse the sql query
+    val parsed = planner.parse(query)
+    // validate the sql query
+    val validated = planner.validate(parsed)
+    // transform to a relational tree
+    val relational = planner.rel(validated)
+
+    new Table(this, LogicalRelNode(relational.rel))
+  }
 
   /**
     * Writes a [[Table]] to a [[TableSink]].

http://git-wip-us.apache.org/repos/asf/flink/blob/2d0cb135/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
index 15e842e..de5f789 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
@@ -133,7 +133,7 @@ class BatchTableEnvironment(
     * Converts the given [[Table]] into a [[DataSet]] of a specified type.
     *
     * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
-    * - [[Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
     * types: Fields are mapped by position, field types must match.
     * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
     *
@@ -150,7 +150,7 @@ class BatchTableEnvironment(
     * Converts the given [[Table]] into a [[DataSet]] of a specified type.
     *
     * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
-    * - [[Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
     * types: Fields are mapped by position, field types must match.
     * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
     *

http://git-wip-us.apache.org/repos/asf/flink/blob/2d0cb135/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
index 428dcae..4d9f1e1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
@@ -59,7 +59,7 @@ class StreamTableEnvironment(
 
     val name = createUniqueTableName()
     registerDataStreamInternal(name, dataStream)
-    ingest(name)
+    scan(name)
   }
 
   /**
@@ -84,7 +84,7 @@ class StreamTableEnvironment(
 
     val name = createUniqueTableName()
     registerDataStreamInternal(name, dataStream, exprs)
-    ingest(name)
+    scan(name)
   }
 
   /**
@@ -135,7 +135,7 @@ class StreamTableEnvironment(
     * Converts the given [[Table]] into a [[DataStream]] of a specified type.
     *
     * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
-    * - [[Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
     * types: Fields are mapped by position, field types must match.
     * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
     *
@@ -152,7 +152,7 @@ class StreamTableEnvironment(
     * Converts the given [[Table]] into a [[DataStream]] of a specified type.
     *
     * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
-    * - [[Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
     * types: Fields are mapped by position, field types must match.
     * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
     *

http://git-wip-us.apache.org/repos/asf/flink/blob/2d0cb135/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
index 1e6749e..0113146 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
@@ -60,7 +60,7 @@ class StreamTableEnvironment(
 
     val name = createUniqueTableName()
     registerDataStreamInternal(name, dataStream.javaStream)
-    ingest(name)
+    scan(name)
   }
 
   /**
@@ -82,7 +82,7 @@ class StreamTableEnvironment(
 
     val name = createUniqueTableName()
     registerDataStreamInternal(name, dataStream.javaStream, fields.toArray)
-    ingest(name)
+    scan(name)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2d0cb135/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
index f91aee9..1f73427 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
 import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment, TableException}
 import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference}
 import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
 import org.junit.Test
 import org.junit.Assert.assertEquals
 
@@ -286,7 +287,7 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) {
 
   override protected def getBuiltInRuleSet: RuleSet = ???
 
-  override def sql(query: String): Table = ???
+  override def registerTableSource(name: String, tableSource: TableSource[_]) = ???
 }
 
 case class CClass(cf1: Int, cf2: String, cf3: Double)

http://git-wip-us.apache.org/repos/asf/flink/blob/2d0cb135/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala
index 55da42a..609dc91 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala
@@ -93,7 +93,7 @@ class TableSourceTest extends TableTestBase {
     tEnv.registerTableSource(tableName, csvTable)
 
     val result = tEnv
-      .ingest(tableName)
+      .scan(tableName)
       .select('last, 'id.floor(), 'score * 2)
 
     val expected = unaryNode(
@@ -132,7 +132,7 @@ class TableSourceTest extends TableTestBase {
     tEnv.registerTableSource(tableName, csvTable)
 
     val result = tEnv
-      .ingest(tableName)
+      .scan(tableName)
       .select('id, 'score, 'first)
 
     val expected = sourceStreamTableNode(tableName, noCalcFields)

http://git-wip-us.apache.org/repos/asf/flink/blob/2d0cb135/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
index 381bd5d..06d94aa 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
@@ -68,7 +68,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
     tEnv.registerTableSource("csvTable", csvTable)
-    tEnv.ingest("csvTable")
+    tEnv.scan("csvTable")
       .where('id > 4)
       .select('last, 'score * 2)
       .toDataStream[Row]


Mime
View raw message