druid-commits mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] jon-wei closed pull request #6339: Tweak ignoreInvalidRows behavior in HadoopTuningConfig
Date Mon, 24 Sep 2018 23:13:15 GMT
jon-wei closed pull request #6339: Tweak ignoreInvalidRows behavior in HadoopTuningConfig
URL: https://github.com/apache/incubator-druid/pull/6339
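
In brief, for readers skimming the diff below: HadoopTuningConfig's
deprecated ignoreInvalidRows flag is widened from boolean to a boxed
Boolean so that an unset value can be told apart from an explicit false;
an explicitly configured maxParseExceptions now always takes precedence,
and only when it is absent does the deprecated flag choose between 0
(strict) and TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS (lenient). The
now-unused reportParseExceptions parameter is dropped from the
innerMap(...) overrides, and the isIgnoreInvalidRows() passthrough is
removed from HadoopDruidIndexerConfig. A minimal standalone sketch of
the new precedence logic (the class and its resolve() helper are
hypothetical, written here only to restate the constructor branch in
isolation; DEFAULT_MAX_PARSE_EXCEPTIONS is assumed to be
Integer.MAX_VALUE, mirroring TuningConfig):

    class MaxParseExceptionsSketch
    {
      // Assumed to mirror TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS.
      static final int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;

      static int resolve(Boolean ignoreInvalidRows, Integer maxParseExceptions)
      {
        if (maxParseExceptions != null) {
          return maxParseExceptions;         // an explicit setting always wins
        }
        if (ignoreInvalidRows == null || !ignoreInvalidRows) {
          return 0;                          // unset or false: strict, no bad rows tolerated
        }
        return DEFAULT_MAX_PARSE_EXCEPTIONS; // true: lenient
      }

      public static void main(String[] args)
      {
        System.out.println(resolve(null, null));  // 0
        System.out.println(resolve(false, null)); // 0
        System.out.println(resolve(true, null));  // 2147483647 (assumed default)
        System.out.println(resolve(true, 10));    // 10
      }
    }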
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
index 6fb8a81bf12..0716e162fee 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
@@ -284,8 +284,7 @@ protected void setup(Context context)
     @Override
     protected void innerMap(
         InputRow inputRow,
-        Context context,
-        boolean reportParseExceptions
+        Context context
     ) throws IOException
     {
 
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
index ec1039dde39..27486129110 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
@@ -313,8 +313,7 @@ protected void setup(Context context)
     @Override
     protected void innerMap(
         InputRow inputRow,
-        Context context,
-        boolean reportParseExceptions
+        Context context
     ) throws IOException, InterruptedException
     {
       final List<Object> groupKey = Rows.toGroupKey(
@@ -395,8 +394,7 @@ protected void setup(Context context)
     @Override
     protected void innerMap(
         InputRow inputRow,
-        Context context,
-        boolean reportParseExceptions
+        Context context
     ) throws IOException, InterruptedException
     {
       final Map<String, Iterable<String>> dims = Maps.newHashMap();
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java
index 2c41a3fc07e..b1b2f7fc894 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java
@@ -308,11 +308,6 @@ public boolean isOverwriteFiles()
     return schema.getTuningConfig().isOverwriteFiles();
   }
 
-  public boolean isIgnoreInvalidRows()
-  {
-    return schema.getTuningConfig().isIgnoreInvalidRows();
-  }
-
   public void setShardSpecs(Map<Long, List<HadoopyShardSpec>> shardSpecs)
   {
     this.schema = schema.withTuningConfig(schema.getTuningConfig().withShardSpecs(shardSpecs));
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerMapper.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerMapper.java
index 48f6b1da4e0..02ced6cf10c 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerMapper.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerMapper.java
@@ -45,7 +45,6 @@
   protected HadoopDruidIndexerConfig config;
   private InputRowParser parser;
   protected GranularitySpec granularitySpec;
-  private boolean reportParseExceptions;
 
   @Override
   protected void setup(Context context)
@@ -54,7 +53,6 @@ protected void setup(Context context)
     config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
     parser = config.getParser();
     granularitySpec = config.getGranularitySpec();
-    reportParseExceptions = !config.isIgnoreInvalidRows();
   }
 
   public HadoopDruidIndexerConfig getConfig()
@@ -88,7 +86,7 @@ protected void map(Object key, Object value, Context context) throws IOException
           if (!granularitySpec.bucketIntervals().isPresent()
               || granularitySpec.bucketInterval(DateTimes.utc(inputRow.getTimestampFromEpoch()))
                                 .isPresent()) {
-            innerMap(inputRow, context, reportParseExceptions);
+            innerMap(inputRow, context);
           } else {
             context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_THROWN_AWAY_COUNTER).increment(1);
           }
@@ -147,7 +145,7 @@ private void handleParseException(ParseException pe, Context context)
     }
   }
 
-  protected abstract void innerMap(InputRow inputRow, Context context, boolean reportParseExceptions)
+  protected abstract void innerMap(InputRow inputRow, Context context)
       throws IOException, InterruptedException;
 
 }
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
index 1f17fb89fe8..502066ce1de 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
@@ -85,7 +85,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig()
   private final boolean leaveIntermediate;
   private final Boolean cleanupOnFailure;
   private final boolean overwriteFiles;
-  private final boolean ignoreInvalidRows;
+  private final Boolean ignoreInvalidRows;
   private final Map<String, String> jobProperties;
   private final boolean combineText;
   private final boolean useCombiner;
@@ -108,7 +108,7 @@ public HadoopTuningConfig(
       final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
       final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure,
       final @JsonProperty("overwriteFiles") boolean overwriteFiles,
-      final @Deprecated @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
+      final @Deprecated @JsonProperty("ignoreInvalidRows") Boolean ignoreInvalidRows,
       final @JsonProperty("jobProperties") Map<String, String> jobProperties,
       final @JsonProperty("combineText") boolean combineText,
       final @JsonProperty("useCombiner") Boolean useCombiner,
@@ -138,7 +138,6 @@ public HadoopTuningConfig(
     this.leaveIntermediate = leaveIntermediate;
     this.cleanupOnFailure = cleanupOnFailure == null ? true : cleanupOnFailure;
     this.overwriteFiles = overwriteFiles;
-    this.ignoreInvalidRows = ignoreInvalidRows;
     this.jobProperties = (jobProperties == null
                           ? ImmutableMap.of()
                           : ImmutableMap.copyOf(jobProperties));
@@ -152,10 +151,16 @@ public HadoopTuningConfig(
     this.useExplicitVersion = useExplicitVersion;
     this.allowedHadoopPrefix = allowedHadoopPrefix == null ? ImmutableList.of() : allowedHadoopPrefix;
 
-    if (!this.ignoreInvalidRows) {
-      this.maxParseExceptions = 0;
+
+    this.ignoreInvalidRows = ignoreInvalidRows;
+    if (maxParseExceptions != null) {
+      this.maxParseExceptions = maxParseExceptions;
     } else {
-      this.maxParseExceptions = maxParseExceptions == null ? TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS : maxParseExceptions;
+      if (ignoreInvalidRows == null || !ignoreInvalidRows) {
+        this.maxParseExceptions = 0;
+      } else {
+        this.maxParseExceptions = TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS;
+      }
     }
    this.logParseExceptions = logParseExceptions == null ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS : logParseExceptions;
   }
@@ -221,7 +226,7 @@ public boolean isOverwriteFiles()
   }
 
   @JsonProperty
-  public boolean isIgnoreInvalidRows()
+  public Boolean isIgnoreInvalidRows()
   {
     return ignoreInvalidRows;
   }
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
index ac34c755ae9..6bf786be5a6 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
@@ -286,7 +286,7 @@ private static IncrementalIndex makeIncrementalIndex(
 
     IncrementalIndex newIndex = new IncrementalIndex.Builder()
         .setIndexSchema(indexSchema)
-        .setReportParseExceptions(!tuningConfig.isIgnoreInvalidRows())
+        .setReportParseExceptions(!tuningConfig.isIgnoreInvalidRows()) // only used by OffHeapIncrementalIndex
         .setMaxRowCount(tuningConfig.getRowFlushBoundary())
         .setMaxBytesInMemory(TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()))
         .buildOnheap();
@@ -334,8 +334,7 @@ protected void setup(Context context)
     @Override
     protected void innerMap(
         InputRow inputRow,
-        Context context,
-        boolean reportParseExceptions
+        Context context
     ) throws IOException, InterruptedException
     {
       // Group by bucket, sort by timestamp
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerMapperTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerMapperTest.java
index 8630d13bdca..43ca77d6217 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerMapperTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerMapperTest.java
@@ -262,8 +262,7 @@ public void readFields(DataInput in) throws IOException
     @Override
     protected void innerMap(
         final InputRow inputRow,
-        final Context context,
-        final boolean reportParseExceptions
+        final Context context
     )
     {
       rows.add(inputRow);
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
index c496d023714..f95fba86713 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
@@ -231,7 +231,7 @@ public ColumnCapabilities getColumnCapabilities(String columnName)
   private final AggregatorFactory[] metrics;
   private final AggregatorType[] aggs;
   private final boolean deserializeComplexMetrics;
-  private final boolean reportParseExceptions;
+  private final boolean reportParseExceptions; // only used by OffHeapIncrementalIndex
   private final Metadata metadata;
 
   private final Map<String, MetricDesc> metricDescs;
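
One more note on the boolean -> Boolean change in HadoopTuningConfig:
with a boxed type, Jackson leaves the constructor argument null when
"ignoreInvalidRows" is absent from the JSON spec, which is what allows
the patched constructor to distinguish "never set" from an explicit
false. A minimal sketch with a hypothetical probe class (not the real
HadoopTuningConfig, which carries many more properties):

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class BoxedFlagProbe
    {
      // Hypothetical stand-in for the deprecated tuning property.
      public Boolean ignoreInvalidRows;

      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = new ObjectMapper();
        // Property absent: the boxed field stays null ("never set").
        System.out.println(
            mapper.readValue("{}", BoxedFlagProbe.class).ignoreInvalidRows           // null
        );
        // Explicit false: now distinguishable from the absent case.
        System.out.println(
            mapper.readValue("{\"ignoreInvalidRows\": false}", BoxedFlagProbe.class)
                  .ignoreInvalidRows                                                 // false
        );
      }
    }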


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org

