falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject falcon git commit: FALCON-2075 Falcon HiveDR tasks do not report progress and can get killed
Date Tue, 19 Jul 2016 20:23:33 GMT
Repository: falcon
Updated Branches:
  refs/heads/0.10 de422a2f2 -> ef00c3e41


FALCON-2075 Falcon HiveDR tasks do not report progress and can get killed

Author: Venkat Ranganathan <venkat@hortonworks.com>

Reviewers: "Praveen Adlakha <adlakha.praveen@gmail.com>, Balu Vellanki <balu@apache.org>"

Closes #230 from vrangan/FALCON-2075

(cherry picked from commit ec4a273a8776ebdd1c50e062cad46c981b1d0122)
Signed-off-by: bvellanki <bvellanki@hortonworks.com>


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/ef00c3e4
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/ef00c3e4
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/ef00c3e4

Branch: refs/heads/0.10
Commit: ef00c3e41a041235f834d2f46ce5b3ff172dc60f
Parents: de422a2
Author: Venkat Ranganathan <venkat@hortonworks.com>
Authored: Tue Jul 19 13:23:20 2016 -0700
Committer: bvellanki <bvellanki@hortonworks.com>
Committed: Tue Jul 19 13:23:28 2016 -0700

----------------------------------------------------------------------
 .../apache/falcon/hive/mapreduce/CopyMapper.java    | 14 ++++++++++++--
 .../apache/falcon/hive/mapreduce/CopyReducer.java   | 16 +++++++++++++++-
 2 files changed, 27 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/ef00c3e4/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
index e2297ef..5cd7e74 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
@@ -28,10 +28,12 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Map class for Hive DR.
@@ -40,6 +42,7 @@ public class CopyMapper extends Mapper<LongWritable, Text, Text, Text> {
 
     private static final Logger LOG = LoggerFactory.getLogger(CopyMapper.class);
     private EventUtils eventUtils;
+    ScheduledThreadPoolExecutor timer;
 
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
@@ -54,15 +57,22 @@ public class CopyMapper extends Mapper<LongWritable, Text, Text, Text> {
 
     @Override
     protected void map(LongWritable key, Text value,
-                       Context context) throws IOException, InterruptedException {
+                       final Context context) throws IOException, InterruptedException {
         LOG.debug("Processing Event value: {}", value.toString());
-
+        timer = new ScheduledThreadPoolExecutor(1);
+        timer.scheduleAtFixedRate(new Runnable() {
+            public void run() {
+                System.out.println("Hive DR copy mapper progress heart beat");
+                context.progress();
+            }
+        }, 0, 30, TimeUnit.SECONDS);
         try {
             eventUtils.processEvents(value.toString());
         } catch (Exception e) {
             LOG.error("Exception in processing events:", e);
             throw new IOException(e);
         } finally {
+            timer.shutdownNow();
             cleanup(context);
         }
         List<ReplicationStatus> replicationStatusList = eventUtils.getListReplicationStatus();

http://git-wip-us.apache.org/repos/asf/falcon/blob/ef00c3e4/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
index 50cb4b2..f4bb31c 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
@@ -35,12 +35,15 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Reducer class for Hive DR.
  */
 public class CopyReducer extends Reducer<Text, Text, Text, Text> {
     private DRStatusStore hiveDRStore;
+    private ScheduledThreadPoolExecutor timer;
 
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
@@ -62,9 +65,18 @@ public class CopyReducer extends Reducer<Text, Text, Text, Text> {
     }
 
     @Override
-    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+    protected void reduce(Text key, Iterable<Text> values, final Context context)
+            throws IOException, InterruptedException {
         List<ReplicationStatus> replStatusList = new ArrayList<ReplicationStatus>();
         ReplicationStatus rs;
+        timer = new ScheduledThreadPoolExecutor(1);
+        timer.scheduleAtFixedRate(new Runnable() {
+            public void run() {
+                System.out.println("Hive DR copy reducer progress heart beat");
+                context.progress();
+            }
+        }, 0, 30, TimeUnit.SECONDS);
+
         try {
             for (Text value : values) {
                 String[] fields = (value.toString()).split("\t");
@@ -76,6 +88,8 @@ public class CopyReducer extends Reducer<Text, Text, Text, Text> {
             hiveDRStore.updateReplicationStatus(key.toString(), sortStatusList(replStatusList));
         } catch (HiveReplicationException e) {
             throw new IOException(e);
+        } finally {
+            timer.shutdownNow();
         }
     }
 


Mime
View raw message