hudi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From le...@apache.org
Subject [incubator-hudi] branch master updated: Revert "[HUDI-455] Redo hudi-client log statements using SLF4J (#1145)" (#1181)
Date Mon, 06 Jan 2020 13:13:38 GMT
This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b9fab0b  Revert "[HUDI-455] Redo hudi-client log statements using SLF4J (#1145)" (#1181)
b9fab0b is described below

commit b9fab0b933315d88e059a07a0dcf2397d5b69d14
Author: hejinbiao123 <38057507+hejinbiao123@users.noreply.github.com>
AuthorDate: Mon Jan 6 21:13:29 2020 +0800

    Revert "[HUDI-455] Redo hudi-client log statements using SLF4J (#1145)" (#1181)
    
    This reverts commit e637d9ed26fea1a336f2fd6139cde0dd192c429d.
---
 hudi-client/pom.xml                                |  5 --
 .../java/org/apache/hudi/AbstractHoodieClient.java |  6 +--
 .../org/apache/hudi/CompactionAdminClient.java     | 17 ++++---
 .../java/org/apache/hudi/HoodieCleanClient.java    | 16 +++----
 .../java/org/apache/hudi/HoodieReadClient.java     |  6 +--
 .../java/org/apache/hudi/HoodieWriteClient.java    | 56 +++++++++++-----------
 .../client/embedded/EmbeddedTimelineService.java   | 10 ++--
 .../hbase/DefaultHBaseQPSResourceAllocator.java    | 10 ++--
 .../org/apache/hudi/index/hbase/HBaseIndex.java    | 42 ++++++++--------
 .../org/apache/hudi/io/HoodieAppendHandle.java     | 20 ++++----
 .../java/org/apache/hudi/io/HoodieCleanHelper.java | 16 ++++---
 .../org/apache/hudi/io/HoodieCommitArchiveLog.java | 22 ++++-----
 .../org/apache/hudi/io/HoodieCreateHandle.java     | 17 +++----
 .../org/apache/hudi/io/HoodieKeyLookupHandle.java  | 19 ++++----
 .../java/org/apache/hudi/io/HoodieMergeHandle.java | 39 +++++++--------
 .../java/org/apache/hudi/io/HoodieWriteHandle.java | 10 ++--
 .../io/compact/HoodieRealtimeTableCompactor.java   | 28 +++++------
 .../org/apache/hudi/metrics/HoodieMetrics.java     | 17 ++++---
 .../apache/hudi/metrics/JmxMetricsReporter.java    |  6 +--
 .../main/java/org/apache/hudi/metrics/Metrics.java |  6 +--
 .../hudi/metrics/MetricsGraphiteReporter.java      |  6 +--
 .../hudi/metrics/MetricsReporterFactory.java       |  8 ++--
 .../apache/hudi/table/HoodieCopyOnWriteTable.java  | 52 ++++++++++----------
 .../apache/hudi/table/HoodieMergeOnReadTable.java  | 27 ++++++-----
 .../java/org/apache/hudi/table/HoodieTable.java    | 12 ++---
 .../org/apache/hudi/table/RollbackExecutor.java    | 14 +++---
 26 files changed, 245 insertions(+), 242 deletions(-)

diff --git a/hudi-client/pom.xml b/hudi-client/pom.xml
index 66538e0..d350777 100644
--- a/hudi-client/pom.xml
+++ b/hudi-client/pom.xml
@@ -85,11 +85,6 @@
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-      <version>${slf4j.version}</version>
-    </dependency>
 
     <!-- Parquet -->
     <dependency>
diff --git a/hudi-client/src/main/java/org/apache/hudi/AbstractHoodieClient.java b/hudi-client/src/main/java/org/apache/hudi/AbstractHoodieClient.java
index 8457b90..dd108be 100644
--- a/hudi-client/src/main/java/org/apache/hudi/AbstractHoodieClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/AbstractHoodieClient.java
@@ -26,9 +26,9 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -39,7 +39,7 @@ import java.io.Serializable;
  */
 public abstract class AbstractHoodieClient implements Serializable, AutoCloseable {
 
-  private static final Logger LOG = LoggerFactory.getLogger(AbstractHoodieClient.class);
+  private static final Logger LOG = LogManager.getLogger(AbstractHoodieClient.class);
 
   protected final transient FileSystem fs;
   protected final transient JavaSparkContext jsc;
diff --git a/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java b/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java
index 00e0f75..56a47b7 100644
--- a/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java
@@ -45,9 +45,9 @@ import org.apache.hudi.func.OperationResult;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -65,7 +65,7 @@ import static org.apache.hudi.common.table.HoodieTimeline.COMPACTION_ACTION;
  */
 public class CompactionAdminClient extends AbstractHoodieClient {
 
-  private static final Logger LOG = LoggerFactory.getLogger(CompactionAdminClient.class);
+  private static final Logger LOG = LogManager.getLogger(CompactionAdminClient.class);
 
   public CompactionAdminClient(JavaSparkContext jsc, String basePath) {
     super(jsc, HoodieWriteConfig.newBuilder().withPath(basePath).build());
@@ -358,14 +358,13 @@ public class CompactionAdminClient extends AbstractHoodieClient {
       if (!dryRun) {
         return jsc.parallelize(renameActions, parallelism).map(lfPair -> {
           try {
-            LOG.info("RENAME {} => {}", lfPair.getLeft().getPath(), lfPair.getRight().getPath());
+            LOG.info("RENAME " + lfPair.getLeft().getPath() + " => " + lfPair.getRight().getPath());
             renameLogFile(metaClient, lfPair.getLeft(), lfPair.getRight());
             return new RenameOpResult(lfPair, true, Option.empty());
           } catch (IOException e) {
             LOG.error("Error renaming log file", e);
-            LOG.error("\n\n\n***NOTE Compaction is in inconsistent state. "
-                            + "Try running \"compaction repair {} \" to recover from failure ***\n\n\n",
-                    lfPair.getLeft().getBaseCommitTime());
+            LOG.error("\n\n\n***NOTE Compaction is in inconsistent state. Try running \"compaction repair "
+                + lfPair.getLeft().getBaseCommitTime() + "\" to recover from failure ***\n\n\n");
             return new RenameOpResult(lfPair, false, Option.of(e));
           }
         }).collect();
@@ -396,7 +395,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
     HoodieCompactionPlan plan = getCompactionPlan(metaClient, compactionInstant);
     if (plan.getOperations() != null) {
       LOG.info(
-          "Number of Compaction Operations :{} for instant :{}", plan.getOperations().size(), compactionInstant);
+          "Number of Compaction Operations :" + plan.getOperations().size() + " for instant :" + compactionInstant);
       List<CompactionOperation> ops = plan.getOperations().stream()
           .map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
       return jsc.parallelize(ops, parallelism).flatMap(op -> {
@@ -410,7 +409,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
         }
       }).collect();
     }
-    LOG.warn("No operations for compaction instant : {}", compactionInstant);
+    LOG.warn("No operations for compaction instant : " + compactionInstant);
     return new ArrayList<>();
   }
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java
index 68503c6..9411782 100644
--- a/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java
@@ -39,16 +39,16 @@ import org.apache.hudi.table.HoodieTable;
 import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
 
 public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {
 
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieCleanClient.class);
+  private static final Logger LOG = LogManager.getLogger(HoodieCleanClient.class);
   private final transient HoodieMetrics metrics;
 
   public HoodieCleanClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, HoodieMetrics metrics) {
@@ -85,7 +85,7 @@ public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHo
 
     // If there are inflight(failed) or previously requested clean operation, first perform them
     table.getCleanTimeline().filterInflightsAndRequested().getInstants().forEach(hoodieInstant -> {
-      LOG.info("There were previously unfinished cleaner operations. Finishing Instant={}", hoodieInstant);
+      LOG.info("There were previously unfinished cleaner operations. Finishing Instant=" + hoodieInstant);
       runClean(table, hoodieInstant);
     });
 
@@ -122,7 +122,7 @@ public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHo
       // Save to both aux and timeline folder
       try {
         table.getActiveTimeline().saveToCleanRequested(cleanInstant, AvroUtils.serializeCleanerPlan(cleanerPlan));
-        LOG.info("Requesting Cleaning with instant time {}", cleanInstant);
+        LOG.info("Requesting Cleaning with instant time " + cleanInstant);
       } catch (IOException e) {
         LOG.error("Got exception when saving cleaner requested file", e);
         throw new HoodieIOException(e.getMessage(), e);
@@ -173,20 +173,20 @@ public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHo
       Option<Long> durationInMs = Option.empty();
       if (context != null) {
         durationInMs = Option.of(metrics.getDurationInMs(context.stop()));
-        LOG.info("cleanerElaspsedTime (Minutes): {}", durationInMs.get() / (1000 * 60));
+        LOG.info("cleanerElaspsedTime (Minutes): " + durationInMs.get() / (1000 * 60));
       }
 
       HoodieTableMetaClient metaClient = createMetaClient(true);
       // Create the metadata and save it
       HoodieCleanMetadata metadata =
           CleanerUtils.convertCleanMetadata(metaClient, cleanInstant.getTimestamp(), durationInMs, cleanStats);
-      LOG.info("Cleaned {} files. Earliest Retained : {}", metadata.getTotalFilesDeleted(), metadata.getEarliestCommitToRetain());
+      LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files. Earliest Retained :" + metadata.getEarliestCommitToRetain());
       metrics.updateCleanMetrics(durationInMs.orElseGet(() -> -1L), metadata.getTotalFilesDeleted());
 
       table.getActiveTimeline().transitionCleanInflightToComplete(
           new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, cleanInstant.getTimestamp()),
           AvroUtils.serializeCleanMetadata(metadata));
-      LOG.info("Marked clean started on {} as complete", cleanInstant.getTimestamp());
+      LOG.info("Marked clean started on " + cleanInstant.getTimestamp() + " as complete");
       return metadata;
     } catch (IOException e) {
       throw new HoodieIOException("Failed to clean up after commit", e);
diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieReadClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieReadClient.java
index f309f40..3c4290c 100644
--- a/hudi-client/src/main/java/org/apache/hudi/HoodieReadClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/HoodieReadClient.java
@@ -35,6 +35,8 @@ import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
 
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
@@ -49,8 +51,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 /**
@@ -58,7 +58,7 @@ import scala.Tuple2;
  */
 public class HoodieReadClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {
 
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieReadClient.class);
+  private static final Logger LOG = LogManager.getLogger(HoodieReadClient.class);
 
   /**
    * TODO: We need to persist the index type into hoodie.properties and be able to access the index just with a simple
diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
index 09e3f58..efb6d20 100644
--- a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
@@ -67,6 +67,8 @@ import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
@@ -84,8 +86,6 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 /**
@@ -96,7 +96,7 @@ import scala.Tuple2;
  */
 public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {
 
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieWriteClient.class);
+  private static final Logger LOG = LogManager.getLogger(HoodieWriteClient.class);
   private static final String UPDATE_STR = "update";
   private static final String LOOKUP_STR = "lookup";
   private final boolean rollbackPending;
@@ -399,13 +399,13 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
 
   private void commitOnAutoCommit(String commitTime, JavaRDD<WriteStatus> resultRDD, String actionType) {
     if (config.shouldAutoCommit()) {
-      LOG.info("Auto commit enabled: Committing {}", commitTime);
+      LOG.info("Auto commit enabled: Committing " + commitTime);
       boolean commitResult = commit(commitTime, resultRDD, Option.empty(), actionType);
       if (!commitResult) {
         throw new HoodieCommitException("Failed to commit " + commitTime);
       }
     } else {
-      LOG.info("Auto commit disabled for {}", commitTime);
+      LOG.info("Auto commit disabled for " + commitTime);
     }
   }
 
@@ -454,13 +454,13 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     if (preppedRecords.getStorageLevel() == StorageLevel.NONE()) {
       preppedRecords.persist(StorageLevel.MEMORY_AND_DISK_SER());
     } else {
-      LOG.info("RDD PreppedRecords was persisted at: {}", preppedRecords.getStorageLevel());
+      LOG.info("RDD PreppedRecords was persisted at: " + preppedRecords.getStorageLevel());
     }
 
     WorkloadProfile profile = null;
     if (hoodieTable.isWorkloadProfileNeeded()) {
       profile = new WorkloadProfile(preppedRecords);
-      LOG.info("Workload profile : {}", profile);
+      LOG.info("Workload profile :" + profile);
       saveWorkloadProfileMetadataToInflight(profile, hoodieTable, commitTime);
     }
 
@@ -526,7 +526,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   private boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses,
       Option<Map<String, String>> extraMetadata, String actionType) {
 
-    LOG.info("Commiting {}", commitTime);
+    LOG.info("Commiting " + commitTime);
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
 
@@ -573,7 +573,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
             metadata, actionType);
         writeContext = null;
       }
-      LOG.info("Committed {}", commitTime);
+      LOG.info("Committed " + commitTime);
     } catch (IOException e) {
       throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + commitTime,
           e);
@@ -607,7 +607,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     }
 
     String latestCommit = table.getCompletedCommitsTimeline().lastInstant().get().getTimestamp();
-    LOG.info("Savepointing latest commit {}", latestCommit);
+    LOG.info("Savepointing latest commit " + latestCommit);
     return savepoint(latestCommit, user, comment);
   }
 
@@ -658,7 +658,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
               config.shouldAssumeDatePartitioning()))
           .mapToPair((PairFunction<String, String, List<String>>) partitionPath -> {
             // Scan all partitions files with this commit time
-            LOG.info("Collecting latest files in partition path {}", partitionPath);
+            LOG.info("Collecting latest files in partition path " + partitionPath);
             ReadOptimizedView view = table.getROFileSystemView();
             List<String> latestFiles = view.getLatestDataFilesBeforeOrOn(partitionPath, commitTime)
                 .map(HoodieDataFile::getFileName).collect(Collectors.toList());
@@ -672,7 +672,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
       table.getActiveTimeline()
           .saveAsComplete(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, commitTime),
               AvroUtils.serializeSavepointMetadata(metadata));
-      LOG.info("Savepoint {} created", commitTime);
+      LOG.info("Savepoint " + commitTime + " created");
       return true;
     } catch (IOException e) {
       throw new HoodieSavepointException("Failed to savepoint " + commitTime, e);
@@ -696,13 +696,13 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
     boolean isSavepointPresent = table.getCompletedSavepointTimeline().containsInstant(savePoint);
     if (!isSavepointPresent) {
-      LOG.warn("No savepoint present {}", savepointTime);
+      LOG.warn("No savepoint present " + savepointTime);
       return;
     }
 
     activeTimeline.revertToInflight(savePoint);
     activeTimeline.deleteInflight(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, savepointTime));
-    LOG.info("Savepoint {} deleted", savepointTime);
+    LOG.info("Savepoint " + savepointTime + " deleted");
   }
 
   /**
@@ -730,7 +730,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     } else {
       throw new IllegalArgumentException("Compaction is not in requested state " + compactionTime);
     }
-    LOG.info("Compaction {} deleted", compactionTime);
+    LOG.info("Compaction " + compactionTime + " deleted");
   }
 
   /**
@@ -758,7 +758,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
 
     List<String> commitsToRollback = commitTimeline.findInstantsAfter(savepointTime, Integer.MAX_VALUE).getInstants()
         .map(HoodieInstant::getTimestamp).collect(Collectors.toList());
-    LOG.info("Rolling back commits {}", commitsToRollback);
+    LOG.info("Rolling back commits " + commitsToRollback);
 
     restoreToInstant(savepointTime);
 
@@ -818,7 +818,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
             // delete these files when it does not see a corresponding instant file under .hoodie
             List<HoodieRollbackStat> statsForCompaction = doRollbackAndGetStats(instant);
             instantsToStats.put(instant.getTimestamp(), statsForCompaction);
-            LOG.info("Deleted compaction instant {}", instant);
+            LOG.info("Deleted compaction instant " + instant);
             break;
           default:
             throw new IllegalArgumentException("invalid action name " + instant.getAction());
@@ -859,7 +859,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
 
     if (commitTimeline.empty() && inflightAndRequestedCommitTimeline.empty()) {
       // nothing to rollback
-      LOG.info("No commits to rollback {}", commitToRollback);
+      LOG.info("No commits to rollback " + commitToRollback);
     }
 
     // Make sure only the last n commits are being rolled back
@@ -880,13 +880,13 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
 
     List<HoodieRollbackStat> stats = table.rollback(jsc, instantToRollback, true);
 
-    LOG.info("Deleted inflight commits {}", commitToRollback);
+    LOG.info("Deleted inflight commits " + commitToRollback);
 
     // cleanup index entries
     if (!index.rollbackCommit(commitToRollback)) {
       throw new HoodieRollbackException("Rollback index changes failed, for time :" + commitToRollback);
     }
-    LOG.info("Index rolled back for commits {}", commitToRollback);
+    LOG.info("Index rolled back for commits " + commitToRollback);
     return stats;
   }
 
@@ -907,7 +907,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     table.getActiveTimeline().saveAsComplete(
         new HoodieInstant(true, HoodieTimeline.ROLLBACK_ACTION, startRollbackTime),
         AvroUtils.serializeRollbackMetadata(rollbackMetadata));
-    LOG.info("Commits {} rollback is complete", commitsToRollback);
+    LOG.info("Commits " + commitsToRollback + " rollback is complete");
 
     if (!table.getActiveTimeline().getCleanerTimeline().empty()) {
       LOG.info("Cleaning up older rollback meta files");
@@ -935,7 +935,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
         AvroUtils.convertRestoreMetadata(startRestoreTime, durationInMs, commitsToRollback, commitToStats);
     table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, startRestoreTime),
         AvroUtils.serializeRestoreMetadata(restoreMetadata));
-    LOG.info("Commits {} rollback is complete. Restored dataset to {}", commitsToRollback, restoreToInstant);
+    LOG.info("Commits " + commitsToRollback + " rollback is complete. Restored dataset to " + restoreToInstant);
 
     if (!table.getActiveTimeline().getCleanerTimeline().empty()) {
       LOG.info("Cleaning up older restore meta files");
@@ -1027,7 +1027,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   }
 
   private void startCommit(String instantTime) {
-    LOG.info("Generate a new instant time {}", instantTime);
+    LOG.info("Generate a new instant time " + instantTime);
     HoodieTableMetaClient metaClient = createMetaClient(true);
     // if there are pending compactions, their instantTime must not be greater than that of this instant time
     metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending -> {
@@ -1047,7 +1047,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    */
   public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws IOException {
     String instantTime = HoodieActiveTimeline.createNewInstantTime();
-    LOG.info("Generate a new instant time {}", instantTime);
+    LOG.info("Generate a new instant time " + instantTime);
     boolean notEmpty = scheduleCompactionAtInstant(instantTime, extraMetadata);
     return notEmpty ? Option.of(instantTime) : Option.empty();
   }
@@ -1291,9 +1291,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
               + config.getBasePath() + " at time " + compactionCommitTime, e);
         }
       }
-      LOG.info("Compacted successfully on commit {}", compactionCommitTime);
+      LOG.info("Compacted successfully on commit " + compactionCommitTime);
     } else {
-      LOG.info("Compaction did not run for commit {}", compactionCommitTime);
+      LOG.info("Compaction did not run for commit " + compactionCommitTime);
     }
   }
 
@@ -1304,7 +1304,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
       if (finalizeCtx != null) {
         Option<Long> durationInMs = Option.of(metrics.getDurationInMs(finalizeCtx.stop()));
         durationInMs.ifPresent(duration -> {
-          LOG.info("Finalize write elapsed time (milliseconds): {}", duration);
+          LOG.info("Finalize write elapsed time (milliseconds): " + duration);
           metrics.updateFinalizeWriteMetrics(duration, stats.size());
         });
       }
@@ -1344,7 +1344,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
       m.forEach(metadata::addMetadata);
     });
 
-    LOG.info("Committing Compaction {}. Finished with result {}", compactionCommitTime, metadata);
+    LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
     HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
 
     try {
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java b/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
index a958617..5afee3f 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
@@ -26,9 +26,9 @@ import org.apache.hudi.common.util.NetworkUtils;
 import org.apache.hudi.timeline.service.TimelineService;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -37,7 +37,7 @@ import java.io.IOException;
  */
 public class EmbeddedTimelineService {
 
-  private static final Logger LOG = LoggerFactory.getLogger(EmbeddedTimelineService.class);
+  private static final Logger LOG = LogManager.getLogger(EmbeddedTimelineService.class);
 
   private int serverPort;
   private String hostAddr;
@@ -72,13 +72,13 @@ public class EmbeddedTimelineService {
   public void startServer() throws IOException {
     server = new TimelineService(0, viewManager, hadoopConf.newCopy());
     serverPort = server.startService();
-    LOG.info("Started embedded timeline server at {} : {}", hostAddr, serverPort);
+    LOG.info("Started embedded timeline server at " + hostAddr + ":" + serverPort);
   }
 
   private void setHostAddrFromSparkConf(SparkConf sparkConf) {
     String hostAddr = sparkConf.get("spark.driver.host", null);
     if (hostAddr != null) {
-      LOG.info("Overriding hostIp to ({}) found in spark-conf. It was {}", hostAddr, this.hostAddr);
+      LOG.info("Overriding hostIp to (" + hostAddr + ") found in spark-conf. It was " + this.hostAddr);
       this.hostAddr = hostAddr;
     } else {
       LOG.warn("Unable to find driver bind address from spark config");
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/hbase/DefaultHBaseQPSResourceAllocator.java b/hudi-client/src/main/java/org/apache/hudi/index/hbase/DefaultHBaseQPSResourceAllocator.java
index 4c8f9c4..e3a4904 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/hbase/DefaultHBaseQPSResourceAllocator.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/hbase/DefaultHBaseQPSResourceAllocator.java
@@ -20,12 +20,12 @@ package org.apache.hudi.index.hbase;
 
 import org.apache.hudi.config.HoodieWriteConfig;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 public class DefaultHBaseQPSResourceAllocator implements HBaseIndexQPSResourceAllocator {
   private HoodieWriteConfig hoodieWriteConfig;
-  private static final Logger LOG = LoggerFactory.getLogger(DefaultHBaseQPSResourceAllocator.class);
+  private static final Logger LOG = LogManager.getLogger(DefaultHBaseQPSResourceAllocator.class);
 
   public DefaultHBaseQPSResourceAllocator(HoodieWriteConfig hoodieWriteConfig) {
     this.hoodieWriteConfig = hoodieWriteConfig;
@@ -46,7 +46,7 @@ public class DefaultHBaseQPSResourceAllocator implements HBaseIndexQPSResourceAl
   @Override
   public void releaseQPSResources() {
     // Do nothing, as there are no resources locked in default implementation
-    LOG.info("Release QPS resources called for {} with default implementation, do nothing",
-        this.hoodieWriteConfig.getHbaseTableName());
+    LOG.info(String.format("Release QPS resources called for %s with default implementation, do nothing",
+        this.hoodieWriteConfig.getHbaseTableName()));
   }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
index 5308a10..3789bff 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
@@ -50,6 +50,8 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
@@ -63,8 +65,6 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 /**
@@ -83,7 +83,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
   private static final byte[] PARTITION_PATH_COLUMN = Bytes.toBytes("partition_path");
   private static final int SLEEP_TIME_MILLISECONDS = 100;
 
-  private static final Logger LOG = LoggerFactory.getLogger(HBaseIndex.class);
+  private static final Logger LOG = LogManager.getLogger(HBaseIndex.class);
   private static Connection hbaseConnection = null;
   private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null;
   private float qpsFraction;
@@ -115,7 +115,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
   @VisibleForTesting
   public HBaseIndexQPSResourceAllocator createQPSResourceAllocator(HoodieWriteConfig config) {
     try {
-      LOG.info("createQPSResourceAllocator : {}", config.getHBaseQPSResourceAllocatorClass());
+      LOG.info("createQPSResourceAllocator :" + config.getHBaseQPSResourceAllocatorClass());
       return (HBaseIndexQPSResourceAllocator) ReflectionUtils
               .loadClass(config.getHBaseQPSResourceAllocatorClass(), config);
     } catch (Exception e) {
@@ -320,7 +320,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
             doPutsAndDeletes(hTable, puts, deletes);
           } catch (Exception e) {
             Exception we = new Exception("Error updating index for " + writeStatus, e);
-            LOG.error("Error updating index for {}", writeStatus, e);
+            LOG.error(we);
             writeStatus.setGlobalError(we);
           }
           writeStatusList.add(writeStatus);
@@ -370,7 +370,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
       HoodieTable<T> hoodieTable) {
     final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
     setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc);
-    LOG.info("multiPutBatchSize: before HBase puts {}", multiPutBatchSize);
+    LOG.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize);
     JavaRDD<WriteStatus> writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true);
     // caching the index updated status RDD
     writeStatusJavaRDD = writeStatusJavaRDD.persist(config.getWriteStatusStorageLevel());
@@ -398,15 +398,15 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
       this.numRegionServersForTable = getNumRegionServersAliveForTable();
       final float desiredQPSFraction =
           hBaseIndexQPSResourceAllocator.calculateQPSFractionForPutsTime(numPuts, this.numRegionServersForTable);
-      LOG.info("Desired QPSFraction : {}", desiredQPSFraction);
-      LOG.info("Number HBase puts : {}", numPuts);
-      LOG.info("HBase Puts Parallelism : {}", hbasePutsParallelism);
+      LOG.info("Desired QPSFraction :" + desiredQPSFraction);
+      LOG.info("Number HBase puts :" + numPuts);
+      LOG.info("Hbase Puts Parallelism :" + hbasePutsParallelism);
       final float availableQpsFraction =
           hBaseIndexQPSResourceAllocator.acquireQPSResources(desiredQPSFraction, numPuts);
       LOG.info("Allocated QPS Fraction :" + availableQpsFraction);
       multiPutBatchSize = putBatchSizeCalculator.getBatchSize(numRegionServersForTable, maxQpsPerRegionServer,
           hbasePutsParallelism, maxExecutors, SLEEP_TIME_MILLISECONDS, availableQpsFraction);
-      LOG.info("multiPutBatchSize : {}",  multiPutBatchSize);
+      LOG.info("multiPutBatchSize :" + multiPutBatchSize);
     }
   }
 
@@ -420,7 +420,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
   public static class HbasePutBatchSizeCalculator implements Serializable {
 
     private static final int MILLI_SECONDS_IN_A_SECOND = 1000;
-    private static final Logger LOG = LoggerFactory.getLogger(HbasePutBatchSizeCalculator.class);
+    private static final Logger LOG = LogManager.getLogger(HbasePutBatchSizeCalculator.class);
 
     /**
      * Calculate putBatch size so that sum of requests across multiple jobs in a second does not exceed
@@ -462,15 +462,15 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
       int maxParallelPuts = Math.max(1, Math.min(numTasks, maxExecutors));
       int maxReqsSentPerTaskPerSec = MILLI_SECONDS_IN_A_SECOND / sleepTimeMs;
       int multiPutBatchSize = Math.max(1, maxReqPerSec / (maxParallelPuts * maxReqsSentPerTaskPerSec));
-      LOG.info("HBaseIndexThrottling: qpsFraction : {}", qpsFraction);
-      LOG.info("HBaseIndexThrottling: numRSAlive : {}", numRSAlive);
-      LOG.info("HBaseIndexThrottling: maxReqPerSec : {}", maxReqPerSec);
-      LOG.info("HBaseIndexThrottling: numTasks : {}", numTasks);
-      LOG.info("HBaseIndexThrottling: maxExecutors : {}", maxExecutors);
-      LOG.info("HBaseIndexThrottling: maxParallelPuts : {}", maxParallelPuts);
-      LOG.info("HBaseIndexThrottling: maxReqsSentPerTaskPerSec : {}", maxReqsSentPerTaskPerSec);
-      LOG.info("HBaseIndexThrottling: numRegionServersForTable : {}", numRegionServersForTable);
-      LOG.info("HBaseIndexThrottling: multiPutBatchSize : {}", multiPutBatchSize);
+      LOG.info("HbaseIndexThrottling: qpsFraction :" + qpsFraction);
+      LOG.info("HbaseIndexThrottling: numRSAlive :" + numRSAlive);
+      LOG.info("HbaseIndexThrottling: maxReqPerSec :" + maxReqPerSec);
+      LOG.info("HbaseIndexThrottling: numTasks :" + numTasks);
+      LOG.info("HbaseIndexThrottling: maxExecutors :" + maxExecutors);
+      LOG.info("HbaseIndexThrottling: maxParallelPuts :" + maxParallelPuts);
+      LOG.info("HbaseIndexThrottling: maxReqsSentPerTaskPerSec :" + maxReqsSentPerTaskPerSec);
+      LOG.info("HbaseIndexThrottling: numRegionServersForTable :" + numRegionServersForTable);
+      LOG.info("HbaseIndexThrottling: multiPutBatchSize :" + multiPutBatchSize);
       return multiPutBatchSize;
     }
   }
@@ -485,7 +485,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
             .toIntExact(regionLocator.getAllRegionLocations().stream().map(HRegionLocation::getServerName).distinct().count());
         return numRegionServersForTable;
       } catch (IOException e) {
-        LOG.error("Error while connecting HBase:", e);
+        LOG.error(e);
         throw new RuntimeException(e);
       }
     }
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 33e4417..edf01ce 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -47,10 +47,10 @@ import com.google.common.collect.Maps;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.spark.TaskContext;
 import org.apache.spark.util.SizeEstimator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -64,7 +64,7 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T> {
 
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieAppendHandle.class);
+  private static final Logger LOG = LogManager.getLogger(HoodieAppendHandle.class);
   // This acts as the sequenceID for records written
   private static AtomicLong recordIndex = new AtomicLong(1);
   private final String fileId;
@@ -123,7 +123,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
       } else {
         // This means there is no base data file, start appending to a new log file
         fileSlice = Option.of(new FileSlice(partitionPath, baseInstantTime, this.fileId));
-        LOG.info("New InsertHandle for partition : {}", partitionPath);
+        LOG.info("New InsertHandle for partition :" + partitionPath);
       }
       writeStatus.getStat().setPrevCommit(baseInstantTime);
       writeStatus.setFileId(fileId);
@@ -137,7 +137,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
         ((HoodieDeltaWriteStat) writeStatus.getStat()).setLogVersion(currentLogFile.getLogVersion());
         ((HoodieDeltaWriteStat) writeStatus.getStat()).setLogOffset(writer.getCurrentSize());
       } catch (Exception e) {
-        LOG.error("Error in update task at commit {}", instantTime, e);
+        LOG.error("Error in update task at commit " + instantTime, e);
         writeStatus.setGlobalError(e);
         throw new HoodieUpsertException("Failed to initialize HoodieAppendHandle for FileId: " + fileId + " on commit "
             + instantTime + " on HDFS path " + hoodieTable.getMetaClient().getBasePath() + partitionPath, e);
@@ -179,7 +179,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
       hoodieRecord.deflate();
       return avroRecord;
     } catch (Exception e) {
-      LOG.error("Error writing record {}", hoodieRecord, e);
+      LOG.error("Error writing record  " + hoodieRecord, e);
       writeStatus.markFailure(hoodieRecord, e, recordMetadata);
     }
     return Option.empty();
@@ -232,7 +232,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
       // Not throwing exception from here, since we don't want to fail the entire job
       // for a single record
       writeStatus.markFailure(record, t, recordMetadata);
-      LOG.error("Error writing record {}", record, t);
+      LOG.error("Error writing record " + record, t);
     }
   }
 
@@ -259,8 +259,8 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
       runtimeStats.setTotalUpsertTime(timer.endTimer());
       stat.setRuntimeStats(runtimeStats);
 
-      LOG.info("AppendHandle for partitionPath {} fileID {}, took {} ms.", stat.getPartitionPath(),
-          stat.getFileId(), runtimeStats.getTotalUpsertTime());
+      LOG.info(String.format("AppendHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(),
+          stat.getFileId(), runtimeStats.getTotalUpsertTime()));
 
       return writeStatus;
     } catch (IOException e) {
@@ -308,7 +308,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
     if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize)) {
       // Recompute averageRecordSize before writing a new block and update existing value with
       // avg of new and old
-      LOG.info("AvgRecordSize => {}", averageRecordSize);
+      LOG.info("AvgRecordSize => " + averageRecordSize);
       averageRecordSize = (averageRecordSize + SizeEstimator.estimate(record)) / 2;
       doAppend(header);
       estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java
index d75df4b..9c319c8 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java
@@ -40,8 +40,8 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -62,7 +62,7 @@ import java.util.stream.Collectors;
  */
 public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> implements Serializable {
 
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieCleanHelper.class);
+  private static final Logger LOG = LogManager.getLogger(HoodieCleanHelper.class);
 
   private final SyncableFileSystemView fileSystemView;
   private final HoodieTimeline commitTimeline;
@@ -101,7 +101,8 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> implements Seri
         if ((cleanMetadata.getEarliestCommitToRetain() != null)
             && (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
           LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
-                  + "since last cleaned at {}. New Instant to retain : {}", cleanMetadata.getEarliestCommitToRetain(), newInstantToRetain);
+              + "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
+              + ". New Instant to retain : " + newInstantToRetain);
           return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
               HoodieTimeline.GREATER_OR_EQUAL) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
               newInstantToRetain.get().getTimestamp(), HoodieTimeline.LESSER)).flatMap(instant -> {
@@ -126,7 +127,8 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> implements Seri
    * single file (i.e run it with versionsRetained = 1)
    */
   private List<String> getFilesToCleanKeepingLatestVersions(String partitionPath) {
-    LOG.info("Cleaning {}, retaining latest {} file versions. ", partitionPath, config.getCleanerFileVersionsRetained());
+    LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained()
+        + " file versions. ");
     List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
     List<String> deletePaths = new ArrayList<>();
     // Collect all the datafiles savepointed by all the savepoints
@@ -185,7 +187,7 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> implements Seri
    */
   private List<String> getFilesToCleanKeepingLatestCommits(String partitionPath) {
     int commitsRetained = config.getCleanerCommitsRetained();
-    LOG.info("Cleaning {}, retaining latest {} commits. ", partitionPath, commitsRetained);
+    LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. ");
     List<String> deletePaths = new ArrayList<>();
 
     // Collect all the datafiles savepointed by all the savepoints
@@ -272,7 +274,7 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> implements Seri
     } else {
       throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
     }
-    LOG.info("{} patterns used to delete in partition path: {}", deletePaths.size(), partitionPath);
+    LOG.info(deletePaths.size() + " patterns used to delete in partition path:" + partitionPath);
 
     return deletePaths;
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java
index 9baad75..bafbc8d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java
@@ -54,9 +54,9 @@ import com.google.common.collect.Sets;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -72,7 +72,7 @@ import java.util.stream.Stream;
  */
 public class HoodieCommitArchiveLog {
 
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieCommitArchiveLog.class);
+  private static final Logger LOG = LogManager.getLogger(HoodieCommitArchiveLog.class);
 
   private final Path archiveFilePath;
   private final HoodieTableMetaClient metaClient;
@@ -119,9 +119,9 @@ public class HoodieCommitArchiveLog {
       boolean success = true;
       if (!instantsToArchive.isEmpty()) {
         this.writer = openWriter();
-        LOG.info("Archiving instants {}", instantsToArchive);
+        LOG.info("Archiving instants " + instantsToArchive);
         archive(instantsToArchive);
-        LOG.info("Deleting archived instants {}", instantsToArchive);
+        LOG.info("Deleting archived instants " + instantsToArchive);
         success = deleteArchivedInstants(instantsToArchive);
       } else {
         LOG.info("No Instants to archive");
@@ -188,14 +188,14 @@ public class HoodieCommitArchiveLog {
   }
 
   private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants) throws IOException {
-    LOG.info("Deleting instants {}", archivedInstants);
+    LOG.info("Deleting instants " + archivedInstants);
     boolean success = true;
     for (HoodieInstant archivedInstant : archivedInstants) {
       Path commitFile = new Path(metaClient.getMetaPath(), archivedInstant.getFileName());
       try {
         if (metaClient.getFs().exists(commitFile)) {
           success &= metaClient.getFs().delete(commitFile, false);
-          LOG.info("Archived and deleted instant file {}", commitFile);
+          LOG.info("Archived and deleted instant file " + commitFile);
         }
       } catch (IOException e) {
         throw new HoodieIOException("Failed to delete archived instant " + archivedInstant, e);
@@ -205,7 +205,7 @@ public class HoodieCommitArchiveLog {
     // Remove older meta-data from auxiliary path too
     Option<HoodieInstant> latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION)
         || (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)))).max(Comparator.comparing(HoodieInstant::getTimestamp)));
-    LOG.info("Latest Committed Instant={}", latestCommitted);
+    LOG.info("Latest Committed Instant=" + latestCommitted);
     if (latestCommitted.isPresent()) {
       success &= deleteAllInstantsOlderorEqualsInAuxMetaFolder(latestCommitted.get());
     }
@@ -233,7 +233,7 @@ public class HoodieCommitArchiveLog {
       Path metaFile = new Path(metaClient.getMetaAuxiliaryPath(), deleteInstant.getFileName());
       if (metaClient.getFs().exists(metaFile)) {
         success &= metaClient.getFs().delete(metaFile, false);
-        LOG.info("Deleted instant file in auxiliary metapath : {}", metaFile);
+        LOG.info("Deleted instant file in auxiliary metapath : " + metaFile);
       }
     }
     return success;
@@ -243,7 +243,7 @@ public class HoodieCommitArchiveLog {
     try {
       HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants();
       Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
-      LOG.info("Wrapper schema {}", wrapperSchema.toString());
+      LOG.info("Wrapper schema " + wrapperSchema.toString());
       List<IndexedRecord> records = new ArrayList<>();
       for (HoodieInstant hoodieInstant : instants) {
         try {
@@ -252,7 +252,7 @@ public class HoodieCommitArchiveLog {
             writeToFile(wrapperSchema, records);
           }
         } catch (Exception e) {
-          LOG.error("Failed to archive commits, commit file: {}", hoodieInstant.getFileName(), e);
+          LOG.error("Failed to archive commits, .commit file: " + hoodieInstant.getFileName(), e);
           if (this.config.isFailOnTimelineArchivingEnabled()) {
             throw e;
           }
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index bece881..095e0a0 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -36,16 +36,16 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.spark.TaskContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Iterator;
 
 public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T> {
 
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieCreateHandle.class);
+  private static final Logger LOG = LogManager.getLogger(HoodieCreateHandle.class);
 
   private final HoodieStorageWriter<IndexedRecord> storageWriter;
   private final Path path;
@@ -73,7 +73,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
     } catch (IOException e) {
       throw new HoodieInsertException("Failed to initialize HoodieStorageWriter for path " + path, e);
     }
-    LOG.info("New CreateHandle for partition : {} with fileId {}", partitionPath, fileId);
+    LOG.info("New CreateHandle for partition :" + partitionPath + " with fileId " + fileId);
   }
 
   /**
@@ -120,7 +120,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
       // Not throwing exception from here, since we don't want to fail the entire job
       // for a single record
       writeStatus.markFailure(record, t, recordMetadata);
-      LOG.error("Error writing record {}", record, t);
+      LOG.error("Error writing record " + record, t);
     }
   }
 
@@ -152,7 +152,8 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
    */
   @Override
   public WriteStatus close() {
-    LOG.info("Closing the file {} as we are done with all the records {}", writeStatus.getFileId(), recordsWritten);
+    LOG
+        .info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten);
     try {
 
       storageWriter.close();
@@ -174,8 +175,8 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
       stat.setRuntimeStats(runtimeStats);
       writeStatus.setStat(stat);
 
-      LOG.info("CreateHandle for partitionPath {} fileID {}, took {} ms.", stat.getPartitionPath(),
-              stat.getFileId(), runtimeStats.getTotalCreateTime());
+      LOG.info(String.format("CreateHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(),
+          stat.getFileId(), runtimeStats.getTotalCreateTime()));
 
       return writeStatus;
     } catch (IOException e) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
index 45472bc..9f3bdbb 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
@@ -31,8 +31,8 @@ import org.apache.hudi.table.HoodieTable;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -44,7 +44,7 @@ import java.util.Set;
  */
 public class HoodieKeyLookupHandle<T extends HoodieRecordPayload> extends HoodieReadHandle<T> {
 
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieKeyLookupHandle.class);
+  private static final Logger LOG = LogManager.getLogger(HoodieKeyLookupHandle.class);
 
   private final HoodieTableType tableType;
 
@@ -63,7 +63,7 @@ public class HoodieKeyLookupHandle<T extends HoodieRecordPayload> extends Hoodie
     HoodieTimer timer = new HoodieTimer().startTimer();
     this.bloomFilter = ParquetUtils.readBloomFilterFromParquetMetadata(hoodieTable.getHadoopConf(),
         new Path(getLatestDataFile().getPath()));
-    LOG.info("Read bloom filter from {} in {} ms", partitionPathFilePair, timer.endTimer());
+    LOG.info(String.format("Read bloom filter from %s in %d ms", partitionPathFilePair, timer.endTimer()));
   }
 
   /**
@@ -82,7 +82,7 @@ public class HoodieKeyLookupHandle<T extends HoodieRecordPayload> extends Hoodie
         LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath,
             timer.endTimer(), candidateRecordKeys.size(), foundRecordKeys.size()));
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Keys matching for file {} => {}", filePath, foundRecordKeys);
+          LOG.debug("Keys matching for file " + filePath + " => " + foundRecordKeys);
         }
       }
     } catch (Exception e) {
@@ -98,7 +98,7 @@ public class HoodieKeyLookupHandle<T extends HoodieRecordPayload> extends Hoodie
     // check record key against bloom filter of current file & add to possible keys if needed
     if (bloomFilter.mightContain(recordKey)) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Record key {} matches bloom filter in {}", recordKey, partitionPathFilePair);
+        LOG.debug("Record key " + recordKey + " matches bloom filter in  " + partitionPathFilePair);
       }
       candidateRecordKeys.add(recordKey);
     }
@@ -110,14 +110,15 @@ public class HoodieKeyLookupHandle<T extends HoodieRecordPayload> extends Hoodie
    */
   public KeyLookupResult getLookupResult() {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("#The candidate row keys for {} => {}", partitionPathFilePair, candidateRecordKeys);
+      LOG.debug("#The candidate row keys for " + partitionPathFilePair + " => " + candidateRecordKeys);
     }
 
     HoodieDataFile dataFile = getLatestDataFile();
     List<String> matchingKeys =
         checkCandidatesAgainstFile(hoodieTable.getHadoopConf(), candidateRecordKeys, new Path(dataFile.getPath()));
-    LOG.info("Total records ({}), bloom filter candidates ({})/fp({}), actual matches ({})", totalKeysChecked,
-                    candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(), matchingKeys.size());
+    LOG.info(
+        String.format("Total records (%d), bloom filter candidates (%d)/fp(%d), actual matches (%d)", totalKeysChecked,
+            candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(), matchingKeys.size()));
     return new KeyLookupResult(partitionPathFilePair.getRight(), partitionPathFilePair.getLeft(),
         dataFile.getCommitTime(), matchingKeys);
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 0c801e7..518b883 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -43,9 +43,9 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.spark.TaskContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.HashSet;
@@ -56,7 +56,7 @@ import java.util.Set;
 @SuppressWarnings("Duplicates")
 public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T> {
 
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieMergeHandle.class);
+  private static final Logger LOG = LogManager.getLogger(HoodieMergeHandle.class);
 
   private Map<String, HoodieRecord<T>> keyToNewRecords;
   private Set<String> writtenRecordKeys;
@@ -137,7 +137,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
     if (exception.isPresent() && exception.get() instanceof Throwable) {
       // Not throwing exception from here, since we don't want to fail the entire job for a single record
       writeStatus.markFailure(record, exception.get(), recordMetadata);
-      LOG.error("Error writing record {}", record, exception.get());
+      LOG.error("Error writing record " + record, exception.get());
     } else {
       write(record, avroRecord);
     }
@@ -155,7 +155,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
    * Extract old file path, initialize StorageWriter and WriteStatus.
    */
   private void init(String fileId, String partitionPath, HoodieDataFile dataFileToBeMerged) {
-    LOG.info("partitionPath: {}, fileId to be merged: {}", partitionPath, fileId);
+    LOG.info("partitionPath:" + partitionPath + ", fileId to be merged:" + fileId);
     this.writtenRecordKeys = new HashSet<>();
     writeStatus.setStat(new HoodieWriteStat());
     try {
@@ -171,7 +171,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
           + FSUtils.makeDataFileName(instantTime, writeToken, fileId)).toString();
       newFilePath = new Path(config.getBasePath(), relativePath);
 
-      LOG.info("Merging new data into oldPath {}, as newPath {}", oldFilePath.toString(), newFilePath.toString());
+      LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", oldFilePath.toString(),
+          newFilePath.toString()));
       // file name is same for all records, in this bunch
       writeStatus.setFileId(fileId);
       writeStatus.setPartitionPath(partitionPath);
@@ -186,7 +187,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
       storageWriter =
           HoodieStorageWriterFactory.getStorageWriter(instantTime, newFilePath, hoodieTable, config, writerSchema);
     } catch (IOException io) {
-      LOG.error("Error in update task at commit {}", instantTime, io);
+      LOG.error("Error in update task at commit " + instantTime, io);
       writeStatus.setGlobalError(io);
       throw new HoodieUpsertException("Failed to initialize HoodieUpdateHandle for FileId: " + fileId + " on commit "
           + instantTime + " on path " + hoodieTable.getMetaClient().getBasePath(), io);
@@ -200,7 +201,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
     try {
       // Load the new records in a map
       long memoryForMerge = config.getMaxMemoryPerPartitionMerge();
-      LOG.info("MaxMemoryPerPartitionMerge => {}", memoryForMerge);
+      LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
       this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(),
           new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(originalSchema));
     } catch (IOException io) {
@@ -217,10 +218,12 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
       // NOTE: Once Records are added to map (spillable-map), DO NOT change it as they won't persist
       keyToNewRecords.put(record.getRecordKey(), record);
     }
-    LOG.info("Number of entries in MemoryBasedMap => {}. Total size in bytes of MemoryBasedMap => {}. "
-                    + "Number of entries in DiskBasedMap => {}. Size of file spilled to disk => {}",
-            ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries(), ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize(),
-            ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries(), ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes());
+    LOG.info("Number of entries in MemoryBasedMap => "
+        + ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries()
+        + "Total size in bytes of MemoryBasedMap => "
+        + ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize() + "Number of entries in DiskBasedMap => "
+        + ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries() + "Size of file spilled to disk => "
+        + ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes());
     return partitionPath;
   }
 
@@ -250,7 +253,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
       hoodieRecord.deflate();
       return true;
     } catch (Exception e) {
-      LOG.error("Error writing record {}", hoodieRecord, e);
+      LOG.error("Error writing record  " + hoodieRecord, e);
       writeStatus.markFailure(hoodieRecord, e, recordMetadata);
     }
     return false;
@@ -292,12 +295,12 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
       try {
         storageWriter.writeAvro(key, oldRecord);
       } catch (ClassCastException e) {
-        LOG.error("Schema mismatch when rewriting old record {} from file {} to file {} with writerSchema {}",
-                oldRecord, getOldFilePath(), newFilePath, writerSchema.toString(true));
+        LOG.error("Schema mismatch when rewriting old record " + oldRecord + " from file " + getOldFilePath()
+            + " to file " + newFilePath + " with writerSchema " + writerSchema.toString(true));
         throw new HoodieUpsertException(errMsg, e);
       } catch (IOException e) {
-        LOG.error("Failed to merge old record into new file for key {} from old file {} to new file {}",
-                key, getOldFilePath(), newFilePath, e);
+        LOG.error("Failed to merge old record into new file for key " + key + " from old file " + getOldFilePath()
+            + " to new file " + newFilePath, e);
         throw new HoodieUpsertException(errMsg, e);
       }
       recordsWritten++;
@@ -342,8 +345,6 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
 
       LOG.info(String.format("MergeHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(),
           stat.getFileId(), runtimeStats.getTotalUpsertTime()));
-      LOG.info("MergeHandle for partitionPath {} fileID {}, took {} ms.", stat.getPartitionPath(),
-              stat.getFileId(), runtimeStats.getTotalUpsertTime());
 
       return writeStatus;
     } catch (IOException e) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 50256bc..7a1939a 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -36,9 +36,9 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.spark.TaskContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -47,7 +47,7 @@ import java.io.IOException;
  */
 public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends HoodieIOHandle {
 
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieWriteHandle.class);
+  private static final Logger LOG = LogManager.getLogger(HoodieWriteHandle.class);
   protected final Schema originalSchema;
   protected final Schema writerSchema;
   protected HoodieTimer timer;
@@ -97,7 +97,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
   protected void createMarkerFile(String partitionPath) {
     Path markerPath = makeNewMarkerPath(partitionPath);
     try {
-      LOG.info("Creating Marker Path={}", markerPath);
+      LOG.info("Creating Marker Path=" + markerPath);
       fs.create(markerPath, false).close();
     } catch (IOException e) {
       throw new HoodieException("Failed to create marker file " + markerPath, e);
@@ -147,7 +147,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
     if (exception.isPresent() && exception.get() instanceof Throwable) {
       // Not throwing exception from here, since we don't want to fail the entire job for a single record
       writeStatus.markFailure(record, exception.get(), recordMetadata);
-      LOG.error("Error writing record {}", record, exception.get());
+      LOG.error("Error writing record " + record, exception.get());
     } else {
       write(record, avroRecord);
     }
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java b/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java
index 1d02d4e..6f97601 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java
@@ -47,13 +47,13 @@ import com.google.common.collect.Sets;
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.util.AccumulatorV2;
 import org.apache.spark.util.LongAccumulator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -74,7 +74,7 @@ import static java.util.stream.Collectors.toList;
  */
 public class HoodieRealtimeTableCompactor implements HoodieCompactor {
 
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieRealtimeTableCompactor.class);
+  private static final Logger LOG = LogManager.getLogger(HoodieRealtimeTableCompactor.class);
   // Accumulator to keep track of total log files for a dataset
   private AccumulatorV2<Long, Long> totalLogFiles;
   // Accumulator to keep track of total log file slices for a dataset
@@ -92,7 +92,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
     HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
     List<CompactionOperation> operations = compactionPlan.getOperations().stream()
         .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
-    LOG.info("Compactor compacting {} files", operations);
+    LOG.info("Compactor compacting " + operations + " files");
 
     return jsc.parallelize(operations, operations.size())
         .map(s -> compact(table, metaClient, config, s, compactionInstantTime)).flatMap(List::iterator);
@@ -103,8 +103,8 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
     FileSystem fs = metaClient.getFs();
 
     Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
-    LOG.info("Compacting base {} with delta files {} for commit {}",
-            operation.getDataFileName(), operation.getDeltaFileNames(), commitTime);
+    LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames()
+        + " for commit " + commitTime);
     // TODO - FIX THIS
     // Reads the entire avro file. Always only specific blocks should be read from the avro file
     // (failure recover).
@@ -115,7 +115,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
         .getActiveTimeline().getTimelineOfActions(Sets.newHashSet(HoodieTimeline.COMMIT_ACTION,
             HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
         .filterCompletedInstants().lastInstant().get().getTimestamp();
-    LOG.info("MaxMemoryPerCompaction => {}", config.getMaxMemoryPerCompaction());
+    LOG.info("MaxMemoryPerCompaction => " + config.getMaxMemoryPerCompaction());
 
     List<String> logFiles = operation.getDeltaFileNames().stream().map(
         p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString())
@@ -176,7 +176,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
     // TODO : check if maxMemory is not greater than JVM or spark.executor memory
     // TODO - rollback any compactions in flight
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
-    LOG.info("Compacting {} with commit {}", metaClient.getBasePath(), compactionCommitTime);
+    LOG.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime);
     List<String> partitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
         config.shouldAssumeDatePartitioning());
 
@@ -189,7 +189,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
     }
 
     RealtimeView fileSystemView = hoodieTable.getRTFileSystemView();
-    LOG.info("Compaction looking for files to compact in {} partitions", partitionPaths);
+    LOG.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
     List<HoodieCompactionOperation> operations = jsc.parallelize(partitionPaths, partitionPaths.size())
         .flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> fileSystemView
             .getLatestFileSlices(partitionPath)
@@ -206,10 +206,10 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
                   config.getCompactionStrategy().captureMetrics(config, dataFile, partitionPath, logFiles));
             }).filter(c -> !c.getDeltaFileNames().isEmpty()).collect(toList()).iterator())
         .collect().stream().map(CompactionUtils::buildHoodieCompactionOperation).collect(toList());
-    LOG.info("Total of {} compactions are retrieved", operations.size());
-    LOG.info("Total number of latest files slices {}", totalFileSlices.value());
-    LOG.info("Total number of log files {}", totalLogFiles.value());
-    LOG.info("Total number of file slices {}", totalFileSlices.value());
+    LOG.info("Total of " + operations.size() + " compactions are retrieved");
+    LOG.info("Total number of latest files slices " + totalFileSlices.value());
+    LOG.info("Total number of log files " + totalLogFiles.value());
+    LOG.info("Total number of file slices " + totalFileSlices.value());
     // Filter the compactions with the passed in filter. This lets us choose most effective
     // compactions only
     HoodieCompactionPlan compactionPlan = config.getCompactionStrategy().generateCompactionPlan(config, operations,
@@ -221,7 +221,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
             + "Please fix your strategy implementation. FileIdsWithPendingCompactions :" + fgIdsInPendingCompactions
             + ", Selected workload :" + compactionPlan);
     if (compactionPlan.getOperations().isEmpty()) {
-      LOG.warn("After filtering, Nothing to compact for {}", metaClient.getBasePath());
+      LOG.warn("After filtering, Nothing to compact for " + metaClient.getBasePath());
     }
     return compactionPlan;
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index c0dd905..b6fcd09 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -24,15 +24,15 @@ import org.apache.hudi.config.HoodieWriteConfig;
 
 import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 /**
  * Wrapper for metrics-related operations.
  */
 public class HoodieMetrics {
 
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieMetrics.class);
+  private static final Logger LOG = LogManager.getLogger(HoodieMetrics.class);
   // Some timers
   public String rollbackTimerName = null;
   public String cleanTimerName = null;
@@ -155,7 +155,8 @@ public class HoodieMetrics {
 
   public void updateRollbackMetrics(long durationInMs, long numFilesDeleted) {
     if (config.isMetricsOn()) {
-      LOG.info("Sending rollback metrics (duration={}, numFilesDeleted={})", durationInMs, numFilesDeleted);
+      LOG.info(
+          String.format("Sending rollback metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted));
       Metrics.registerGauge(getMetricsName("rollback", "duration"), durationInMs);
       Metrics.registerGauge(getMetricsName("rollback", "numFilesDeleted"), numFilesDeleted);
     }
@@ -163,7 +164,8 @@ public class HoodieMetrics {
 
   public void updateCleanMetrics(long durationInMs, int numFilesDeleted) {
     if (config.isMetricsOn()) {
-      LOG.info("Sending clean metrics (duration={}, numFilesDeleted={})", durationInMs, numFilesDeleted);
+      LOG.info(
+          String.format("Sending clean metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted));
       Metrics.registerGauge(getMetricsName("clean", "duration"), durationInMs);
       Metrics.registerGauge(getMetricsName("clean", "numFilesDeleted"), numFilesDeleted);
     }
@@ -171,7 +173,8 @@ public class HoodieMetrics {
 
   public void updateFinalizeWriteMetrics(long durationInMs, long numFilesFinalized) {
     if (config.isMetricsOn()) {
-      LOG.info("Sending finalize write metrics (duration={}, numFilesFinalized={})", durationInMs, numFilesFinalized);
+      LOG.info(String.format("Sending finalize write metrics (duration=%d, numFilesFinalized=%d)", durationInMs,
+          numFilesFinalized));
       Metrics.registerGauge(getMetricsName("finalize", "duration"), durationInMs);
       Metrics.registerGauge(getMetricsName("finalize", "numFilesFinalized"), numFilesFinalized);
     }
@@ -179,7 +182,7 @@ public class HoodieMetrics {
 
   public void updateIndexMetrics(final String action, final long durationInMs) {
     if (config.isMetricsOn()) {
-      LOG.info("Sending index metrics ({}.duration, {})", action, durationInMs);
+      LOG.info(String.format("Sending index metrics (%s.duration, %d)", action, durationInMs));
       Metrics.registerGauge(getMetricsName("index", String.format("%s.duration", action)), durationInMs);
     }
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java b/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java
index a3e95fe..2559a4b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java
@@ -22,8 +22,8 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 
 import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import javax.management.remote.JMXConnectorServer;
 import javax.management.remote.JMXConnectorServerFactory;
@@ -38,7 +38,7 @@ import java.rmi.registry.LocateRegistry;
  */
 public class JmxMetricsReporter extends MetricsReporter {
 
-  private static final Logger LOG = LoggerFactory.getLogger(JmxMetricsReporter.class);
+  private static final Logger LOG = LogManager.getLogger(JmxMetricsReporter.class);
   private final JMXConnectorServer connector;
 
   public JmxMetricsReporter(HoodieWriteConfig config) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java
index 4d00684..4b19441 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java
@@ -24,8 +24,8 @@ import org.apache.hudi.exception.HoodieException;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.MetricRegistry;
 import com.google.common.io.Closeables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.io.Closeable;
 
@@ -33,7 +33,7 @@ import java.io.Closeable;
  * This is the main class of the metrics system.
  */
 public class Metrics {
-  private static final Logger LOG = LoggerFactory.getLogger(Metrics.class);
+  private static final Logger LOG = LogManager.getLogger(Metrics.class);
 
   private static volatile boolean initialized = false;
   private static Metrics metrics = null;
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsGraphiteReporter.java b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsGraphiteReporter.java
index bb33a97..aac6c70 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsGraphiteReporter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsGraphiteReporter.java
@@ -24,8 +24,8 @@ import com.codahale.metrics.MetricFilter;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.graphite.Graphite;
 import com.codahale.metrics.graphite.GraphiteReporter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.io.Closeable;
 import java.net.InetSocketAddress;
@@ -36,7 +36,7 @@ import java.util.concurrent.TimeUnit;
  */
 public class MetricsGraphiteReporter extends MetricsReporter {
 
-  private static final Logger LOG = LoggerFactory.getLogger(MetricsGraphiteReporter.class);
+  private static final Logger LOG = LogManager.getLogger(MetricsGraphiteReporter.class);
   private final MetricRegistry registry;
   private final GraphiteReporter graphiteReporter;
   private final HoodieWriteConfig config;
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
index a80c1ef..b9d433d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
@@ -21,15 +21,15 @@ package org.apache.hudi.metrics;
 import org.apache.hudi.config.HoodieWriteConfig;
 
 import com.codahale.metrics.MetricRegistry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 /**
  * Factory class for creating MetricsReporter.
  */
 public class MetricsReporterFactory {
 
-  private static final Logger LOG = LoggerFactory.getLogger(MetricsReporterFactory.class);
+  private static final Logger LOG = LogManager.getLogger(MetricsReporterFactory.class);
 
   public static MetricsReporter createReporter(HoodieWriteConfig config, MetricRegistry registry) {
     MetricsReporterType type = config.getMetricsReporterType();
@@ -45,7 +45,7 @@ public class MetricsReporterFactory {
         reporter = new JmxMetricsReporter(config);
         break;
       default:
-        LOG.error("Reporter type[{}] is not supported.", type);
+        LOG.error("Reporter type[" + type + "] is not supported.");
         break;
     }
     return reporter;
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index 1ccf026..f1f277b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -58,6 +58,8 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.avro.AvroReadSupport;
 import org.apache.parquet.hadoop.ParquetReader;
@@ -79,8 +81,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 /**
@@ -92,7 +92,7 @@ import scala.Tuple2;
  */
 public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieTable<T> {
 
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieCopyOnWriteTable.class);
+  private static final Logger LOG = LogManager.getLogger(HoodieCopyOnWriteTable.class);
 
   public HoodieCopyOnWriteTable(HoodieWriteConfig config, JavaSparkContext jsc) {
     super(config, jsc);
@@ -130,7 +130,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
     try {
       boolean deleteResult = fs.delete(deletePath, false);
       if (deleteResult) {
-        LOG.debug("Cleaned file at path : {}", deletePath);
+        LOG.debug("Cleaned file at path :" + deletePath);
       }
       return deleteResult;
     } catch (FileNotFoundException fio) {
@@ -172,7 +172,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
       throws IOException {
     // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
     if (!recordItr.hasNext()) {
-      LOG.info("Empty partition with fileId => {}", fileId);
+      LOG.info("Empty partition with fileId => " + fileId);
       return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
     }
     // these are updates
@@ -212,8 +212,8 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
 
     // TODO(vc): This needs to be revisited
     if (upsertHandle.getWriteStatus().getPartitionPath() == null) {
-      LOG.info("Upsert Handle has partition path as null {}, {}", upsertHandle.getOldFilePath(),
-              upsertHandle.getWriteStatus());
+      LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", "
+          + upsertHandle.getWriteStatus());
     }
     return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator();
   }
@@ -291,7 +291,8 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
         LOG.info("Nothing to clean here. It is already clean");
         return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
       }
-      LOG.info("Total Partitions to clean : {}, with policy {}", partitionsToClean.size(), config.getCleanerPolicy());
+      LOG.info(
+          "Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy());
       int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
       LOG.info("Using cleanerParallelism: " + cleanerParallelism);
 
@@ -317,7 +318,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
     int cleanerParallelism = Math.min(
         (int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()),
         config.getCleanerParallelism());
-    LOG.info("Using cleanerParallelism: {}", cleanerParallelism);
+    LOG.info("Using cleanerParallelism: " + cleanerParallelism);
     List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc
         .parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream()
             .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), y)))
@@ -353,7 +354,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
     HoodieActiveTimeline activeTimeline = this.getActiveTimeline();
 
     if (instant.isCompleted()) {
-      LOG.info("Unpublishing instant {}", instant);
+      LOG.info("Unpublishing instant " + instant);
       instant = activeTimeline.revertToInflight(instant);
     }
 
@@ -363,7 +364,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
       String commit = instant.getTimestamp();
 
       // delete all the data files for this commit
-      LOG.info("Clean out all parquet files generated for commit: {}", commit);
+      LOG.info("Clean out all parquet files generated for commit: " + commit);
       List<RollbackRequest> rollbackRequests = generateRollbackRequests(instant);
 
       //TODO: We need to persist this as rollback workload and use it in case of partial failures
@@ -371,7 +372,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
     }
     // Delete Inflight instant if enabled
     deleteInflightAndRequestedInstant(deleteInstants, activeTimeline, instant);
-    LOG.info("Time(in ms) taken to finish rollback {}", (System.currentTimeMillis() - startTime));
+    LOG.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
     return stats;
   }
 
@@ -397,7 +398,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
 
     // Remove the rolled back inflight commits
     if (deleteInstant) {
-      LOG.info("Deleting instant={}", instantToBeDeleted);
+      LOG.info("Deleting instant=" + instantToBeDeleted);
       activeTimeline.deletePending(instantToBeDeleted);
       if (instantToBeDeleted.isInflight() && !metaClient.getTimelineLayoutVersion().isNullVersion()) {
         // Delete corresponding requested instant
@@ -405,9 +406,9 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
             instantToBeDeleted.getTimestamp());
         activeTimeline.deletePending(instantToBeDeleted);
       }
-      LOG.info("Deleted pending commit {}", instantToBeDeleted);
+      LOG.info("Deleted pending commit " + instantToBeDeleted);
     } else {
-      LOG.warn("Rollback finished without deleting inflight instant file. Instant={}", instantToBeDeleted);
+      LOG.warn("Rollback finished without deleting inflight instant file. Instant=" + instantToBeDeleted);
     }
   }
 
@@ -575,10 +576,9 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
       assignUpdates(profile);
       assignInserts(profile);
 
-      LOG.info("Total Buckets :{}, buckets info => {}, \n"
-              + "Partition to insert buckets => {}, \n"
-              + "UpdateLocations mapped to buckets =>{}",
-              totalBuckets, bucketInfoMap, partitionPathToInsertBuckets, updateLocationToBucket);
+      LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n"
+          + "Partition to insert buckets => " + partitionPathToInsertBuckets + ", \n"
+          + "UpdateLocations mapped to buckets =>" + updateLocationToBucket);
     }
 
     private void assignUpdates(WorkloadProfile profile) {
@@ -606,13 +606,13 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
       long averageRecordSize =
           averageBytesPerRecord(metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
               config.getCopyOnWriteRecordSizeEstimate());
-      LOG.info("AvgRecordSize => {}", averageRecordSize);
+      LOG.info("AvgRecordSize => " + averageRecordSize);
       for (String partitionPath : partitionPaths) {
         WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
         if (pStat.getNumInserts() > 0) {
 
           List<SmallFile> smallFiles = getSmallFiles(partitionPath);
-          LOG.info("For partitionPath : {} Small Files => {}", partitionPath, smallFiles);
+          LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
 
           long totalUnassignedInserts = pStat.getNumInserts();
           List<Integer> bucketNumbers = new ArrayList<>();
@@ -627,10 +627,10 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
               int bucket;
               if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) {
                 bucket = updateLocationToBucket.get(smallFile.location.getFileId());
-                LOG.info("Assigning {} inserts to existing update bucket {}", recordsToAppend, bucket);
+                LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket);
               } else {
                 bucket = addUpdateBucket(smallFile.location.getFileId());
-                LOG.info("Assigning {} inserts to new update bucket {}", recordsToAppend, bucket);
+                LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket);
               }
               bucketNumbers.add(bucket);
               recordsPerBucket.add(recordsToAppend);
@@ -646,8 +646,8 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
             }
 
             int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket);
-            LOG.info("After small file assignment: unassignedInserts => {}, totalInsertBuckets => {}, "
-                    + "recordsPerBucket => {}", totalUnassignedInserts, insertBuckets, insertRecordsPerBucket);
+            LOG.info("After small file assignment: unassignedInserts => " + totalUnassignedInserts
+                + ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket);
             for (int b = 0; b < insertBuckets; b++) {
               bucketNumbers.add(totalBuckets);
               recordsPerBucket.add(totalUnassignedInserts / insertBuckets);
@@ -667,7 +667,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
             bkt.weight = (1.0 * recordsPerBucket.get(i)) / pStat.getNumInserts();
             insertBuckets.add(bkt);
           }
-          LOG.info("Total insert buckets for partition path {} => {}", partitionPath, insertBuckets);
+          LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets);
           partitionPathToInsertBuckets.put(partitionPath, insertBuckets);
         }
       }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
index 8845772..a654fcb 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
@@ -44,11 +44,11 @@ import org.apache.hudi.io.compact.HoodieRealtimeTableCompactor;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -77,7 +77,7 @@ import java.util.stream.Collectors;
  */
 public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends HoodieCopyOnWriteTable<T> {
 
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieMergeOnReadTable.class);
+  private static final Logger LOG = LogManager.getLogger(HoodieMergeOnReadTable.class);
 
   // UpsertPartitioner for MergeOnRead table type
   private MergeOnReadUpsertPartitioner mergeOnReadUpsertPartitioner;
@@ -98,10 +98,10 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
   @Override
   public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId, Iterator<HoodieRecord<T>> recordItr)
       throws IOException {
-    LOG.info("Merging updates for commit {} for file {}", commitTime, fileId);
+    LOG.info("Merging updates for commit " + commitTime + " for file " + fileId);
 
     if (!index.canIndexLogFiles() && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) {
-      LOG.info("Small file corrections for updates for commit {} for file {}", commitTime, fileId);
+      LOG.info("Small file corrections for updates for commit " + commitTime + " for file " + fileId);
       return super.handleUpdate(commitTime, fileId, recordItr);
     } else {
       HoodieAppendHandle<T> appendHandle = new HoodieAppendHandle<>(config, commitTime, this, fileId, recordItr);
@@ -124,7 +124,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
 
   @Override
   public HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String instantTime) {
-    LOG.info("Checking if compaction needs to be run on {}", config.getBasePath());
+    LOG.info("Checking if compaction needs to be run on " + config.getBasePath());
     Option<HoodieInstant> lastCompaction =
         getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
     String deltaCommitsSinceTs = "0";
@@ -135,12 +135,13 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
     int deltaCommitsSinceLastCompaction = getActiveTimeline().getDeltaCommitTimeline()
         .findInstantsAfter(deltaCommitsSinceTs, Integer.MAX_VALUE).countInstants();
     if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) {
-      LOG.info("Not running compaction as only {} delta commits was found since last compaction {}. Waiting for {}",
-              deltaCommitsSinceLastCompaction, deltaCommitsSinceTs, config.getInlineCompactDeltaCommitMax());
+      LOG.info("Not running compaction as only " + deltaCommitsSinceLastCompaction
+          + " delta commits was found since last compaction " + deltaCommitsSinceTs + ". Waiting for "
+          + config.getInlineCompactDeltaCommitMax());
       return new HoodieCompactionPlan();
     }
 
-    LOG.info("Compacting merge on read table {}", config.getBasePath());
+    LOG.info("Compacting merge on read table " + config.getBasePath());
     HoodieRealtimeTableCompactor compactor = new HoodieRealtimeTableCompactor();
     try {
       return compactor.generateCompactionPlan(jsc, this, config, instantTime,
@@ -170,11 +171,11 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
     long startTime = System.currentTimeMillis();
 
     String commit = instant.getTimestamp();
-    LOG.error("Rolling back instant {}", instant);
+    LOG.error("Rolling back instant " + instant);
 
     // Atomically un-publish all non-inflight commits
     if (instant.isCompleted()) {
-      LOG.error("Un-publishing instant {}, deleteInstants={}", instant, deleteInstants);
+      LOG.error("Un-publishing instant " + instant + ", deleteInstants=" + deleteInstants);
       instant = this.getActiveTimeline().revertToInflight(instant);
     }
 
@@ -190,7 +191,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
     // For Requested State (like failure during index lookup), there is nothing to do rollback other than
     // deleting the timeline file
     if (!instant.isRequested()) {
-      LOG.info("Unpublished {}", commit);
+      LOG.info("Unpublished " + commit);
       List<RollbackRequest> rollbackRequests = generateRollbackRequests(jsc, instant);
       // TODO: We need to persist this as rollback workload and use it in case of partial failures
       allRollbackStats = new RollbackExecutor(metaClient, config).performRollback(jsc, instant, rollbackRequests);
@@ -199,7 +200,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
     // Delete Inflight instants if enabled
     deleteInflightAndRequestedInstant(deleteInstants, this.getActiveTimeline(), instant);
 
-    LOG.info("Time(in ms) taken to finish rollback {}", (System.currentTimeMillis() - startTime));
+    LOG.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
 
     return allRollbackStats;
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 5bb0ffa..d2f5715 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -52,11 +52,11 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -73,7 +73,7 @@ import java.util.stream.Stream;
  */
 public abstract class HoodieTable<T extends HoodieRecordPayload> implements Serializable {
 
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieTable.class);
+  private static final Logger LOG = LogManager.getLogger(HoodieTable.class);
 
   protected final HoodieWriteConfig config;
   protected final HoodieTableMetaClient metaClient;
@@ -324,7 +324,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
       Path markerDir = new Path(metaClient.getMarkerFolderPath(instantTs));
       if (fs.exists(markerDir)) {
         // For append only case, we do not write to marker dir. Hence, the above check
-        LOG.info("Removing marker directory={}", markerDir);
+        LOG.info("Removing marker directory=" + markerDir);
         fs.delete(markerDir, true);
       }
     } catch (IOException ioe) {
@@ -363,7 +363,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
       invalidDataPaths.removeAll(validDataPaths);
       if (!invalidDataPaths.isEmpty()) {
         LOG.info(
-            "Removing duplicate data files created due to spark retries before committing. Paths={}", invalidDataPaths);
+            "Removing duplicate data files created due to spark retries before committing. Paths=" + invalidDataPaths);
       }
 
       Map<String, List<Pair<String, String>>> groupByPartition = invalidDataPaths.stream()
@@ -381,7 +381,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
         jsc.parallelize(new ArrayList<>(groupByPartition.values()), config.getFinalizeWriteParallelism())
             .map(partitionWithFileList -> {
               final FileSystem fileSystem = metaClient.getFs();
-              LOG.info("Deleting invalid data files={}", partitionWithFileList);
+              LOG.info("Deleting invalid data files=" + partitionWithFileList);
               if (partitionWithFileList.isEmpty()) {
                 return true;
               }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java
index 236f440..0f3297c 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/RollbackExecutor.java
@@ -36,6 +36,8 @@ import com.google.common.collect.Maps;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
@@ -46,8 +48,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 /**
@@ -55,7 +55,7 @@ import scala.Tuple2;
  */
 public class RollbackExecutor implements Serializable {
 
-  private static final Logger LOG = LoggerFactory.getLogger(RollbackExecutor.class);
+  private static final Logger LOG = LogManager.getLogger(RollbackExecutor.class);
 
   private final HoodieTableMetaClient metaClient;
   private final HoodieWriteConfig config;
@@ -179,13 +179,13 @@ public class RollbackExecutor implements Serializable {
    */
   private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
       Map<FileStatus, Boolean> results, String partitionPath, PathFilter filter) throws IOException {
-    LOG.info("Cleaning path {}", partitionPath);
+    LOG.info("Cleaning path " + partitionPath);
     FileSystem fs = metaClient.getFs();
     FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
     for (FileStatus file : toBeDeleted) {
       boolean success = fs.delete(file.getPath(), false);
       results.put(file, success);
-      LOG.info("Delete file {} \t {}", file.getPath(), success);
+      LOG.info("Delete file " + file.getPath() + "\t" + success);
     }
     return results;
   }
@@ -195,7 +195,7 @@ public class RollbackExecutor implements Serializable {
    */
   private Map<FileStatus, Boolean> deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
       Map<FileStatus, Boolean> results, String commit, String partitionPath) throws IOException {
-    LOG.info("Cleaning path {}", partitionPath);
+    LOG.info("Cleaning path " + partitionPath);
     FileSystem fs = metaClient.getFs();
     PathFilter filter = (path) -> {
       if (path.toString().contains(".parquet")) {
@@ -208,7 +208,7 @@ public class RollbackExecutor implements Serializable {
     for (FileStatus file : toBeDeleted) {
       boolean success = fs.delete(file.getPath(), false);
       results.put(file, success);
-      LOG.info("Delete file {} \t {}", file.getPath(), success);
+      LOG.info("Delete file " + file.getPath() + "\t" + success);
     }
     return results;
   }


Mime
View raw message