cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject [1/4] cassandra git commit: Limit size of windows with DTCS
Date Fri, 20 Nov 2015 10:10:04 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.1 28e51d66f -> 36019ab17


Limit size of windows with DTCS

Patch by marcuse; reviewed by Branimir Lambov for CASSANDRA-10280


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/99617a52
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/99617a52
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/99617a52

Branch: refs/heads/cassandra-3.1
Commit: 99617a529378f00cb86ab733959c7be9966860c9
Parents: 4f2337f
Author: Marcus Eriksson <marcuse@apache.org>
Authored: Tue Sep 8 13:50:16 2015 +0200
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Fri Nov 20 10:59:46 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 pylib/cqlshlib/cql3handling.py                  |  1 +
 .../DateTieredCompactionStrategy.java           | 33 +++++++++--------
 .../DateTieredCompactionStrategyOptions.java    | 30 ++++++++++++++--
 .../DateTieredCompactionStrategyTest.java       | 38 +++++++++++++-------
 5 files changed, 73 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/99617a52/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 94a9ae2..66423c7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Limit window size in DTCS (CASSANDRA-10280)
  * sstableloader does not use MAX_HEAP_SIZE env parameter (CASSANDRA-10188)
  * (cqlsh) Improve COPY TO performance and error handling (CASSANDRA-9304)
  * Don't remove level info when running upgradesstables (CASSANDRA-10692)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/99617a52/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index 49970e4..38f118f 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -472,6 +472,7 @@ def cf_prop_val_mapkey_completer(ctxt, cass):
             opts.add('max_sstable_age_days')
             opts.add('timestamp_resolution')
             opts.add('min_threshold')
+            opts.add('max_window_size_seconds')
         return map(escape_value, opts)
     return ()
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/99617a52/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index ece596f..ae684ec 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -131,7 +131,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
     {
         Iterable<SSTableReader> candidates = filterOldSSTables(Lists.newArrayList(candidateSSTables),
options.maxSSTableAge, now);
 
-        List<List<SSTableReader>> buckets = getBuckets(createSSTableAndMinTimestampPairs(candidates),
options.baseTime, base, now);
+        List<List<SSTableReader>> buckets = getBuckets(createSSTableAndMinTimestampPairs(candidates),
options.baseTime, base, now, options.maxWindowSize);
         logger.debug("Compaction buckets are {}", buckets);
         updateEstimatedCompactionsByTasks(buckets);
         List<SSTableReader> mostInteresting = newestBucket(buckets,
@@ -139,6 +139,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
                                                            cfs.getMaximumCompactionThreshold(),
                                                            now,
                                                            options.baseTime,
+                                                           options.maxWindowSize,
                                                            stcsOptions);
         if (!mostInteresting.isEmpty())
             return mostInteresting;
@@ -217,10 +218,13 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
         // A timestamp t hits the target iff t / size == divPosition.
         public final long divPosition;
 
-        public Target(long size, long divPosition)
+        public final long maxWindowSize;
+
+        public Target(long size, long divPosition, long maxWindowSize)
         {
             this.size = size;
             this.divPosition = divPosition;
+            this.maxWindowSize = maxWindowSize;
         }
 
         /**
@@ -250,10 +254,10 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
          */
         public Target nextTarget(int base)
         {
-            if (divPosition % base > 0)
-                return new Target(size, divPosition - 1);
+            if (divPosition % base > 0 || size * base > maxWindowSize)
+                return new Target(size, divPosition - 1, maxWindowSize);
             else
-                return new Target(size * base, divPosition / base - 1);
+                return new Target(size * base, divPosition / base - 1, maxWindowSize);
         }
     }
 
@@ -270,7 +274,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
      *         Each bucket is also a list of files ordered from newest to oldest.
      */
     @VisibleForTesting
-    static <T> List<List<T>> getBuckets(Collection<Pair<T, Long>>
files, long timeUnit, int base, long now)
+    static <T> List<List<T>> getBuckets(Collection<Pair<T, Long>>
files, long timeUnit, int base, long now, long maxWindowSize)
     {
         // Sort files by age. Newest first.
         final List<Pair<T, Long>> sortedFiles = Lists.newArrayList(files);
@@ -283,7 +287,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
         }));
 
         List<List<T>> buckets = Lists.newArrayList();
-        Target target = getInitialTarget(now, timeUnit);
+        Target target = getInitialTarget(now, timeUnit, maxWindowSize);
         PeekingIterator<Pair<T, Long>> it = Iterators.peekingIterator(sortedFiles.iterator());
 
         outerLoop:
@@ -302,7 +306,6 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
                 else // If the file is too old for the target, switch targets.
                     target = target.nextTarget(base);
             }
-
             List<T> bucket = Lists.newArrayList();
             while (target.onTarget(it.peek().right))
             {
@@ -318,9 +321,9 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
     }
 
     @VisibleForTesting
-    static Target getInitialTarget(long now, long timeUnit)
+    static Target getInitialTarget(long now, long timeUnit, long maxWindowSize)
     {
-        return new Target(timeUnit, now / timeUnit);
+        return new Target(timeUnit, now / timeUnit, maxWindowSize);
     }
 
 
@@ -329,8 +332,9 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
         int n = 0;
         for (List<SSTableReader> bucket : tasks)
         {
-            if (bucket.size() >= cfs.getMinimumCompactionThreshold())
-                n += getSTCSBuckets(bucket, stcsOptions).size();
+            for (List<SSTableReader> stcsBucket : getSTCSBuckets(bucket, stcsOptions))
+                if (stcsBucket.size() >= cfs.getMinimumCompactionThreshold())
+                    n += Math.ceil((double)stcsBucket.size() / cfs.getMaximumCompactionThreshold());
         }
         estimatedRemainingTasks = n;
     }
@@ -343,12 +347,12 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
      * @return a bucket (list) of sstables to compact.
      */
     @VisibleForTesting
-    static List<SSTableReader> newestBucket(List<List<SSTableReader>> buckets,
int minThreshold, int maxThreshold, long now, long baseTime, SizeTieredCompactionStrategyOptions
stcsOptions)
+    static List<SSTableReader> newestBucket(List<List<SSTableReader>> buckets,
int minThreshold, int maxThreshold, long now, long baseTime, long maxWindowSize, SizeTieredCompactionStrategyOptions
stcsOptions)
     {
         // If the "incoming window" has at least minThreshold SSTables, choose that one.
         // For any other bucket, at least 2 SSTables is enough.
         // In any case, limit to maxThreshold SSTables.
-        Target incomingWindow = getInitialTarget(now, baseTime);
+        Target incomingWindow = getInitialTarget(now, baseTime, maxWindowSize);
         for (List<SSTableReader> bucket : buckets)
         {
             boolean inFirstWindow = incomingWindow.onTarget(bucket.get(0).getMinTimestamp());
@@ -412,7 +416,6 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
         return Long.MAX_VALUE;
     }
 
-
     public static Map<String, String> validateOptions(Map<String, String> options)
throws ConfigurationException
     {
         Map<String, String> uncheckedOptions = AbstractCompactionStrategy.validateOptions(options);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/99617a52/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java
index 0cbf90e..5803115 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyOptions.java
@@ -25,17 +25,24 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 public final class DateTieredCompactionStrategyOptions
 {
     protected static final TimeUnit DEFAULT_TIMESTAMP_RESOLUTION = TimeUnit.MICROSECONDS;
-    protected static final double DEFAULT_MAX_SSTABLE_AGE_DAYS = 365;
+    @Deprecated
+    protected static final double DEFAULT_MAX_SSTABLE_AGE_DAYS = 365*1000;
     protected static final long DEFAULT_BASE_TIME_SECONDS = 60;
+    protected static final long DEFAULT_MAX_WINDOW_SIZE_SECONDS = TimeUnit.SECONDS.convert(1,
TimeUnit.DAYS);
+
     protected static final int DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS = 60 * 10;
     protected static final String TIMESTAMP_RESOLUTION_KEY = "timestamp_resolution";
+    @Deprecated
     protected static final String MAX_SSTABLE_AGE_KEY = "max_sstable_age_days";
     protected static final String BASE_TIME_KEY = "base_time_seconds";
     protected static final String EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY = "expired_sstable_check_frequency_seconds";
+    protected static final String MAX_WINDOW_SIZE_KEY = "max_window_size_seconds";
 
+    @Deprecated
     protected final long maxSSTableAge;
     protected final long baseTime;
     protected final long expiredSSTableCheckFrequency;
+    protected final long maxWindowSize;
 
     public DateTieredCompactionStrategyOptions(Map<String, String> options)
     {
@@ -48,13 +55,16 @@ public final class DateTieredCompactionStrategyOptions
         baseTime = timestampResolution.convert(optionValue == null ? DEFAULT_BASE_TIME_SECONDS
: Long.parseLong(optionValue), TimeUnit.SECONDS);
         optionValue = options.get(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY);
         expiredSSTableCheckFrequency = TimeUnit.MILLISECONDS.convert(optionValue == null
? DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS : Long.parseLong(optionValue), TimeUnit.SECONDS);
+        optionValue = options.get(MAX_WINDOW_SIZE_KEY);
+        maxWindowSize = timestampResolution.convert(optionValue == null ? DEFAULT_MAX_WINDOW_SIZE_SECONDS
: Long.parseLong(optionValue), TimeUnit.SECONDS);
     }
 
     public DateTieredCompactionStrategyOptions()
     {
-        maxSSTableAge = Math.round(DEFAULT_MAX_SSTABLE_AGE_DAYS * DEFAULT_TIMESTAMP_RESOLUTION.convert(1,
TimeUnit.DAYS));
+        maxSSTableAge = Math.round(DEFAULT_MAX_SSTABLE_AGE_DAYS * DEFAULT_TIMESTAMP_RESOLUTION.convert((long)
DEFAULT_MAX_SSTABLE_AGE_DAYS, TimeUnit.DAYS));
         baseTime = DEFAULT_TIMESTAMP_RESOLUTION.convert(DEFAULT_BASE_TIME_SECONDS, TimeUnit.SECONDS);
         expiredSSTableCheckFrequency = TimeUnit.MILLISECONDS.convert(DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS,
TimeUnit.SECONDS);
+        maxWindowSize = DEFAULT_TIMESTAMP_RESOLUTION.convert(1, TimeUnit.DAYS);
     }
 
     public static Map<String, String> validateOptions(Map<String, String> options,
Map<String, String> uncheckedOptions) throws  ConfigurationException
@@ -112,10 +122,26 @@ public final class DateTieredCompactionStrategyOptions
             throw new ConfigurationException(String.format("%s is not a parsable int (base10)
for %s", optionValue, EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY), e);
         }
 
+        optionValue = options.get(MAX_WINDOW_SIZE_KEY);
+        try
+        {
+            long maxWindowSize = optionValue == null ? DEFAULT_MAX_WINDOW_SIZE_SECONDS :
Long.parseLong(optionValue);
+            if (maxWindowSize < 0)
+            {
+                throw new ConfigurationException(String.format("%s must not be negative,
but was %d", MAX_WINDOW_SIZE_KEY, maxWindowSize));
+            }
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException(String.format("%s is not a parsable int (base10)
for %s", optionValue, MAX_WINDOW_SIZE_KEY), e);
+        }
+
+
         uncheckedOptions.remove(MAX_SSTABLE_AGE_KEY);
         uncheckedOptions.remove(BASE_TIME_KEY);
         uncheckedOptions.remove(TIMESTAMP_RESOLUTION_KEY);
         uncheckedOptions.remove(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY);
+        uncheckedOptions.remove(MAX_WINDOW_SIZE_KEY);
 
         return uncheckedOptions;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/99617a52/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
index 368101d..5afd575 100644
--- a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
@@ -88,6 +88,17 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader
             options.put(DateTieredCompactionStrategyOptions.MAX_SSTABLE_AGE_KEY, "0");
         }
 
+        try
+        {
+            options.put(DateTieredCompactionStrategyOptions.MAX_WINDOW_SIZE_KEY, "-1");
+            validateOptions(options);
+            fail(String.format("Negative %s should be rejected", DateTieredCompactionStrategyOptions.MAX_WINDOW_SIZE_KEY));
+        }
+        catch (ConfigurationException e)
+        {
+            options.put(DateTieredCompactionStrategyOptions.MAX_WINDOW_SIZE_KEY, "0");
+        }
+
         options.put("bad_option", "1.0");
         unvalidated = validateOptions(options);
         assertTrue(unvalidated.containsKey("bad_option"));
@@ -101,11 +112,11 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader
         options.put(DateTieredCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, "SECONDS");
 
         DateTieredCompactionStrategyOptions opts = new DateTieredCompactionStrategyOptions(options);
-        assertEquals(opts.maxSSTableAge, TimeUnit.SECONDS.convert(365, TimeUnit.DAYS));
+        assertEquals(opts.maxSSTableAge, TimeUnit.SECONDS.convert(365*1000, TimeUnit.DAYS));
 
         options.put(DateTieredCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, "MILLISECONDS");
         opts = new DateTieredCompactionStrategyOptions(options);
-        assertEquals(opts.maxSSTableAge, TimeUnit.MILLISECONDS.convert(365, TimeUnit.DAYS));
+        assertEquals(opts.maxSSTableAge, TimeUnit.MILLISECONDS.convert(365*1000, TimeUnit.DAYS));
 
         options.put(DateTieredCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, "MICROSECONDS");
         options.put(DateTieredCompactionStrategyOptions.MAX_SSTABLE_AGE_KEY, "10");
@@ -132,7 +143,7 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader
                 Pair.create("a", 1L),
                 Pair.create("b", 201L)
         );
-        List<List<String>> buckets = getBuckets(pairs, 100L, 2, 200L);
+        List<List<String>> buckets = getBuckets(pairs, 100L, 2, 200L, Long.MAX_VALUE);
         assertEquals(2, buckets.size());
 
         for (List<String> bucket : buckets)
@@ -151,7 +162,7 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader
                 Pair.create("b", 3899L),
                 Pair.create("c", 3900L)
         );
-        buckets = getBuckets(pairs, 100L, 3, 4050L);
+        buckets = getBuckets(pairs, 100L, 3, 4050L, Long.MAX_VALUE);
         // targets (divPosition, size): (40, 100), (39, 100), (12, 300), (3, 900), (0, 2700)
         // in other words: 0 - 2699, 2700 - 3599, 3600 - 3899, 3900 - 3999, 4000 - 4099
         assertEquals(3, buckets.size());
@@ -177,7 +188,7 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader
                 Pair.create("e", 3950L),
                 Pair.create("too new", 4125L)
         );
-        buckets = getBuckets(pairs, 100L, 1, 4050L);
+        buckets = getBuckets(pairs, 100L, 1, 4050L, Long.MAX_VALUE);
 
         assertEquals(5, buckets.size());
 
@@ -193,7 +204,6 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
-        cfs.truncateBlocking();
         cfs.disableAutoCompaction();
 
         ByteBuffer value = ByteBuffer.wrap(new byte[100]);
@@ -212,15 +222,16 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader
 
         List<SSTableReader> sstrs = new ArrayList<>(cfs.getSSTables());
 
-        List<SSTableReader> newBucket = newestBucket(Collections.singletonList(sstrs.subList(0,
2)), 4, 32, 9, 10, new SizeTieredCompactionStrategyOptions());
+        List<SSTableReader> newBucket = newestBucket(Collections.singletonList(sstrs.subList(0,
2)), 4, 32, 9, 10, Long.MAX_VALUE, new SizeTieredCompactionStrategyOptions());
         assertTrue("incoming bucket should not be accepted when it has below the min threshold
SSTables", newBucket.isEmpty());
 
-        newBucket = newestBucket(Collections.singletonList(sstrs.subList(0, 2)), 4, 32, 10,
10, new SizeTieredCompactionStrategyOptions());
+        newBucket = newestBucket(Collections.singletonList(sstrs.subList(0, 2)), 4, 32, 10,
10, Long.MAX_VALUE, new SizeTieredCompactionStrategyOptions());
         assertFalse("non-incoming bucket should be accepted when it has at least 2 SSTables",
newBucket.isEmpty());
 
         assertEquals("an sstable with a single value should have equal min/max timestamps",
sstrs.get(0).getMinTimestamp(), sstrs.get(0).getMaxTimestamp());
         assertEquals("an sstable with a single value should have equal min/max timestamps",
sstrs.get(1).getMinTimestamp(), sstrs.get(1).getMaxTimestamp());
         assertEquals("an sstable with a single value should have equal min/max timestamps",
sstrs.get(2).getMinTimestamp(), sstrs.get(2).getMaxTimestamp());
+        cfs.truncateBlocking();
     }
 
     @Test
@@ -228,7 +239,6 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
-        cfs.truncateBlocking();
         cfs.disableAutoCompaction();
 
         ByteBuffer value = ByteBuffer.wrap(new byte[100]);
@@ -259,6 +269,7 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader
 
         filtered = filterOldSSTables(sstrs, 1, 4);
         assertEquals("no sstables should remain when all are too old", 0, Iterables.size(filtered));
+        cfs.truncateBlocking();
     }
 
 
@@ -267,7 +278,6 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
-        cfs.truncateBlocking();
         cfs.disableAutoCompaction();
 
         ByteBuffer value = ByteBuffer.wrap(new byte[100]);
@@ -305,6 +315,7 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader
         SSTableReader sstable = t.sstables.iterator().next();
         assertEquals(sstable, expiredSSTable);
         cfs.getDataTracker().unmarkCompacting(cfs.getSSTables());
+        cfs.truncateBlocking();
     }
 
     @Test
@@ -312,7 +323,6 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
-        cfs.truncateBlocking();
         cfs.disableAutoCompaction();
         ByteBuffer bigValue = ByteBuffer.wrap(new byte[10000]);
         ByteBuffer value = ByteBuffer.wrap(new byte[100]);
@@ -344,7 +354,9 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader
         DateTieredCompactionStrategy dtcs = new DateTieredCompactionStrategy(cfs, options);
         for (SSTableReader sstable : cfs.getSSTables())
             dtcs.addSSTable(sstable);
-        assertEquals(20, dtcs.getNextBackgroundTask(0).sstables.size());
+        AbstractCompactionTask task = dtcs.getNextBackgroundTask(0);
+        assertEquals(20, task.sstables.size());
+        cfs.getDataTracker().unmarkCompacting(task.sstables);
+        cfs.truncateBlocking();
     }
-
 }


Mime
View raw message