beam-commits mailing list archives

From jbono...@apache.org
Subject [46/50] [abbrv] beam git commit: Dynamic sizing of Datastore write RPCs.
Date Thu, 20 Jul 2017 19:53:42 GMT
Dynamic sizing of Datastore write RPCs.

This implements the same behaviour recently added to the Java SDK:
- start at 200 entities per RPC;
- size subsequent requests based on observed latency of previous requests.
Includes a MovingSum class to track recent latency.
Reports RPC success and failure counts as metrics (again, as in the Java SDK).
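
In short, the writer keeps a moving sum of recent per-entity commit latency and
picks each batch size so that a full batch should complete in roughly the
target latency, clamped to a minimum and maximum. A minimal sketch of that
sizing rule, using the constants introduced in this commit; the helper name
next_batch_size and its plain mean-latency argument are illustrative stand-ins
for the MovingSum-backed _DynamicBatchSizer:

# Constants as introduced in datastoreio.py by this commit.
_WRITE_BATCH_INITIAL_SIZE = 200         # entities per RPC before any latency data
_WRITE_BATCH_MAX_SIZE = 500             # hard cap on entities per RPC
_WRITE_BATCH_MIN_SIZE = 10              # never shrink below this
_WRITE_BATCH_TARGET_LATENCY_MS = 5000   # aim for commits of about 5 seconds


def next_batch_size(mean_latency_ms_per_entity=None):
  """Illustrative sizing rule; the latency would come from util.MovingSum."""
  if mean_latency_ms_per_entity is None:
    # No observations yet: start at the initial size.
    return _WRITE_BATCH_INITIAL_SIZE
  # Pick a batch that should take about the target latency, clamped to range.
  return max(_WRITE_BATCH_MIN_SIZE,
             min(_WRITE_BATCH_MAX_SIZE,
                 _WRITE_BATCH_TARGET_LATENCY_MS
                 / max(mean_latency_ms_per_entity, 1)))


# Example: ~25 ms per entity on recent commits -> 5000 / 25 = 200 entities.
assert next_batch_size(25) == 200

This mirrors get_batch_size() in the diff below, where the mean latency is
computed from a two-minute MovingSum window of reported RPC latencies.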


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0a5157e7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0a5157e7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0a5157e7

Branch: refs/heads/DSL_SQL
Commit: 0a5157e7e75eb1e1dbfeff4888857154150cc6b6
Parents: 2e51bde
Author: Colin Phipps <fipsy@google.com>
Authored: Fri Jul 14 16:10:23 2017 +0000
Committer: chamikara@google.com <chamikara@google.com>
Committed: Thu Jul 20 10:18:38 2017 -0700

----------------------------------------------------------------------
 .../io/gcp/datastore/v1/datastoreio.py          | 84 ++++++++++++++---
 .../io/gcp/datastore/v1/datastoreio_test.py     | 53 +++++++++--
 .../apache_beam/io/gcp/datastore/v1/helper.py   | 35 ++++++--
 .../apache_beam/io/gcp/datastore/v1/util.py     | 95 ++++++++++++++++++++
 .../io/gcp/datastore/v1/util_test.py            | 67 ++++++++++++++
 5 files changed, 310 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0a5157e7/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
index 89c2a93..0258814 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
@@ -18,6 +18,7 @@
 """A connector for reading from and writing to Google Cloud Datastore"""
 
 import logging
+import time
 
 # Protect against environments where datastore library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
@@ -30,6 +31,7 @@ except ImportError:
 
 from apache_beam.io.gcp.datastore.v1 import helper
 from apache_beam.io.gcp.datastore.v1 import query_splitter
+from apache_beam.io.gcp.datastore.v1 import util
 from apache_beam.transforms import Create
 from apache_beam.transforms import DoFn
 from apache_beam.transforms import FlatMap
@@ -38,6 +40,7 @@ from apache_beam.transforms import Map
 from apache_beam.transforms import PTransform
 from apache_beam.transforms import ParDo
 from apache_beam.transforms.util import Values
+from apache_beam.metrics.metric import Metrics
 
 __all__ = ['ReadFromDatastore', 'WriteToDatastore', 'DeleteFromDatastore']
 
@@ -313,12 +316,15 @@ class _Mutate(PTransform):
   supported, as the commits are retried when failures occur.
   """
 
+  _WRITE_BATCH_INITIAL_SIZE = 200
   # Max allowed Datastore writes per batch, and max bytes per batch.
   # Note that the max bytes per batch set here is lower than the 10MB limit
   # actually enforced by the API, to leave space for the CommitRequest wrapper
   # around the mutations.
-  _WRITE_BATCH_SIZE = 500
-  _WRITE_BATCH_BYTES_SIZE = 9000000
+  _WRITE_BATCH_MAX_SIZE = 500
+  _WRITE_BATCH_MAX_BYTES_SIZE = 9000000
+  _WRITE_BATCH_MIN_SIZE = 10
+  _WRITE_BATCH_TARGET_LATENCY_MS = 5000
 
   def __init__(self, project, mutation_fn):
     """Initializes a Mutate transform.
@@ -342,48 +348,102 @@ class _Mutate(PTransform):
     return {'project': self._project,
             'mutation_fn': self._mutation_fn.__class__.__name__}
 
+  class _DynamicBatchSizer(object):
+    """Determines request sizes for future Datastore RPCS."""
+    def __init__(self):
+      self._commit_time_per_entity_ms = util.MovingSum(window_ms=120000,
+                                                       bucket_ms=10000)
+
+    def get_batch_size(self, now):
+      """Returns the recommended size for datastore RPCs at this time."""
+      if not self._commit_time_per_entity_ms.has_data(now):
+        return _Mutate._WRITE_BATCH_INITIAL_SIZE
+
+      recent_mean_latency_ms = (self._commit_time_per_entity_ms.sum(now)
+                                / self._commit_time_per_entity_ms.count(now))
+      return max(_Mutate._WRITE_BATCH_MIN_SIZE,
+                 min(_Mutate._WRITE_BATCH_MAX_SIZE,
+                     _Mutate._WRITE_BATCH_TARGET_LATENCY_MS
+                     / max(recent_mean_latency_ms, 1)
+                    ))
+
+    def report_latency(self, now, latency_ms, num_mutations):
+      """Reports the latency of an RPC to Datastore.
+
+      Args:
+        now: double, completion time of the RPC as seconds since the epoch.
+        latency_ms: double, the observed latency in milliseconds for this RPC.
+        num_mutations: int, number of mutations contained in the RPC.
+      """
+      self._commit_time_per_entity_ms.add(now, latency_ms / num_mutations)
+
   class DatastoreWriteFn(DoFn):
     """A ``DoFn`` that write mutations to Datastore.
 
     Mutations are written in batches, where the maximum batch size is
-    `Mutate._WRITE_BATCH_SIZE`.
+    `_Mutate._WRITE_BATCH_MAX_SIZE`.
 
     Commits are non-transactional. If a commit fails because of a conflict over
     an entity group, the commit will be retried. This means that the mutation
     should be idempotent (`upsert` and `delete` mutations) to prevent duplicate
     data or errors.
     """
-    def __init__(self, project):
+    def __init__(self, project, fixed_batch_size=None):
+      """
+      Args:
+        project: str, the cloud project id.
+        fixed_batch_size: int, for testing only; forces all batches of
+           writes to be a fixed size for easier unit testing.
+      """
       self._project = project
       self._datastore = None
-      self._mutations = []
-      self._mutations_size = 0  # Total size of entries in _mutations.
+      self._fixed_batch_size = fixed_batch_size
+      self._rpc_successes = Metrics.counter(
+          _Mutate.DatastoreWriteFn, "datastoreRpcSuccesses")
+      self._rpc_errors = Metrics.counter(
+          _Mutate.DatastoreWriteFn, "datastoreRpcErrors")
+
+    def _update_rpc_stats(self, successes=0, errors=0):
+      self._rpc_successes.inc(successes)
+      self._rpc_errors.inc(errors)
 
     def start_bundle(self):
       self._mutations = []
       self._mutations_size = 0
       self._datastore = helper.get_datastore(self._project)
+      if self._fixed_batch_size:
+        self._target_batch_size = self._fixed_batch_size
+      else:
+        self._batch_sizer = _Mutate._DynamicBatchSizer()
+        self._target_batch_size = self._batch_sizer.get_batch_size(time.time())
 
     def process(self, element):
       size = element.ByteSize()
       if (self._mutations and
-          size + self._mutations_size > _Mutate._WRITE_BATCH_BYTES_SIZE):
+          size + self._mutations_size > _Mutate._WRITE_BATCH_MAX_BYTES_SIZE):
         self._flush_batch()
       self._mutations.append(element)
       self._mutations_size += size
-      if len(self._mutations) >= _Mutate._WRITE_BATCH_SIZE:
+      if len(self._mutations) >= self._target_batch_size:
         self._flush_batch()
 
     def finish_bundle(self):
       if self._mutations:
         self._flush_batch()
-      self._mutations = []
-      self._mutations_size = 0
 
     def _flush_batch(self):
       # Flush the current batch of mutations to Cloud Datastore.
-      helper.write_mutations(self._datastore, self._project, self._mutations)
-      logging.debug("Successfully wrote %d mutations.", len(self._mutations))
+      _, latency_ms = helper.write_mutations(
+          self._datastore, self._project, self._mutations,
+          self._update_rpc_stats)
+      logging.debug("Successfully wrote %d mutations in %dms.",
+                    len(self._mutations), latency_ms)
+
+      if not self._fixed_batch_size:
+        now = time.time()
+        self._batch_sizer.report_latency(now, latency_ms, len(self._mutations))
+        self._target_batch_size = self._batch_sizer.get_batch_size(now)
+
       self._mutations = []
       self._mutations_size = 0
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0a5157e7/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
index 94cac3e..72c4c8c 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio_test.py
@@ -155,15 +155,15 @@ class DatastoreioTest(unittest.TestCase):
     self.check_DatastoreWriteFn(0)
 
   def test_DatastoreWriteFn_with_one_batch(self):
-    num_entities_to_write = _Mutate._WRITE_BATCH_SIZE * 1 - 50
+    num_entities_to_write = _Mutate._WRITE_BATCH_INITIAL_SIZE * 1 - 50
     self.check_DatastoreWriteFn(num_entities_to_write)
 
   def test_DatastoreWriteFn_with_multiple_batches(self):
-    num_entities_to_write = _Mutate._WRITE_BATCH_SIZE * 3 + 50
+    num_entities_to_write = _Mutate._WRITE_BATCH_INITIAL_SIZE * 3 + 50
     self.check_DatastoreWriteFn(num_entities_to_write)
 
   def test_DatastoreWriteFn_with_batch_size_exact_multiple(self):
-    num_entities_to_write = _Mutate._WRITE_BATCH_SIZE * 2
+    num_entities_to_write = _Mutate._WRITE_BATCH_INITIAL_SIZE * 2
     self.check_DatastoreWriteFn(num_entities_to_write)
 
   def check_DatastoreWriteFn(self, num_entities):
@@ -180,7 +180,8 @@ class DatastoreioTest(unittest.TestCase):
       self._mock_datastore.commit.side_effect = (
           fake_datastore.create_commit(actual_mutations))
 
-      datastore_write_fn = _Mutate.DatastoreWriteFn(self._PROJECT)
+      datastore_write_fn = _Mutate.DatastoreWriteFn(
+          self._PROJECT, fixed_batch_size=_Mutate._WRITE_BATCH_INITIAL_SIZE)
 
       datastore_write_fn.start_bundle()
       for mutation in expected_mutations:
@@ -188,8 +189,9 @@ class DatastoreioTest(unittest.TestCase):
       datastore_write_fn.finish_bundle()
 
       self.assertEqual(actual_mutations, expected_mutations)
-      self.assertEqual((num_entities - 1) / _Mutate._WRITE_BATCH_SIZE + 1,
-                       self._mock_datastore.commit.call_count)
+      self.assertEqual(
+          (num_entities - 1) / _Mutate._WRITE_BATCH_INITIAL_SIZE + 1,
+          self._mock_datastore.commit.call_count)
 
   def test_DatastoreWriteLargeEntities(self):
     """100*100kB entities gets split over two Commit RPCs."""
@@ -197,7 +199,8 @@ class DatastoreioTest(unittest.TestCase):
                       return_value=self._mock_datastore):
       entities = [e.entity for e in fake_datastore.create_entities(100)]
 
-      datastore_write_fn = _Mutate.DatastoreWriteFn(self._PROJECT)
+      datastore_write_fn = _Mutate.DatastoreWriteFn(
+          self._PROJECT, fixed_batch_size=_Mutate._WRITE_BATCH_INITIAL_SIZE)
       datastore_write_fn.start_bundle()
       for entity in entities:
         datastore_helper.add_properties(
@@ -258,5 +261,41 @@ class DatastoreioTest(unittest.TestCase):
     return split_queries
 
 
+@unittest.skipIf(datastore_pb2 is None, 'GCP dependencies are not installed')
+class DynamicWriteBatcherTest(unittest.TestCase):
+
+  def setUp(self):
+    self._batcher = _Mutate._DynamicBatchSizer()
+
+  # If possible, keep these test cases aligned with the Java test cases in
+  # DatastoreV1Test.java
+  def test_no_data(self):
+    self.assertEquals(_Mutate._WRITE_BATCH_INITIAL_SIZE,
+                      self._batcher.get_batch_size(0))
+
+  def test_fast_queries(self):
+    self._batcher.report_latency(0, 1000, 200)
+    self._batcher.report_latency(0, 1000, 200)
+    self.assertEquals(_Mutate._WRITE_BATCH_MAX_SIZE,
+                      self._batcher.get_batch_size(0))
+
+  def test_slow_queries(self):
+    self._batcher.report_latency(0, 10000, 200)
+    self._batcher.report_latency(0, 10000, 200)
+    self.assertEquals(100, self._batcher.get_batch_size(0))
+
+  def test_size_not_below_minimum(self):
+    self._batcher.report_latency(0, 30000, 50)
+    self._batcher.report_latency(0, 30000, 50)
+    self.assertEquals(_Mutate._WRITE_BATCH_MIN_SIZE,
+                      self._batcher.get_batch_size(0))
+
+  def test_sliding_window(self):
+    self._batcher.report_latency(0, 30000, 50)
+    self._batcher.report_latency(50000, 5000, 200)
+    self._batcher.report_latency(100000, 5000, 200)
+    self.assertEquals(200, self._batcher.get_batch_size(150000))
+
+
 if __name__ == '__main__':
   unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/0a5157e7/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
index 996dace..da14cc4 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
@@ -23,6 +23,7 @@ For internal use only; no backwards-compatibility guarantees.
 import errno
 from socket import error as SocketError
 import sys
+import time
 
 # Protect against environments where datastore library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
@@ -166,13 +167,25 @@ def is_key_valid(key):
   return key.path[-1].HasField('id') or key.path[-1].HasField('name')
 
 
-def write_mutations(datastore, project, mutations):
+def write_mutations(datastore, project, mutations, rpc_stats_callback=None):
   """A helper function to write a batch of mutations to Cloud Datastore.
 
  If a commit fails, it will be retried up to 5 times. All mutations in the
   batch will be committed again, even if the commit was partially successful.
   If the retry limit is exceeded, the last exception from Cloud Datastore will
   be raised.
+
+  Args:
+    datastore: googledatastore.connection.Datastore
+    project: str, project id
+    mutations: list of google.cloud.proto.datastore.v1.datastore_pb2.Mutation
+    rpc_stats_callback: a function to call with arguments `successes` and
+        `errors`; this is called to record successful and failed RPCs to
+        Datastore.
+
+  Returns a tuple of:
+    CommitResponse, the response from Datastore;
+    int, the latency of the successful RPC in milliseconds.
   """
   commit_request = datastore_pb2.CommitRequest()
   commit_request.mode = datastore_pb2.CommitRequest.NON_TRANSACTIONAL
@@ -182,10 +195,22 @@ def write_mutations(datastore, project, mutations):
 
   @retry.with_exponential_backoff(num_retries=5,
                                   retry_filter=retry_on_rpc_error)
-  def commit(req):
-    datastore.commit(req)
-
-  commit(commit_request)
+  def commit(request):
+    try:
+      start_time = time.time()
+      response = datastore.commit(request)
+      end_time = time.time()
+      rpc_stats_callback(successes=1)
+
+      commit_time_ms = int((end_time - start_time) * 1000)
+      return response, commit_time_ms
+    except (RPCError, SocketError):
+      if rpc_stats_callback:
+        rpc_stats_callback(errors=1)
+      raise
+
+  response, commit_time_ms = commit(commit_request)
+  return response, commit_time_ms
 
 
 def make_latest_timestamp_query(namespace):

http://git-wip-us.apache.org/repos/asf/beam/blob/0a5157e7/sdks/python/apache_beam/io/gcp/datastore/v1/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/util.py b/sdks/python/apache_beam/io/gcp/datastore/v1/util.py
new file mode 100644
index 0000000..5670a24
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/util.py
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Utility functions & classes that are _not_ specific to the datastore client.
+#
+# For internal use only; no backwards-compatibility guarantees.
+
+import math
+
+
+class MovingSum(object):
+  """Class that keeps track of a rolling window sum.
+
+  For use in tracking recent performance of the connector.
+
+  Intended to be similar to
+  org.apache.beam.sdk.util.MovingFunction(..., Sum.ofLongs()), but for
+  convenience we expose the count of entries as well so this doubles as a
+  moving average tracker.
+  """
+
+  def __init__(self, window_ms, bucket_ms):
+    if window_ms <= bucket_ms or bucket_ms <= 0:
+      raise ValueError("window_ms > bucket_ms > 0 please")
+    self._num_buckets = int(math.ceil(window_ms / bucket_ms))
+    self._bucket_ms = bucket_ms
+    self._Reset(now=0)  # initialize the moving window members
+
+  def _Reset(self, now):
+    self._current_index = 0  # pointer into self._buckets
+    self._current_ms_since_epoch = math.floor(
+        now / self._bucket_ms) * self._bucket_ms
+
+    # _buckets is a list where each element is a list [sum, num_samples]
+    # This is a circular buffer where
+    # [_current_index] represents the time range
+    #     [_current_ms_since_epoch, _current_ms_since_epoch+_bucket_ms)
+    # [_current_index-1] represents the immediately prior time range
+    #     [_current_ms_since_epoch-_bucket_ms, _current_ms_since_epoch)
+    # etc, wrapping around from the start to the end of the array, so
+    # [_current_index+1] is the element representing the oldest bucket.
+    self._buckets = [[0, 0] for _ in range(0, self._num_buckets)]
+
+  def _Flush(self, now):
+    """
+
+    Args:
+      now: int, milliseconds since epoch
+    """
+    if now >= (self._current_ms_since_epoch
+               + self._bucket_ms * self._num_buckets):
+      # Time moved forward so far that all currently held data is outside of
+      # the window.  It is faster to simply reset our data.
+      self._Reset(now)
+      return
+
+    while now > self._current_ms_since_epoch + self._bucket_ms:
+      # Advance time by one _bucket_ms, setting the new bucket's counts to 0.
+      self._current_ms_since_epoch += self._bucket_ms
+      self._current_index = (self._current_index+1) % self._num_buckets
+      self._buckets[self._current_index] = [0, 0]
+      # Intentional dead reckoning here; we don't care about staying precisely
+      # aligned with multiples of _bucket_ms since the epoch, we just need our
+      # buckets to represent the most recent _window_ms time window.
+
+  def sum(self, now):
+    self._Flush(now)
+    return sum(bucket[0] for bucket in self._buckets)
+
+  def add(self, now, inc):
+    self._Flush(now)
+    bucket = self._buckets[self._current_index]
+    bucket[0] += inc
+    bucket[1] += 1
+
+  def count(self, now):
+    self._Flush(now)
+    return sum(bucket[1] for bucket in self._buckets)
+
+  def has_data(self, now):
+    return self.count(now) > 0

http://git-wip-us.apache.org/repos/asf/beam/blob/0a5157e7/sdks/python/apache_beam/io/gcp/datastore/v1/util_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/util_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/util_test.py
new file mode 100644
index 0000000..8f17c21
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/util_test.py
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests for util.py."""
+import unittest
+
+from apache_beam.io.gcp.datastore.v1 import util
+
+
+class MovingSumTest(unittest.TestCase):
+
+  TIMESTAMP = 1500000000
+
+  def test_bad_bucket_size(self):
+    with self.assertRaises(ValueError):
+      _ = util.MovingSum(1, 0)
+
+  def test_bad_window_size(self):
+    with self.assertRaises(ValueError):
+      _ = util.MovingSum(1, 2)
+
+  def test_no_data(self):
+    ms = util.MovingSum(10, 1)
+    self.assertEqual(0, ms.sum(MovingSumTest.TIMESTAMP))
+    self.assertEqual(0, ms.count(MovingSumTest.TIMESTAMP))
+    self.assertFalse(ms.has_data(MovingSumTest.TIMESTAMP))
+
+  def test_one_data_point(self):
+    ms = util.MovingSum(10, 1)
+    ms.add(MovingSumTest.TIMESTAMP, 5)
+    self.assertEqual(5, ms.sum(MovingSumTest.TIMESTAMP))
+    self.assertEqual(1, ms.count(MovingSumTest.TIMESTAMP))
+    self.assertTrue(ms.has_data(MovingSumTest.TIMESTAMP))
+
+  def test_aggregates_within_window(self):
+    ms = util.MovingSum(10, 1)
+    ms.add(MovingSumTest.TIMESTAMP, 5)
+    ms.add(MovingSumTest.TIMESTAMP+1, 3)
+    ms.add(MovingSumTest.TIMESTAMP+2, 7)
+    self.assertEqual(15, ms.sum(MovingSumTest.TIMESTAMP+3))
+    self.assertEqual(3, ms.count(MovingSumTest.TIMESTAMP+3))
+
+  def test_data_expires_from_moving_window(self):
+    ms = util.MovingSum(5, 1)
+    ms.add(MovingSumTest.TIMESTAMP, 5)
+    ms.add(MovingSumTest.TIMESTAMP+3, 3)
+    ms.add(MovingSumTest.TIMESTAMP+6, 7)
+    self.assertEqual(10, ms.sum(MovingSumTest.TIMESTAMP+7))
+    self.assertEqual(2, ms.count(MovingSumTest.TIMESTAMP+7))
+
+
+if __name__ == '__main__':
+  unittest.main()

