chukwa-dev mailing list archives

From: ey...@apache.org
Subject: [02/11] chukwa git commit: CHUKWA-797. Added retry logic for sending data to HBase. (Eric Yang)
Date: Sat, 19 Mar 2016 17:22:48 GMT
CHUKWA-797. Added retry logic for sending data to HBase.  (Eric Yang)


Project: http://git-wip-us.apache.org/repos/asf/chukwa/repo
Commit: http://git-wip-us.apache.org/repos/asf/chukwa/commit/fc706655
Tree: http://git-wip-us.apache.org/repos/asf/chukwa/tree/fc706655
Diff: http://git-wip-us.apache.org/repos/asf/chukwa/diff/fc706655

Branch: refs/heads/master
Commit: fc70665574b7b54de583cd224552f032d8706633
Parents: 3f5bb3c
Author: Eric Yang <eyang@apache.org>
Authored: Sat Mar 19 08:58:18 2016 -0700
Committer: Eric Yang <eyang@apache.org>
Committed: Sat Mar 19 08:58:18 2016 -0700

----------------------------------------------------------------------
 .../connector/PipelineConnector.java              | 18 +++++++++---------
 .../datacollection/writer/hbase/HBaseWriter.java  | 11 ++++++++++-
 2 files changed, 19 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/chukwa/blob/fc706655/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java
index 929d871..bcb167a 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java
@@ -109,18 +109,20 @@ public class PipelineConnector implements Connector, Runnable {
         try {
           // get all ready chunks from the chunkQueue to be sent
           chunkQueue.collect(newQueue, MAX_SIZE_PER_POST);
+          CommitStatus result = writers.add(newQueue);
+          if(result.equals(ChukwaWriter.COMMIT_OK)) {
+            chunkCount = newQueue.size();
+            for (Chunk c : newQueue) {
+              agent.reportCommit(c.getInitiator(), c.getSeqID());
+            }          
+          }
+        } catch (WriterException e) {
+          log.warn("PipelineStageWriter Exception: ", e);
         } catch (InterruptedException e) {
           log.warn("thread interrupted during addChunks(ChunkQueue)");
           Thread.currentThread().interrupt();
           break;
         }
-        CommitStatus result = writers.add(newQueue);
-        if(result.equals(ChukwaWriter.COMMIT_OK)) {
-          chunkCount = newQueue.size();
-          for (Chunk c : newQueue) {
-            agent.reportCommit(c.getInitiator(), c.getSeqID());
-          }          
-        }
         long now = System.currentTimeMillis();
         long delta = MIN_POST_INTERVAL - now + lastPost;
         if(delta > 0) {
@@ -129,8 +131,6 @@ public class PipelineConnector implements Connector, Runnable {
         lastPost = now;
       } // end of try forever loop
       log.info("received stop() command so exiting run() loop to shutdown connector");
-    } catch (WriterException e) {
-      log.warn("PipelineStageWriter Exception: ", e);
     } catch (OutOfMemoryError e) {
       log.warn("Bailing out", e);
       throw new RuntimeException("Shutdown pipeline connector.");
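
The PipelineConnector change moves the WriterException handler from outside the run() loop into the per-batch try block: a failed write (for example while HBase is down) is now logged and the loop keeps going, so the connector retries on later iterations instead of exiting run(). Chunks are only acknowledged via reportCommit() after a COMMIT_OK, so nothing is acknowledged while the writer is unavailable. The standalone sketch below (class, method, and helper names are illustrative stand-ins, not Chukwa's actual types) shows that retry-by-continuing shape:

import java.util.List;
import java.util.concurrent.TimeUnit;

// Standalone illustration of the retry-by-continuing pattern introduced above.
// WriterException and Writer are simplified stand-ins for Chukwa's own types.
public class RetryLoopSketch {

  static class WriterException extends Exception {
    WriterException(String message) { super(message); }
  }

  interface Writer {
    void add(List<String> chunks) throws WriterException;
  }

  private volatile boolean stopped = false;

  void run(Writer writer, List<String> pending) throws InterruptedException {
    while (!stopped) {
      try {
        writer.add(pending);   // may fail while the backend (e.g. HBase) is down
        pending.clear();       // acknowledge only after a successful write
      } catch (WriterException e) {
        // Log and fall through: the unacknowledged chunks are resent next pass.
        System.err.println("write failed, will retry: " + e.getMessage());
      }
      TimeUnit.SECONDS.sleep(1);  // crude stand-in for MIN_POST_INTERVAL pacing
    }
  }
}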

http://git-wip-us.apache.org/repos/asf/chukwa/blob/fc706655/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
index 621f22e..5ba87bd 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
@@ -54,6 +54,7 @@ public class HBaseWriter extends PipelineableWriter {
   private ArrayList<Put> output;
   private Reporter reporter;
   private ChukwaConfiguration conf;
+  private Configuration hconf;
   String defaultProcessor;
   private static Connection connection;
   
@@ -92,6 +93,7 @@ public class HBaseWriter extends PipelineableWriter {
   private HBaseWriter(boolean reportStats, ChukwaConfiguration conf, Configuration hconf) throws IOException {
     this.reportStats = reportStats;
     this.conf = conf;
+    this.hconf = hconf;
     this.statTimer = new Timer();
     this.defaultProcessor = conf.get(
       "chukwa.demux.mapper.default.processor",
@@ -106,7 +108,7 @@ public class HBaseWriter extends PipelineableWriter {
     } catch (NoSuchAlgorithmException e) {
       throw new IOException("Can not register hashing algorithm.");
     }
-    if (connection == null) {
+    if (connection == null || connection.isClosed()) {
       connection = ConnectionFactory.createConnection(hconf);
     }
   }
@@ -118,6 +120,13 @@ public class HBaseWriter extends PipelineableWriter {
   }
 
   public void init(Configuration conf) throws WriterException {
+    if (connection == null || connection.isClosed()) {
+      try {
+        connection = ConnectionFactory.createConnection(hconf);
+      } catch (IOException e) {
+        throw new WriterException("HBase is offline, retry later...");
+      }
+    }
   }
 
   @Override
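
In HBaseWriter, the HBase Configuration is now kept in the new hconf field so that init() can lazily re-create the shared Connection whenever it is null or has been closed; an IOException during reconnect is surfaced as a WriterException ("HBase is offline, retry later..."), which the connector loop above catches and retries. A standalone sketch of that guard follows (assumes the hbase-client dependency is on the classpath; the class name is illustrative only):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

// Standalone illustration of the lazy re-connect guard added above.
public class LazyHBaseConnectionSketch {

  private static Connection connection;

  // Same shape as the guard in HBaseWriter: reuse the shared connection while
  // it is open, otherwise create a new one. In Chukwa the IOException is
  // wrapped in a WriterException so PipelineConnector can retry later.
  static synchronized Connection ensureConnection(Configuration hconf) throws IOException {
    if (connection == null || connection.isClosed()) {
      connection = ConnectionFactory.createConnection(hconf);
    }
    return connection;
  }
}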

