impala-commits mailing list archives

From k...@apache.org
Subject [1/3] incubator-impala git commit: IMPALA-4856: Port data stream service to KRPC
Date Thu, 09 Nov 2017 22:55:36 GMT
Repository: incubator-impala
Updated Branches:
  refs/heads/master a772f8456 -> b4ea57a7e


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index a6e935a..cd8c936 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -21,7 +21,6 @@
 #include <memory>
 #include <boost/scoped_ptr.hpp>
 
-#include "gen-cpp/Results_types.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/string-value.h"
@@ -32,6 +31,9 @@
 #include "util/fixed-size-hash-table.h"
 #include "util/scope-exit-trigger.h"
 
+#include "gen-cpp/Results_types.h"
+#include "gen-cpp/row_batch.pb.h"
+
 #include "common/names.h"
 
 namespace impala {
@@ -76,21 +78,56 @@ RowBatch::RowBatch(
     tuple_data_pool_(mem_tracker),
     row_desc_(row_desc),
     mem_tracker_(mem_tracker) {
-  DCHECK(mem_tracker_ != NULL);
-  tuple_ptrs_size_ = num_rows_ * input_batch.row_tuples.size() * sizeof(Tuple*);
-  DCHECK_EQ(input_batch.row_tuples.size(), row_desc->tuple_descriptors().size());
-  DCHECK_GT(tuple_ptrs_size_, 0);
+  DCHECK(mem_tracker_ != nullptr);
+  kudu::Slice tuple_data =
+      kudu::Slice(input_batch.tuple_data.c_str(), input_batch.tuple_data.size());
+  kudu::Slice tuple_offsets = kudu::Slice(
+      reinterpret_cast<const char*>(input_batch.tuple_offsets.data()),
+      input_batch.tuple_offsets.size() * sizeof(int32_t));
+  const THdfsCompression::type& compression_type = input_batch.compression_type;
+  DCHECK(compression_type == THdfsCompression::NONE ||
+      compression_type == THdfsCompression::LZ4)
+      << "Unexpected compression type: " << input_batch.compression_type;
+  Deserialize(tuple_offsets, tuple_data, input_batch.uncompressed_size,
+      compression_type == THdfsCompression::LZ4);
+}
+
+RowBatch::RowBatch(const RowDescriptor* row_desc, const RowBatchHeaderPB& header,
+    const kudu::Slice& tuple_offsets, const kudu::Slice& tuple_data,
+    MemTracker* mem_tracker)
+  : num_rows_(header.num_rows()),
+    capacity_(header.num_rows()),
+    flush_(FlushMode::NO_FLUSH_RESOURCES),
+    needs_deep_copy_(false),
+    num_tuples_per_row_(header.num_tuples_per_row()),
+    attached_buffer_bytes_(0),
+    tuple_data_pool_(mem_tracker),
+    row_desc_(row_desc),
+    mem_tracker_(mem_tracker) {
+  DCHECK(mem_tracker_ != nullptr);
+  const CompressionType& compression_type = header.compression_type();
+  DCHECK(compression_type == CompressionType::NONE ||
+      compression_type == CompressionType::LZ4)
+      << "Unexpected compression type: " << compression_type;
+  Deserialize(tuple_offsets, tuple_data, header.uncompressed_size(),
+      compression_type == CompressionType::LZ4);
+}
+
+void RowBatch::Deserialize(const kudu::Slice& input_tuple_offsets,
+    const kudu::Slice& input_tuple_data, int64_t uncompressed_size, bool is_compressed) {
   // TODO: switch to Init() pattern so we can check memory limit and return Status.
+  DCHECK_EQ(num_tuples_per_row_, row_desc_->tuple_descriptors().size());
+  tuple_ptrs_size_ = num_rows_ * num_tuples_per_row_ * sizeof(Tuple*);
+  DCHECK_GT(tuple_ptrs_size_, 0);
   mem_tracker_->Consume(tuple_ptrs_size_);
   tuple_ptrs_ = reinterpret_cast<Tuple**>(malloc(tuple_ptrs_size_));
-  DCHECK(tuple_ptrs_ != NULL);
+  DCHECK(tuple_ptrs_ != nullptr);
+
   uint8_t* tuple_data;
-  if (input_batch.compression_type != THdfsCompression::NONE) {
-    DCHECK_EQ(THdfsCompression::LZ4, input_batch.compression_type)
-        << "Unexpected compression type: " << input_batch.compression_type;
+  if (is_compressed) {
     // Decompress tuple data into data pool
-    uint8_t* compressed_data = (uint8_t*)input_batch.tuple_data.c_str();
-    size_t compressed_size = input_batch.tuple_data.size();
+    const uint8_t* compressed_data = input_tuple_data.data();
+    size_t compressed_size = input_tuple_data.size();
 
     Lz4Decompressor decompressor(nullptr, false);
     Status status = decompressor.Init();
@@ -98,7 +135,6 @@ RowBatch::RowBatch(
     auto compressor_cleanup =
         MakeScopeExitTrigger([&decompressor]() { decompressor.Close(); });
 
-    int64_t uncompressed_size = input_batch.uncompressed_size;
     DCHECK_NE(uncompressed_size, -1) << "RowBatch decompression failed";
     tuple_data = tuple_data_pool_.Allocate(uncompressed_size);
     status = decompressor.ProcessBlock(
@@ -106,18 +142,21 @@ RowBatch::RowBatch(
     DCHECK(status.ok()) << "RowBatch decompression failed.";
   } else {
     // Tuple data uncompressed, copy directly into data pool
-    tuple_data = tuple_data_pool_.Allocate(input_batch.tuple_data.size());
-    memcpy(tuple_data, input_batch.tuple_data.c_str(), input_batch.tuple_data.size());
+    tuple_data = tuple_data_pool_.Allocate(input_tuple_data.size());
+    memcpy(tuple_data, input_tuple_data.data(), input_tuple_data.size());
   }
 
   // Convert input_batch.tuple_offsets into pointers
-  int tuple_idx = 0;
-  for (vector<int32_t>::const_iterator offset = input_batch.tuple_offsets.begin();
-       offset != input_batch.tuple_offsets.end(); ++offset) {
-    if (*offset == -1) {
-      tuple_ptrs_[tuple_idx++] = NULL;
+  const int32_t* tuple_offsets =
+      reinterpret_cast<const int32_t*>(input_tuple_offsets.data());
+  DCHECK_EQ(input_tuple_offsets.size() % sizeof(int32_t), 0);
+  int num_tuples = input_tuple_offsets.size() / sizeof(int32_t);
+  for (int tuple_idx = 0; tuple_idx < num_tuples; ++tuple_idx) {
+    int32_t offset = tuple_offsets[tuple_idx];
+    if (offset == -1) {
+      tuple_ptrs_[tuple_idx] = nullptr;
     } else {
-      tuple_ptrs_[tuple_idx++] = reinterpret_cast<Tuple*>(tuple_data + *offset);
+      tuple_ptrs_[tuple_idx] = reinterpret_cast<Tuple*>(tuple_data + offset);
     }
   }
 
@@ -126,9 +165,9 @@ RowBatch::RowBatch(
 
   // For every unique tuple, convert string offsets contained in tuple data into
   // pointers. Tuples were serialized in the order we are deserializing them in,
-  // so the first occurrence of a tuple will always have a higher offset than any tuple
-  // we already converted.
-  Tuple* last_converted = NULL;
+  // so the first occurrence of a tuple will always have a higher offset than any
+  // tuple we already converted.
+  Tuple* last_converted = nullptr;
   for (int i = 0; i < num_rows_; ++i) {
     for (int j = 0; j < num_tuples_per_row_; ++j) {
       const TupleDescriptor* desc = row_desc_->tuple_descriptors()[j];
@@ -148,10 +187,10 @@ RowBatch::~RowBatch() {
     ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(
         buffer_info.client, &buffer_info.buffer);
   }
-  DCHECK(tuple_ptrs_ != NULL);
+  DCHECK(tuple_ptrs_ != nullptr);
   free(tuple_ptrs_);
   mem_tracker_->Release(tuple_ptrs_size_);
-  tuple_ptrs_ = NULL;
+  tuple_ptrs_ = nullptr;
 }
 
 Status RowBatch::Serialize(TRowBatch* output_batch) {
@@ -162,11 +201,41 @@ Status RowBatch::Serialize(TRowBatch* output_batch, bool full_dedup) {
   // why does Thrift not generate a Clear() function?
   output_batch->row_tuples.clear();
   output_batch->tuple_offsets.clear();
-  output_batch->compression_type = THdfsCompression::NONE;
-
-  output_batch->num_rows = num_rows_;
+  int64_t uncompressed_size;
+  bool is_compressed;
+  RETURN_IF_ERROR(Serialize(full_dedup, &output_batch->tuple_offsets,
+      &output_batch->tuple_data, &uncompressed_size, &is_compressed));
+  // TODO: max_size() is much larger than the amount of memory we could feasibly
+  // allocate. Need better way to detect problem.
+  DCHECK_LE(uncompressed_size, output_batch->tuple_data.max_size());
+  output_batch->__set_num_rows(num_rows_);
+  output_batch->__set_uncompressed_size(uncompressed_size);
+  output_batch->__set_compression_type(
+      is_compressed ? THdfsCompression::LZ4 : THdfsCompression::NONE);
   row_desc_->ToThrift(&output_batch->row_tuples);
+  return Status::OK();
+}
 
+Status RowBatch::Serialize(OutboundRowBatch* output_batch) {
+  int64_t uncompressed_size;
+  bool is_compressed;
+  output_batch->tuple_offsets_.clear();
+  RETURN_IF_ERROR(Serialize(UseFullDedup(), &output_batch->tuple_offsets_,
+      &output_batch->tuple_data_, &uncompressed_size, &is_compressed));
+
+  // Initialize the RowBatchHeaderPB
+  RowBatchHeaderPB* header = &output_batch->header_;
+  header->Clear();
+  header->set_num_rows(num_rows_);
+  header->set_num_tuples_per_row(row_desc_->tuple_descriptors().size());
+  header->set_uncompressed_size(uncompressed_size);
+  header->set_compression_type(
+      is_compressed ? CompressionType::LZ4 : CompressionType::NONE);
+  return Status::OK();
+}
+
+Status RowBatch::Serialize(bool full_dedup, vector<int32_t>* tuple_offsets,
+    string* tuple_data, int64_t* uncompressed_size, bool* is_compressed) {
   // As part of the serialization process we deduplicate tuples to avoid serializing a
   // Tuple multiple times for the RowBatch. By default we only detect duplicate tuples
   // in adjacent rows only. If full deduplication is enabled, we will build a
@@ -179,11 +248,13 @@ Status RowBatch::Serialize(TRowBatch* output_batch, bool full_dedup) {
     RETURN_IF_ERROR(distinct_tuples.Init(num_rows_ * num_tuples_per_row_ * 2, 0));
     size = TotalByteSize(&distinct_tuples);
     distinct_tuples.Clear(); // Reuse allocated hash table.
-    SerializeInternal(size, &distinct_tuples, output_batch);
+    SerializeInternal(size, &distinct_tuples, tuple_offsets, tuple_data);
   } else {
-    size = TotalByteSize(NULL);
-    SerializeInternal(size, NULL, output_batch);
+    size = TotalByteSize(nullptr);
+    SerializeInternal(size, nullptr, tuple_offsets, tuple_data);
   }
+  *uncompressed_size = size;
+  *is_compressed = false;
 
   if (size > 0) {
     // Try compressing tuple_data to compression_scratch_, swap if compressed data is
@@ -197,15 +268,14 @@ Status RowBatch::Serialize(TRowBatch* output_batch, bool full_dedup) {
     if (compression_scratch_.size() < compressed_size) {
       compression_scratch_.resize(compressed_size);
     }
-    uint8_t* input = (uint8_t*)output_batch->tuple_data.c_str();
+    uint8_t* input = (uint8_t*)tuple_data->c_str();
     uint8_t* compressed_output = (uint8_t*)compression_scratch_.c_str();
     RETURN_IF_ERROR(
         compressor.ProcessBlock(true, size, input, &compressed_size, &compressed_output));
-
     if (LIKELY(compressed_size < size)) {
       compression_scratch_.resize(compressed_size);
-      output_batch->tuple_data.swap(compression_scratch_);
-      output_batch->compression_type = THdfsCompression::LZ4;
+      tuple_data->swap(compression_scratch_);
+      *is_compressed = true;
     }
     VLOG_ROW << "uncompressed size: " << size << ", compressed size: "
<< compressed_size;
   }
@@ -227,54 +297,52 @@ bool RowBatch::UseFullDedup() {
 }
 
 void RowBatch::SerializeInternal(int64_t size, DedupMap* distinct_tuples,
-    TRowBatch* output_batch) {
-  DCHECK(distinct_tuples == NULL || distinct_tuples->size() == 0);
+    vector<int32_t>* tuple_offsets, string* tuple_data_str) {
+  DCHECK(distinct_tuples == nullptr || distinct_tuples->size() == 0);
   // TODO: max_size() is much larger than the amount of memory we could feasibly
   // allocate. Need better way to detect problem.
-  DCHECK_LE(size, output_batch->tuple_data.max_size());
+  DCHECK_LE(size, tuple_data_str->max_size());
 
   // TODO: track memory usage
   // TODO: detect if serialized size is too large to allocate and return proper error.
-  output_batch->tuple_data.resize(size);
-  output_batch->uncompressed_size = size;
-  output_batch->tuple_offsets.reserve(num_rows_ * num_tuples_per_row_);
+  tuple_data_str->resize(size);
+  tuple_offsets->reserve(num_rows_ * num_tuples_per_row_);
 
   // Copy tuple data of unique tuples, including strings, into output_batch (converting
   // string pointers into offsets in the process).
   int offset = 0; // current offset into output_batch->tuple_data
-  char* tuple_data = const_cast<char*>(output_batch->tuple_data.c_str());
+  char* tuple_data = const_cast<char*>(tuple_data_str->c_str());
 
   for (int i = 0; i < num_rows_; ++i) {
     vector<TupleDescriptor*>::const_iterator desc =
         row_desc_->tuple_descriptors().begin();
     for (int j = 0; desc != row_desc_->tuple_descriptors().end(); ++desc, ++j) {
       Tuple* tuple = GetRow(i)->GetTuple(j);
-      if (UNLIKELY(tuple == NULL)) {
+      if (UNLIKELY(tuple == nullptr)) {
         // NULLs are encoded as -1
-        output_batch->tuple_offsets.push_back(-1);
+        tuple_offsets->push_back(-1);
         continue;
       } else if (LIKELY(i > 0) && UNLIKELY(GetRow(i - 1)->GetTuple(j) == tuple)) {
         // Fast tuple deduplication for adjacent rows.
-        int prev_row_idx = output_batch->tuple_offsets.size() - num_tuples_per_row_;
-        output_batch->tuple_offsets.push_back(
-            output_batch->tuple_offsets[prev_row_idx]);
+        int prev_row_idx = tuple_offsets->size() - num_tuples_per_row_;
+        tuple_offsets->push_back((*tuple_offsets)[prev_row_idx]);
         continue;
-      } else if (UNLIKELY(distinct_tuples != NULL)) {
+      } else if (UNLIKELY(distinct_tuples != nullptr)) {
         if ((*desc)->byte_size() == 0) {
-          // Zero-length tuples can be represented as NULL.
-          output_batch->tuple_offsets.push_back(-1);
+          // Zero-length tuples can be represented as nullptr.
+          tuple_offsets->push_back(-1);
           continue;
         }
         int* dedupd_offset = distinct_tuples->FindOrInsert(tuple, offset);
         if (*dedupd_offset != offset) {
           // Repeat of tuple
           DCHECK_GE(*dedupd_offset, 0);
-          output_batch->tuple_offsets.push_back(*dedupd_offset);
+          tuple_offsets->push_back(*dedupd_offset);
           continue;
         }
       }
       // Record offset before creating copy (which increments offset and tuple_data)
-      output_batch->tuple_offsets.push_back(offset);
+      tuple_offsets->push_back(offset);
       tuple->DeepCopy(**desc, &tuple_data, &offset, /* convert_ptrs */ true);
       DCHECK_LE(offset, size);
     }
@@ -323,10 +391,7 @@ void RowBatch::TransferResourceOwnership(RowBatch* dest) {
 }
 
 int64_t RowBatch::GetDeserializedSize(const TRowBatch& batch) {
-  int64_t result = batch.uncompressed_size;
-  result += batch.row_tuples.size() * sizeof(TTupleId);
-  result += batch.tuple_offsets.size() * sizeof(int32_t);
-  return result;
+  return batch.uncompressed_size + batch.tuple_offsets.size() * sizeof(Tuple*);
 }
 
 int64_t RowBatch::GetSerializedSize(const TRowBatch& batch) {
@@ -336,6 +401,21 @@ int64_t RowBatch::GetSerializedSize(const TRowBatch& batch) {
   return result;
 }
 
+int64_t RowBatch::GetDeserializedSize(const RowBatchHeaderPB& header,
+    const kudu::Slice& tuple_offsets) {
+  DCHECK_EQ(tuple_offsets.size() % sizeof(int32_t), 0);
+  return header.uncompressed_size() +
+      (tuple_offsets.size() / sizeof(int32_t)) * sizeof(Tuple*);
+}
+
+int64_t RowBatch::GetDeserializedSize(const OutboundRowBatch& batch) {
+  return batch.header_.uncompressed_size() + batch.tuple_offsets_.size() * sizeof(Tuple*);
+}
+
+int64_t RowBatch::GetSerializedSize(const OutboundRowBatch& batch) {
+  return batch.tuple_data_.size() + batch.tuple_offsets_.size() * sizeof(int32_t);
+}
+
 void RowBatch::AcquireState(RowBatch* src) {
   DCHECK(row_desc_->LayoutEquals(*src->row_desc_));
   DCHECK_EQ(num_tuples_per_row_, src->num_tuples_per_row_);
@@ -370,7 +450,7 @@ void RowBatch::DeepCopyTo(RowBatch* dst) {
 
 // TODO: consider computing size of batches as they are built up
 int64_t RowBatch::TotalByteSize(DedupMap* distinct_tuples) {
-  DCHECK(distinct_tuples == NULL || distinct_tuples->size() == 0);
+  DCHECK(distinct_tuples == nullptr || distinct_tuples->size() == 0);
   int64_t result = 0;
   vector<int> tuple_count(row_desc_->tuple_descriptors().size(), 0);
 
@@ -378,12 +458,12 @@ int64_t RowBatch::TotalByteSize(DedupMap* distinct_tuples) {
   for (int i = 0; i < num_rows_; ++i) {
     for (int j = 0; j < num_tuples_per_row_; ++j) {
       Tuple* tuple = GetRow(i)->GetTuple(j);
-      if (UNLIKELY(tuple == NULL)) continue;
+      if (UNLIKELY(tuple == nullptr)) continue;
       // Only count the data of unique tuples.
       if (LIKELY(i > 0) && UNLIKELY(GetRow(i - 1)->GetTuple(j) == tuple)) {
         // Fast tuple deduplication for adjacent rows.
         continue;
-      } else if (UNLIKELY(distinct_tuples != NULL)) {
+      } else if (UNLIKELY(distinct_tuples != nullptr)) {
         if (row_desc_->tuple_descriptors()[j]->byte_size() == 0) continue;
         bool inserted = distinct_tuples->InsertIfNotPresent(tuple, -1);
         if (!inserted) continue;
@@ -413,7 +493,7 @@ Status RowBatch::ResizeAndAllocateTupleBuffer(RuntimeState* state, MemPool* pool
   }
   *buffer_size = static_cast<int64_t>(row_size) * *capacity;
   *buffer = pool->TryAllocate(*buffer_size);
-  if (*buffer == NULL) {
+  if (*buffer == nullptr) {
     return pool->mem_tracker()->MemLimitExceeded(
         state, "Failed to allocate tuple buffer", *buffer_size);
   }

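For readers unfamiliar with the deserialization path above: RowBatch::Deserialize() turns the
tuple-offsets sidecar into tuple pointers, where each int32_t offset indexes into the
(decompressed) tuple data buffer and -1 encodes a NULL tuple. Below is a minimal standalone
C++ sketch of that conversion, not part of this patch; it uses plain byte buffers in place of
Impala's Tuple and MemPool types, and the example data is illustrative only.

#include <cstdint>
#include <iostream>
#include <vector>

int main() {
  // Pretend tuple data: two 8-byte tuples laid out back to back.
  std::vector<uint8_t> tuple_data(16, 0);
  // Offsets as they would arrive in the tuple-offsets sidecar; -1 encodes NULL.
  const std::vector<int32_t> tuple_offsets = {0, -1, 8};

  // Convert each offset into a pointer into the tuple data buffer.
  std::vector<uint8_t*> tuple_ptrs(tuple_offsets.size());
  for (size_t i = 0; i < tuple_offsets.size(); ++i) {
    tuple_ptrs[i] =
        tuple_offsets[i] == -1 ? nullptr : tuple_data.data() + tuple_offsets[i];
  }

  for (size_t i = 0; i < tuple_ptrs.size(); ++i) {
    std::cout << "tuple " << i
              << (tuple_ptrs[i] == nullptr ? ": NULL" : ": points into the buffer") << "\n";
  }
  return 0;
}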
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 5a7edd4..d246024 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -25,11 +25,17 @@
 #include "codegen/impala-ir.h"
 #include "common/compiler-util.h"
 #include "common/logging.h"
+#include "gen-cpp/row_batch.pb.h"
+#include "kudu/util/slice.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/descriptors.h"
 #include "runtime/disk-io-mgr.h"
 #include "runtime/mem-pool.h"
 
+namespace kudu {
+class Slice;
+} // namespace kudu
+
 namespace impala {
 
 template <typename K, typename V> class FixedSizeHashTable;
@@ -41,6 +47,47 @@ class Tuple;
 class TupleRow;
 class TupleDescriptor;
 
+/// A KRPC outbound row batch which contains the serialized row batch header and buffers
+/// for holding the tuple offsets and tuple data.
+class OutboundRowBatch {
+ public:
+  const RowBatchHeaderPB* header() const { return &header_; }
+
+  /// Returns the serialized tuple offsets' vector as a kudu::Slice.
+  /// The tuple offsets vector is sent as a KRPC sidecar.
+  kudu::Slice TupleOffsetsAsSlice() const {
+    return kudu::Slice((uint8_t*)tuple_offsets_.data(),
+        tuple_offsets_.size() * sizeof(tuple_offsets_[0]));
+  }
+
+  /// Returns the serialized tuple data's buffer as a kudu::Slice.
+  /// The tuple data is sent as a KRPC sidecar.
+  kudu::Slice TupleDataAsSlice() const {
+    return kudu::Slice((uint8_t*)tuple_data_.data(), tuple_data_.length());
+  }
+
+  /// Returns true if the header has been initialized and is ready to be sent.
+  /// This entails setting some fields initialized in RowBatch::Serialize().
+  bool IsInitialized() const {
+     return header_.has_num_rows() && header_.has_uncompressed_size() &&
+         header_.has_compression_type();
+  }
+
+ private:
+  friend class RowBatch;
+
+  /// The serialized header which contains the meta-data of the row batch such as the
+  /// number of rows and compression scheme used etc.
+  RowBatchHeaderPB header_;
+
+  /// Contains offsets into 'tuple_data_' of all tuples in a row batch. -1 refers to
+  /// a NULL tuple.
+  vector<int32_t> tuple_offsets_;
+
+  /// Contains the actual data of all the tuples. The data could be compressed.
+  string tuple_data_;
+};
+
 /// A RowBatch encapsulates a batch of rows, each composed of a number of tuples.
 /// The maximum number of rows is fixed at the time of construction.
 /// The row batch can reference various types of memory.
@@ -55,10 +102,10 @@ class TupleDescriptor;
 ///      used.
 ///     TODO: IMPALA-4179: simplify the ownership transfer model.
 ///
-/// In order to minimize memory allocations, RowBatches and TRowBatches that have been
-/// serialized and sent over the wire should be reused (this prevents compression_scratch_
-/// from being needlessly reallocated).
-//
+/// In order to minimize memory allocations, RowBatches and TRowBatches or
+/// OutboundRowBatches that have been serialized and sent over the wire should be reused
+/// (this prevents compression_scratch_ from being needlessly reallocated).
+///
 /// Row batches and memory usage: We attempt to stream row batches through the plan
 /// tree without copying the data. This means that row batches are often not-compact
 /// and reference memory outside of the row batch. This results in most row batches
@@ -88,13 +135,21 @@ class RowBatch {
   /// tracker cannot be NULL.
   RowBatch(const RowDescriptor* row_desc, int capacity, MemTracker* tracker);
 
-  /// Populate a row batch from input_batch by copying input_batch's
-  /// tuple_data into the row batch's mempool and converting all offsets
-  /// in the data back into pointers.
+  /// Populate a row batch from a serialized thrift input_batch by copying
+  /// input_batch's tuple_data into the row batch's mempool and converting all
+  /// offsets in the data back into pointers.
   /// TODO: figure out how to transfer the data from input_batch to this RowBatch
   /// (so that we don't need to make yet another copy)
-  RowBatch(
-      const RowDescriptor* row_desc, const TRowBatch& input_batch, MemTracker* tracker);
+  RowBatch(const RowDescriptor* row_desc, const TRowBatch& input_batch,
+      MemTracker* tracker);
+
+  /// Populate a row batch from the serialized row batch header, decompress / copy
+  /// the tuple's data into a buffer and convert all offsets in 'tuple_offsets' back
+  /// into pointers into the tuple data's buffer. The tuple data's buffer is allocated
+  /// from the row batch's MemPool tracked by 'mem_tracker'.
+  RowBatch(const RowDescriptor* row_desc, const RowBatchHeaderPB& header,
+      const kudu::Slice& input_tuple_data, const kudu::Slice& input_tuple_offsets,
+      MemTracker* mem_tracker);
 
   /// Releases all resources accumulated at this row batch.  This includes
   ///  - tuple_ptrs
@@ -288,12 +343,13 @@ class RowBatch {
   void DeepCopyTo(RowBatch* dst);
 
   /// Create a serialized version of this row batch in output_batch, attaching all of the
-  /// data it references to output_batch.tuple_data. This function attempts to
-  /// detect duplicate tuples in the row batch to reduce the serialized size.
-  /// output_batch.tuple_data will be snappy-compressed unless the compressed data is
-  /// larger than the uncompressed data. Use output_batch.is_compressed to determine
+  /// data it references to output_batch.tuple_data. This function attempts to detect
+  /// duplicate tuples in the row batch to reduce the serialized size.
+  /// output_batch.tuple_data will be LZ4-compressed unless the compressed data is
+  /// larger than the uncompressed data. Use output_batch.compression_type to determine
   /// whether tuple_data is compressed. If an in-flight row is present in this row batch,
   /// it is ignored. This function does not Reset().
+  Status Serialize(OutboundRowBatch* output_batch);
   Status Serialize(TRowBatch* output_batch);
 
   /// Utility function: returns total byte size of a batch in either serialized or
@@ -301,6 +357,10 @@ class RowBatch {
   /// less than the deserialized size.
   static int64_t GetSerializedSize(const TRowBatch& batch);
   static int64_t GetDeserializedSize(const TRowBatch& batch);
+  static int64_t GetSerializedSize(const OutboundRowBatch& batch);
+  static int64_t GetDeserializedSize(const OutboundRowBatch& batch);
+  static int64_t GetDeserializedSize(const RowBatchHeaderPB& header,
+      const kudu::Slice& tuple_offsets);
 
   int ALWAYS_INLINE num_rows() const { return num_rows_; }
   int ALWAYS_INLINE capacity() const { return capacity_; }
@@ -353,6 +413,38 @@ class RowBatch {
   /// Overload for testing that allows the test to force the deduplication level.
   Status Serialize(TRowBatch* output_batch, bool full_dedup);
 
+  /// Shared implementation between thrift and protobuf to serialize this row batch.
+  ///
+  /// 'full_dedup': true if full deduplication is used.
+  /// 'tuple_offsets': Updated to contain offsets of all tuples into 'tuple_data' upon
+  ///                  return. There are a total of num_rows * num_tuples_per_row offsets.
+  ///                  An offset of -1 records a NULL.
+  /// 'tuple_data': Updated to hold the serialized tuples' data. If 'is_compressed'
+  ///               is true, this is LZ4 compressed.
+  /// 'uncompressed_size': Updated with the uncompressed size of 'tuple_data'.
+  /// 'is_compressed': true if compression is applied on 'tuple_data'.
+  ///
+  /// Returns error status if serialization failed. Returns OK otherwise.
+  /// TODO: clean this up once the thrift RPC implementation is removed.
+  Status Serialize(bool full_dedup, vector<int32_t>* tuple_offsets, string* tuple_data,
+      int64_t* uncompressed_size, bool* is_compressed);
+
+  /// Shared implementation between thrift and protobuf to deserialize a row batch.
+  ///
+  /// 'input_tuple_offsets': an array of int32_t offsets into 'input_tuple_data'.
+  /// Used for populating the tuples in the row batch with actual pointers.
+  ///
+  /// 'input_tuple_data': contains pointer and size of tuples' data buffer.
+  /// If 'is_compressed' is true, the data is compressed.
+  ///
+  /// 'uncompressed_size': the uncompressed size of 'input_tuple_data' if it's compressed.
+  ///
+  /// 'is_compressed': True if 'input_tuple_data' is compressed.
+  ///
+  /// TODO: clean this up once the thrift RPC implementation is removed.
+  void Deserialize(const kudu::Slice& input_tuple_offsets,
+      const kudu::Slice& input_tuple_data, int64_t uncompressed_size, bool is_compressed);
+
   typedef FixedSizeHashTable<Tuple*, int> DedupMap;
 
   /// The total size of all data represented in this row batch (tuples and referenced
@@ -363,7 +455,7 @@ class RowBatch {
   int64_t TotalByteSize(DedupMap* distinct_tuples);
 
   void SerializeInternal(int64_t size, DedupMap* distinct_tuples,
-      TRowBatch* output_batch);
+      vector<int32_t>* tuple_offsets, string* tuple_data);
 
   /// All members below need to be handled in RowBatch::AcquireState()
 
@@ -419,10 +511,11 @@ class RowBatch {
   std::vector<BufferInfo> buffers_;
 
   /// String to write compressed tuple data to in Serialize().
-  /// This is a string so we can swap() with the string in the TRowBatch we're serializing
-  /// to (we don't compress directly into the TRowBatch in case the compressed data is
-  /// longer than the uncompressed data). Swapping avoids copying data to the TRowBatch
-  /// and avoids excess memory allocations: since we reuse RowBatchs and TRowBatchs, and
+  /// This is a string so we can swap() with the string in the serialized row batch
+  /// (i.e. TRowBatch or OutboundRowBatch) we're serializing to (we don't compress
+  /// directly into the serialized row batch in case the compressed data is longer than
+  /// the uncompressed data). Swapping avoids copying data to the serialized row batch
+  /// and avoids excess memory allocations: since we reuse the serialized row batches, and
   /// assuming all row batches are roughly the same size, all strings will eventually be
   /// allocated to the right size.
   std::string compression_scratch_;

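The compression_scratch_ comment above describes a swap-if-smaller pattern: compress into a
reusable scratch string, and only swap it into the output when the compressed form is smaller.
A minimal standalone sketch of that pattern follows, assuming a placeholder CompressLz4()
function in place of Impala's Lz4Compressor (the function name and its pass-through behavior
are stand-ins, not the real API).

#include <cstddef>
#include <iostream>
#include <string>

// Placeholder "compression": a real implementation would call the LZ4 block API and
// may produce output that is larger than the input.
static size_t CompressLz4(const std::string& in, std::string* out) {
  out->assign(in);  // worst case: same size as the input
  return out->size();
}

// Compress into a reusable scratch string and swap it into 'tuple_data' only if the
// compressed form is smaller, so both buffers are reused across row batches.
void MaybeCompress(std::string* tuple_data, std::string* compression_scratch,
    bool* is_compressed) {
  *is_compressed = false;
  size_t compressed_size = CompressLz4(*tuple_data, compression_scratch);
  if (compressed_size < tuple_data->size()) {
    compression_scratch->resize(compressed_size);
    tuple_data->swap(*compression_scratch);  // avoids copying the compressed bytes
    *is_compressed = true;
  }
}

int main() {
  std::string tuple_data(1024, 'x');
  std::string scratch;
  bool is_compressed;
  MaybeCompress(&tuple_data, &scratch, &is_compressed);
  std::cout << "compressed: " << std::boolalpha << is_compressed << "\n";
  return 0;
}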
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/service/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt
index ab51740..c78116c 100644
--- a/be/src/service/CMakeLists.txt
+++ b/be/src/service/CMakeLists.txt
@@ -21,20 +21,25 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/service")
 # where to put generated binaries.
 set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/service")
 
+# Mark the protobuf file as generated
+set_source_files_properties(${DATA_STREAM_SVC_PROTO_SRCS} PROPERTIES GENERATED TRUE)
+
 add_library(Service
+  child-query.cc
+  client-request-state.cc
+  ${DATA_STREAM_SVC_PROTO_SRCS}
+  data-stream-service.cc
   frontend.cc
   fe-support.cc
   hs2-util.cc
-  impala-server.cc
-  impala-http-handler.cc
-  impala-hs2-server.cc
   impala-beeswax-server.cc
+  impala-hs2-server.cc
+  impala-http-handler.cc
   impala-internal-service.cc
-  client-request-state.cc
+  impalad-main.cc
+  impala-server.cc
   query-options.cc
   query-result-set.cc
-  child-query.cc
-  impalad-main.cc
 )
 add_dependencies(Service gen-deps)
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/service/data-stream-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
new file mode 100644
index 0000000..dcf0c1f
--- /dev/null
+++ b/be/src/service/data-stream-service.cc
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "service/data-stream-service.h"
+
+#include "common/status.h"
+#include "exec/kudu-util.h"
+#include "kudu/rpc/rpc_context.h"
+#include "rpc/rpc-mgr.h"
+#include "runtime/krpc-data-stream-mgr.h"
+#include "runtime/exec-env.h"
+#include "runtime/row-batch.h"
+#include "testutil/fault-injection-util.h"
+
+#include "gen-cpp/data_stream_service.pb.h"
+
+#include "common/names.h"
+
+using kudu::rpc::RpcContext;
+
+namespace impala {
+
+DataStreamService::DataStreamService(RpcMgr* mgr)
+  : DataStreamServiceIf(mgr->metric_entity(), mgr->result_tracker()) {}
+
+void DataStreamService::EndDataStream(const EndDataStreamRequestPB* request,
+    EndDataStreamResponsePB* response, RpcContext* rpc_context) {
+  // CloseSender() is guaranteed to eventually respond to this RPC so we don't do it here.
+  ExecEnv::GetInstance()->KrpcStreamMgr()->CloseSender(request, response, rpc_context);
+}
+
+void DataStreamService::TransmitData(const TransmitDataRequestPB* request,
+    TransmitDataResponsePB* response, RpcContext* rpc_context) {
+  FAULT_INJECTION_RPC_DELAY(RPC_TRANSMITDATA);
+  // AddData() is guaranteed to eventually respond to this RPC so we don't do it here.
+  ExecEnv::GetInstance()->KrpcStreamMgr()->AddData(request, response, rpc_context);
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/service/data-stream-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.h b/be/src/service/data-stream-service.h
new file mode 100644
index 0000000..7f3c6e4
--- /dev/null
+++ b/be/src/service/data-stream-service.h
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_SERVICE_DATA_STREAM_SERVICE_H
+#define IMPALA_SERVICE_DATA_STREAM_SERVICE_H
+
+#include "gen-cpp/data_stream_service.service.h"
+
+namespace kudu {
+namespace rpc {
+class RpcContext;
+} // namespace rpc
+} // namespace kudu
+
+namespace impala {
+
+class RpcMgr;
+
+/// This is a singleton class which provides data transmission services between fragment
+/// instances. The client for this service is implemented in KrpcDataStreamSender.
+/// The processing of incoming requests is implemented in KrpcDataStreamRecvr.
+/// KrpcDataStreamMgr is responsible for routing the incoming requests to the
+/// appropriate receivers.
+class DataStreamService : public DataStreamServiceIf {
+ public:
+  DataStreamService(RpcMgr* rpc_mgr);
+
+  /// Notifies the receiver to close the data stream specified in 'request'.
+  /// The receiver replies to the client with a status serialized in 'response'.
+  virtual void EndDataStream(const EndDataStreamRequestPB* request,
+      EndDataStreamResponsePB* response, kudu::rpc::RpcContext* context);
+
+  /// Sends a row batch to the receiver specified in 'request'.
+  /// The receiver replies to the client with a status serialized in 'response'.
+  virtual void TransmitData(const TransmitDataRequestPB* request,
+      TransmitDataResponsePB* response, kudu::rpc::RpcContext* context);
+};
+
+} // namespace impala
+#endif // IMPALA_SERVICE_DATA_STREAM_SERVICE_H

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 07a83eb..4c540b7 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1195,7 +1195,7 @@ void ImpalaServer::TransmitData(
   }
 
   if (params.eos) {
-    exec_env_->stream_mgr()->CloseSender(
+    exec_env_->ThriftStreamMgr()->CloseSender(
         params.dest_fragment_instance_id, params.dest_node_id,
         params.sender_id).SetTStatus(&return_val);
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/cmake_modules/FindProtobuf.cmake
----------------------------------------------------------------------
diff --git a/cmake_modules/FindProtobuf.cmake b/cmake_modules/FindProtobuf.cmake
index 4c2ab2f..f300141 100644
--- a/cmake_modules/FindProtobuf.cmake
+++ b/cmake_modules/FindProtobuf.cmake
@@ -180,7 +180,7 @@ function(PROTOBUF_GENERATE_CPP SRCS HDRS TGTS)
     # This custom target enforces that there's just one invocation of protoc
     # when there are multiple consumers of the generated files. The target name
     # must be unique; adding parts of the filename helps ensure this.
-    set(TGT_NAME ${REL_DIR}${FIL})
+    set(TGT_NAME PROTO_${REL_DIR}${FIL})
     string(REPLACE "/" "-" TGT_NAME ${TGT_NAME})
     add_custom_target(${TGT_NAME}
       DEPENDS "${PROTO_CC_OUT}" "${PROTO_H_OUT}")

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/common/protobuf/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/common/protobuf/CMakeLists.txt b/common/protobuf/CMakeLists.txt
index 4d5f121..2faec7e 100644
--- a/common/protobuf/CMakeLists.txt
+++ b/common/protobuf/CMakeLists.txt
@@ -20,6 +20,30 @@ cmake_minimum_required(VERSION 2.6)
 
 set(PROTOBUF_OUTPUT_DIR ${CMAKE_SOURCE_DIR}/be/generated-sources/gen-cpp/)
 
+PROTOBUF_GENERATE_CPP(
+  COMMON_PROTO_SRCS COMMON_PROTO_HDRS COMMON_PROTO_TGTS
+  SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}
+  BINARY_ROOT ${PROTOBUF_OUTPUT_DIR}
+  PROTO_FILES common.proto)
+add_custom_target(common_proto DEPENDS ${COMMON_PROTO_TGTS})
+set(COMMON_PROTO_SRCS ${COMMON_PROTO_SRCS} PARENT_SCOPE)
+
+PROTOBUF_GENERATE_CPP(
+  ROW_BATCH_PROTO_SRCS ROW_BATCH_PROTO_HDRS ROW_BATCH_PROTO_TGTS
+  SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}
+  BINARY_ROOT ${PROTOBUF_OUTPUT_DIR}
+  PROTO_FILES row_batch.proto)
+add_custom_target(row_batch_proto DEPENDS ${ROW_BATCH_PROTO_TGTS})
+set(ROW_BATCH_PROTO_SRCS ${ROW_BATCH_PROTO_SRCS} PARENT_SCOPE)
+
+KRPC_GENERATE(DATA_STREAM_SVC_PROTO_SRCS DATA_STREAM_SVC_PROTO_HDRS
+  DATA_STREAM_SVC_PROTO_TGTS
+  SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}
+  BINARY_ROOT ${PROTOBUF_OUTPUT_DIR}
+  PROTO_FILES data_stream_service.proto)
+add_custom_target(data_stream_svc_proto DEPENDS ${DATA_STREAM_SVC_PROTO_TGTS})
+set(DATA_STREAM_SVC_PROTO_SRCS ${DATA_STREAM_SVC_PROTO_SRCS} PARENT_SCOPE)
+
 KRPC_GENERATE(RPC_TEST_PROTO_SRCS RPC_TEST_PROTO_HDRS
   RPC_TEST_PROTO_TGTS
   SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}
@@ -28,4 +52,4 @@ KRPC_GENERATE(RPC_TEST_PROTO_SRCS RPC_TEST_PROTO_HDRS
 add_custom_target(rpc_test_proto_tgt DEPENDS ${RPC_TEST_PROTO_TGTS})
 set(RPC_TEST_PROTO_SRCS ${RPC_TEST_PROTO_SRCS} PARENT_SCOPE)
 
-add_custom_target(proto-deps ALL DEPENDS token_proto rpc_header_proto)
\ No newline at end of file
+add_custom_target(proto-deps DEPENDS token_proto rpc_header_proto common_proto row_batch_proto data_stream_svc_proto)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/common/protobuf/common.proto
----------------------------------------------------------------------
diff --git a/common/protobuf/common.proto b/common/protobuf/common.proto
new file mode 100644
index 0000000..17f9fc6
--- /dev/null
+++ b/common/protobuf/common.proto
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Common protobuf definitions.
+
+package impala;
+
+// Proto-serialized version of Impala's Status object.
+message StatusPB {
+  optional int32 status_code = 1;
+  repeated string error_msgs = 2;
+}
+
+// 128-bit ID (equivalent to TUniqueID).
+message UniqueIdPB {
+  optional int64 hi = 1;
+  optional int64 lo = 2;
+}
+
+// The compression codec. Currently used in the row batch's header to
+// indicate the type of compression applied to the row batch.
+enum CompressionType {
+  NONE = 0; // No compression.
+  LZ4 = 1;
+};

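As a usage illustration of the new messages, the standard protoc-generated C++ accessors for
these fields would look roughly as follows. This is a sketch, not part of the patch; the
generated header name common.pb.h and the field values are assumptions.

#include <iostream>
#include "common.pb.h"  // assumed name of the protoc-generated header for common.proto

int main() {
  impala::StatusPB status;
  status.set_status_code(0);               // hypothetical code for OK
  status.add_error_msgs("example message"); // repeated string field

  impala::UniqueIdPB id;
  id.set_hi(1);
  id.set_lo(2);

  std::cout << status.ShortDebugString() << "\n" << id.ShortDebugString() << std::endl;
  return 0;
}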
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/common/protobuf/data_stream_service.proto
----------------------------------------------------------------------
diff --git a/common/protobuf/data_stream_service.proto b/common/protobuf/data_stream_service.proto
new file mode 100644
index 0000000..3aa3f28
--- /dev/null
+++ b/common/protobuf/data_stream_service.proto
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package impala;
+
+import "common.proto";
+import "row_batch.proto";
+
+// All fields are required in V1.
+message TransmitDataRequestPB {
+  // The fragment instance id of the receiver.
+  optional UniqueIdPB dest_fragment_instance_id = 1;
+
+  // Sender instance id, unique within a fragment.
+  optional int32 sender_id = 2;
+
+  // PlanNodeId of the exchange node which owns the receiver.
+  optional int32 dest_node_id = 3;
+
+  // The header which contains the meta-data of the row batch.
+  optional RowBatchHeaderPB row_batch_header = 4;
+
+  // The sidecar index of the tuple offsets' buffer, which is an array of int32 containing
+  // the offsets of tuples into the buffer pointed to by the tuple data sidecar below.
+  // There are num_rows * num_tuples_per_row offsets in total. An offset of -1 records a NULL.
+  optional int32 tuple_offsets_sidecar_idx = 5;
+
+  // The sidecar index of the tuples' data which is a (compressed) row batch.
+  // The details of the row batch (e.g. # of rows) are in 'row_batch_header' above.
+  optional int32 tuple_data_sidecar_idx = 6;
+}
+
+// All fields are required in V1.
+message TransmitDataResponsePB {
+  // Status::OK() on success; Error status on failure.
+  optional StatusPB status = 1;
+}
+
+// All fields are required in V1.
+message EndDataStreamRequestPB {
+  // The fragment instance id of the receiver.
+  optional UniqueIdPB dest_fragment_instance_id = 1;
+
+  // Sender instance id, unique within a fragment.
+  optional int32 sender_id = 2;
+
+  // PlanNodeId of the exchange node which owns the receiver.
+  optional int32 dest_node_id = 3;
+}
+
+// All fields are required in V1.
+message EndDataStreamResponsePB {
+  optional StatusPB status = 1;
+}
+
+// Handles data transmission between fragment instances.
+service DataStreamService {
+  // Called by a sender to transmit a single row batch. Returns an error indication
+  // if params.fragmentId or params.destNodeId are unknown or if data couldn't
+  // be read.
+  rpc TransmitData(TransmitDataRequestPB) returns (TransmitDataResponsePB);
+
+  // Called by a sender to close the channel between fragment instances.
+  rpc EndDataStream(EndDataStreamRequestPB) returns (EndDataStreamResponsePB);
+}

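A hedged sketch of how a sender might populate TransmitDataRequestPB using the standard
protoc-generated accessors. The header name data_stream_service.pb.h and all values are
illustrative assumptions; obtaining the sidecar indexes from KRPC (by attaching the
tuple-offsets and tuple-data buffers as sidecars) is outside the scope of this sketch.

#include <iostream>
#include "data_stream_service.pb.h"  // assumed name of the protoc-generated header

impala::TransmitDataRequestPB MakeRequest() {
  impala::TransmitDataRequestPB request;
  request.mutable_dest_fragment_instance_id()->set_hi(0x1234);  // example id only
  request.mutable_dest_fragment_instance_id()->set_lo(0x5678);
  request.set_sender_id(0);
  request.set_dest_node_id(2);  // PlanNodeId of the destination exchange node
  impala::RowBatchHeaderPB* header = request.mutable_row_batch_header();
  header->set_num_rows(1024);
  header->set_num_tuples_per_row(1);
  header->set_uncompressed_size(65536);
  header->set_compression_type(impala::CompressionType::LZ4);
  // Indexes of the two sidecars carrying the tuple offsets and the tuple data.
  request.set_tuple_offsets_sidecar_idx(0);
  request.set_tuple_data_sidecar_idx(1);
  return request;
}

int main() {
  std::cout << MakeRequest().ShortDebugString() << std::endl;
  return 0;
}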
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/common/protobuf/row_batch.proto
----------------------------------------------------------------------
diff --git a/common/protobuf/row_batch.proto b/common/protobuf/row_batch.proto
new file mode 100644
index 0000000..5aef55b
--- /dev/null
+++ b/common/protobuf/row_batch.proto
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package impala;
+
+import "common.proto";
+
+// The serialized version of a header of a RowBatch (in be/src/runtime/row-batch.h).
+// It contains the meta-data of a row batch. The actual data of a row batch is sent
+// as KRPC sidecars. Please see TransmitDataRequestPB for details.
+// All fields are required in V1.
+message RowBatchHeaderPB {
+  // Total number of rows contained in this batch.
+  optional int32 num_rows = 1;
+
+  // Number of tuples per row in this batch.
+  optional int32 num_tuples_per_row = 2;
+
+  // Size of 'tuple_data' in bytes before any compression is applied.
+  optional int64 uncompressed_size = 3;
+
+  // The compression codec (if any) used for compressing the row batch.
+  optional CompressionType compression_type = 4;
+}

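This header feeds the size accounting in RowBatch::GetDeserializedSize(): the in-memory
footprint is the uncompressed tuple data plus one Tuple* per serialized offset. A small
standalone sketch of that arithmetic (names and example values are illustrative):

#include <cstdint>
#include <iostream>

// Mirrors the accounting in RowBatch::GetDeserializedSize(): uncompressed tuple data
// plus one tuple pointer per int32 offset in the tuple-offsets sidecar.
int64_t DeserializedSize(int64_t uncompressed_size, int64_t tuple_offsets_bytes) {
  int64_t num_tuples = tuple_offsets_bytes / sizeof(int32_t);
  return uncompressed_size + num_tuples * sizeof(void*);
}

int main() {
  // e.g. 1 KB of tuple data and 24 bytes of offsets (6 tuples) -> 1024 + 6 * 8 on LP64.
  std::cout << DeserializedSize(1024, 24) << std::endl;
  return 0;
}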
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4ea57a7/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index ad07963..dd7afc5 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -336,7 +336,8 @@ error_codes = (
 
   ("DISK_IO_ERROR", 110, "Disk I/O error: $0"),
 
-
+  ("DATASTREAM_RECVR_CLOSED", 111,
+   "DataStreamRecvr for fragment=$0, node=$1 is closed already"),
 )
 
 import sys


