hudi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [incubator-hudi] ezhux commented on a change in pull request #1109: [HUDI-238] - Migrating to Scala 2.12
Date Sun, 12 Jan 2020 19:17:56 GMT
ezhux commented on a change in pull request #1109: [HUDI-238] - Migrating to Scala 2.12
URL: https://github.com/apache/incubator-hudi/pull/1109#discussion_r365604390
 
 

 ##########
 File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
 ##########
 @@ -175,93 +155,101 @@ public static long totalNewMessages(OffsetRange[] ranges) {
 
     private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
     private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents";
-    private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LARGEST;
+    private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.latest;
     public static final long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 5000000;
     public static long maxEventsFromKafkaSource = DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
   }
 
-  private final HashMap<String, String> kafkaParams;
+  private final HashMap<String, Object> kafkaParams;
   private final TypedProperties props;
   protected final String topicName;
 
   public KafkaOffsetGen(TypedProperties props) {
     this.props = props;
-    kafkaParams = new HashMap<String, String>();
+    kafkaParams = new HashMap<String, Object>();
     for (Object prop : props.keySet()) {
       kafkaParams.put(prop.toString(), props.getString(prop.toString()));
     }
     DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME));
     topicName = props.getString(Config.KAFKA_TOPIC_NAME);
   }
 
+  public HashMap<String, Object> getKafkaProperties() {
+    final HashMap<String, Object> kafkaParams;
+    kafkaParams = new HashMap<String, Object>();
+    for (Object prop : props.keySet()) {
+      kafkaParams.put(prop.toString(), props.get(prop));
+    }
+    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, String.valueOf(new Random().nextInt(10000)));
+    kafkaParams.put("auto.offset.reset", "earliest");
 
 Review comment:
   changed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message