pinot-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehapa...@apache.org
Subject [incubator-pinot] 08/08: An attempt at server-side changes
Date Sat, 02 Jan 2021 23:52:25 GMT
This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 0fa424eaaeae3049e47d7f494cf86c7f88b84a05
Author: Neha Pawar <neha.pawar18@gmail.com>
AuthorDate: Thu Dec 31 17:19:24 2020 -0800

    An attempt at server-side changes
---
 .../realtime/LLRealtimeSegmentDataManager.java     | 22 +++++++++++++---------
 .../org/apache/pinot/spi/stream/FetchResult.java   |  5 +----
 .../pinot/spi/stream/PartitionGroupConsumer.java   |  2 +-
 3 files changed, 15 insertions(+), 14 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 0cd1fba..dabe748 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -71,8 +71,10 @@ import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.FetchResult;
 import org.apache.pinot.spi.stream.MessageBatch;
-import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 import org.apache.pinot.spi.stream.PermanentConsumerException;
 import org.apache.pinot.spi.stream.RowMetadata;
@@ -249,10 +251,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager
{
   private Thread _consumerThread;
   private final String _streamTopic;
   private final int _partitionGroupId;
+  private final PartitionGroupMetadata _partitionGroupMetadata;
   final String _clientId;
   private final LLCSegmentName _llcSegmentName;
   private final RecordTransformer _recordTransformer;
-  private PartitionLevelConsumer _partitionLevelConsumer = null;
+  private PartitionGroupConsumer _partitionGroupConsumer = null;
   private StreamMetadataProvider _streamMetadataProvider = null;
   private final File _resourceTmpDir;
   private final String _tableNameWithType;
@@ -380,12 +383,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager
{
       // Update _currentOffset upon return from this method
       MessageBatch messageBatch;
       try {
-        messageBatch = _partitionLevelConsumer
+        FetchResult fetchResult = _partitionGroupConsumer
             .fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis());
+        messageBatch = fetchResult.getMessages();
         consecutiveErrorCount = 0;
-      } catch (TimeoutException e) {
-        handleTransientStreamErrors(e);
-        continue;
       } catch (TransientConsumerException e) {
         handleTransientStreamErrors(e);
         continue;
@@ -898,7 +899,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager
{
 
   private void closePartitionLevelConsumer() {
     try {
-      _partitionLevelConsumer.close();
+      _partitionGroupConsumer.close();
     } catch (Exception e) {
       segmentLogger.warn("Could not close stream consumer", e);
     }
@@ -1130,6 +1131,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager
{
     _segmentNameStr = _segmentZKMetadata.getSegmentName();
     _llcSegmentName = llcSegmentName;
     _partitionGroupId = _llcSegmentName.getPartitionGroupId();
+    _partitionGroupMetadata = new PartitionGroupMetadata(_partitionGroupId, _llcSegmentName.getSequenceNumber(),
+        _segmentZKMetadata.getStartOffset(), _segmentZKMetadata.getEndOffset(),
+        _segmentZKMetadata.getStatus().toString());
     _partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore;
     _acquiredConsumerSemaphore = new AtomicBoolean(false);
     _metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _partitionGroupId;
@@ -1305,11 +1309,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager
{
    * @param reason
    */
   private void makeStreamConsumer(String reason) {
-    if (_partitionLevelConsumer != null) {
+    if (_partitionGroupConsumer != null) {
       closePartitionLevelConsumer();
     }
     segmentLogger.info("Creating new stream consumer, reason: {}", reason);
-    _partitionLevelConsumer = _streamConsumerFactory.createPartitionLevelConsumer(_clientId,
_partitionGroupId);
+    _partitionGroupConsumer = _streamConsumerFactory.createPartitionGroupConsumer(_partitionGroupMetadata);
   }
 
   /**
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java
index b0ed6e5..7e8a911 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/FetchResult.java
@@ -18,10 +18,7 @@
  */
 package org.apache.pinot.spi.stream;
 
-import java.util.List;
-
-
 public interface FetchResult<T> {
   Checkpoint getLastCheckpoint();
-  List<T> getMessages();
+  MessageBatch<T> getMessages();
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
index e096e67..bbbdaad 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
@@ -22,5 +22,5 @@ import java.io.Closeable;
 
 
 public interface PartitionGroupConsumer extends Closeable {
-  FetchResult fetch(Checkpoint start, Checkpoint end, long timeout);
+  FetchResult fetchMessages(Checkpoint start, Checkpoint end, long timeout);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Mime
View raw message