eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yonzhang2...@apache.org
Subject [31/52] [abbrv] incubator-eagle git commit: [EAGLE-502] Always retry to parse spark history job when failure happens.
Date Wed, 07 Sep 2016 17:42:27 GMT
[EAGLE-502] Always retry to parse spark history job when failure happens.

Author: pkuwm <ihuizhi.lu@gmail.com>

Closes #390 from pkuwm/EAGLE-502.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/3f7004f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/3f7004f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/3f7004f1

Branch: refs/heads/master
Commit: 3f7004f1cc939ba2e633e38248c322d2216e48a9
Parents: e778775
Author: pkuwm <ihuizhi.lu@gmail.com>
Authored: Fri Sep 2 10:51:31 2016 +0800
Committer: Qingwen Zhao <qingwen220@gmail.com>
Committed: Fri Sep 2 10:51:31 2016 +0800

----------------------------------------------------------------------
 .../jpm/spark/crawl/JHFSparkEventReader.java    |  6 +--
 .../status/JobHistoryZKStateManager.java        | 26 ++++--------
 .../history/storm/SparkHistoryJobSpout.java     | 44 ++++++--------------
 .../src/main/resources/application.conf         |  2 +-
 .../jpm/spark/running/SparkRunningJobApp.java   |  2 +-
 .../storm/SparkRunningJobFetchSpout.java        |  4 +-
 .../running/storm/SparkRunningJobParseBolt.java |  1 +
 .../jpm/util/resourcefetch/ResourceFetcher.java |  2 +-
 .../SparkHistoryServerResourceFetcher.java      |  6 +--
 .../hbase/HbaseMetadataBrowseWebResource.java   | 16 ++++---
 10 files changed, 41 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3f7004f1/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
index fe02da5..6c68b48 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
@@ -17,14 +17,14 @@
 
 package org.apache.eagle.jpm.spark.crawl;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.eagle.jpm.spark.entity.*;
 import org.apache.eagle.jpm.util.*;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.service.client.impl.EagleServiceBaseClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
@@ -353,7 +353,7 @@ public class JHFSparkEventReader {
         stage.setCompleteTime(completeTime);
         this.lastEventTime = completeTime;
 
-        if (stageInfo.containsKey("Failure Reason")) {
+        if (stageInfo != null && stageInfo.containsKey("Failure Reason")) {
             stage.setStatus(SparkEntityConstant.SparkStageStatus.FAILED.toString());
         } else {
             stage.setStatus(SparkEntityConstant.SparkStageStatus.COMPLETE.toString());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3f7004f1/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
index 7a95e56..9fafc1f 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
@@ -35,10 +35,11 @@ import java.util.Iterator;
 import java.util.List;
 
 public class JobHistoryZKStateManager {
-    public static final Logger LOG = LoggerFactory.getLogger(JobHistoryZKStateManager.class);
+    private final static Logger LOG = LoggerFactory.getLogger(JobHistoryZKStateManager.class);
+
+    private final static String START_TIMESTAMP = "lastAppTime";
     private String zkRoot;
     private CuratorFramework curator;
-    private static String START_TIMESTAMP = "lastAppTime";
 
     private CuratorFramework newCurator(SparkHistoryJobAppConfig config) throws Exception {
         return CuratorFrameworkFactory.newClient(
@@ -72,7 +73,7 @@ public class JobHistoryZKStateManager {
         InterProcessLock lock = new InterProcessReadWriteLock(curator,jobPath).writeLock();
         try {
             lock.acquire();
-            Iterator<String> iter =  curator.getChildren().forPath(jobPath).iterator();
+            Iterator<String> iter = curator.getChildren().forPath(jobPath).iterator();
             while (iter.hasNext()) {
                 String appId = iter.next();
                 String path = jobPath + "/" + appId;
@@ -104,9 +105,7 @@ public class JobHistoryZKStateManager {
         InterProcessLock lock = new InterProcessReadWriteLock(curator,jobPath).writeLock();
         try {
             lock.acquire();
-            Iterator<String> iter =  curator.getChildren().forPath(jobPath).iterator();
-            while (iter.hasNext()) {
-                String appId = iter.next();
+            (curator.getChildren().forPath(jobPath)).forEach(appId -> {
                 String path = jobPath + "/" + appId;
                 try {
                     if (curator.checkExists().forPath(path) != null) {
@@ -119,7 +118,7 @@ public class JobHistoryZKStateManager {
                     LOG.error("fail to read unprocessed job", e);
                     throw new RuntimeException(e);
                 }
-            }
+            });
 
         } catch (Exception e) {
             LOG.error("fail to read unprocessed jobs", e);
@@ -174,10 +173,7 @@ public class JobHistoryZKStateManager {
     public boolean hasApplication(String appId) {
         String path = zkRoot + "/jobs/" + appId;
         try {
-            if (curator.checkExists().forPath(path) != null) {
-                return true;
-            }
-            return false;
+            return curator.checkExists().forPath(path) != null;
         } catch (Exception e) {
             LOG.error("fail to check whether application exists", e);
             throw new RuntimeException(e);
@@ -191,7 +187,7 @@ public class JobHistoryZKStateManager {
                 curator.delete().deletingChildrenIfNeeded().forPath(path);
             }
 
-            name = name.replace("/","_");
+            name = name.replace("/", "_");
             if (name.length() > 50) {
                 name = name.substring(0, 50);
             }
@@ -226,7 +222,6 @@ public class JobHistoryZKStateManager {
     }
 
     public void updateApplicationStatus(String appId, Enum<ZKStateConstant.AppStatus> status) {
-
         String path = zkRoot + "/jobs/" + appId ;
         InterProcessLock lock = new InterProcessReadWriteLock(curator,zkRoot + "/jobs").readLock();
         try {
@@ -238,8 +233,7 @@ public class JobHistoryZKStateManager {
                     curator.setData().forPath(path, status.toString().getBytes("UTF-8"));
                 }
             } else {
-                String errorMsg = String.format("fail to update for application with path %s", path);
-                LOG.error(errorMsg);
+                LOG.error("Failed to update for application with path: " + path);
             }
         } catch (Exception e) {
             LOG.error("fail to update application status", e);
@@ -252,8 +246,6 @@ public class JobHistoryZKStateManager {
             } catch (Exception e) {
                 LOG.error("fail to release lock",e);
             }
-
         }
-
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3f7004f1/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
index db60744..5602b4c 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
@@ -19,6 +19,13 @@
 
 package org.apache.eagle.jpm.spark.history.storm;
 
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
 import org.apache.eagle.jpm.spark.history.SparkHistoryJobAppConfig;
 import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
 import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
@@ -26,16 +33,10 @@ import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
 import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
 import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.util.Calendar;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -46,9 +47,6 @@ public class SparkHistoryJobSpout extends BaseRichSpout {
     private SparkHistoryJobAppConfig config;
     private ResourceFetcher rmFetch;
     private long lastFinishAppTime = 0;
-    private Map<String, Integer> failTimes;
-
-    private static final int FAIL_MAX_TIMES = 5;
 
     public SparkHistoryJobSpout(SparkHistoryJobAppConfig config) {
         this.config = config;
@@ -57,7 +55,6 @@ public class SparkHistoryJobSpout extends BaseRichSpout {
     @Override
     public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
         rmFetch = new RMResourceFetcher(config.jobHistoryConfig.rms);
-        this.failTimes = new HashMap<>();
         this.collector = spoutOutputCollector;
         this.zkState = new JobHistoryZKStateManager(config);
         this.lastFinishAppTime = zkState.readLastFinishedTimestamp();
@@ -75,7 +72,6 @@ public class SparkHistoryJobSpout extends BaseRichSpout {
             LOG.info("Last finished time = {}", calendar.getTime());
             if (fetchTime - this.lastFinishAppTime > this.config.stormConfig.spoutCrawlInterval) {
                 List<AppInfo> appInfos = rmFetch.getResource(Constants.ResourceType.COMPLETE_SPARK_JOB, Long.toString(lastFinishAppTime));
-                //List<AppInfo> appInfos = (null != apps ? (List<AppInfo>)apps.get(0):new ArrayList<AppInfo>());
                 if (appInfos != null) {
                     LOG.info("Get " + appInfos.size() + " from yarn resource manager.");
                     for (AppInfo app : appInfos) {
@@ -120,29 +116,15 @@ public class SparkHistoryJobSpout extends BaseRichSpout {
 
     @Override
     public void fail(Object msgId) {
-        String appId = (String) msgId;
-        int failTimes = 0;
-        if (this.failTimes.containsKey(appId)) {
-            failTimes = this.failTimes.get(appId);
-        }
-        failTimes++;
-        if (failTimes >= FAIL_MAX_TIMES) {
-            this.failTimes.remove(appId);
-            zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED);
-            LOG.error(String.format("Application %s has failed for over %s times, drop it.", appId, FAIL_MAX_TIMES));
-        } else {
-            this.failTimes.put(appId, failTimes);
-            collector.emit(new Values(appId), appId);
-            zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.SENT_FOR_PARSE);
-        }
+        // Sleep 3 seconds and retry.
+        Utils.sleep(3000);
+
+        collector.emit(new Values(msgId), msgId);
+        zkState.updateApplicationStatus((String)msgId, ZKStateConstant.AppStatus.SENT_FOR_PARSE);
     }
 
     @Override
     public void ack(Object msgId) {
-        String appId = (String) msgId;
-        if (this.failTimes.containsKey(appId)) {
-            this.failTimes.remove(appId);
-        }
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3f7004f1/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
index 289c6f7..483e2e9 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
@@ -17,7 +17,7 @@
 {
   "basic":{
     "cluster":"sandbox",
-    "datacenter":"sandbox",
+    "dataCenter":"sandbox",
     jobConf.additional.info: ""
   },
   "eagleProps":{

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3f7004f1/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
index 5e21406..2ee2a04 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
@@ -65,4 +65,4 @@ public class SparkRunningJobApp extends StormApplication {
 
         return topologyBuilder.createTopology();
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3f7004f1/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
index 7162bac..76c7815 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
@@ -67,6 +67,7 @@ public class SparkRunningJobFetchSpout extends BaseRichSpout {
         this.sparkRunningJobManager = new SparkRunningJobManager(zkStateConfig);
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void nextTuple() {
         LOG.info("Start to fetch spark running jobs");
@@ -154,8 +155,7 @@ public class SparkRunningJobFetchSpout extends BaseRichSpout {
         //content of path /apps/spark/running/yarnAppId/appId is SparkAppEntity(current attempt)
         //as we know, a yarn application may contains many spark applications
         //so, the returned results is a Map, key is yarn appId
-        Map<String, Map<String, SparkAppEntity>> result = this.sparkRunningJobManager.recover();
-        return result;
+        return this.sparkRunningJobManager.recover();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3f7004f1/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
index a497e29..9c0ffef 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
@@ -71,6 +71,7 @@ public class SparkRunningJobParseBolt extends BaseRichBolt {
         this.resourceFetcher = new RMResourceFetcher(endpointConfig.rmUrls);
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void execute(Tuple tuple) {
         AppInfo appInfo = (AppInfo)tuple.getValue(1);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3f7004f1/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ResourceFetcher.java
index f920ddb..4999315 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ResourceFetcher.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ResourceFetcher.java
@@ -22,6 +22,6 @@ import java.util.List;
 
 public interface ResourceFetcher<T> {
     //continue to refactor later
-    List<T> getResource(Constants.ResourceType resoureType, Object... parameter) throws Exception;
+    List<T> getResource(Constants.ResourceType resourceType, Object... parameter) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3f7004f1/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/SparkHistoryServerResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/SparkHistoryServerResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/SparkHistoryServerResourceFetcher.java
index ce2d9b8..d9390c1 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/SparkHistoryServerResourceFetcher.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/SparkHistoryServerResourceFetcher.java
@@ -77,12 +77,12 @@ public class SparkHistoryServerResourceFetcher implements ResourceFetcher<SparkA
         }
     }
 
-    public List<SparkApplication> getResource(Constants.ResourceType resoureType, Object... parameter) throws Exception {
-        switch (resoureType) {
+    public List<SparkApplication> getResource(Constants.ResourceType resourceType, Object... parameter) throws Exception {
+        switch (resourceType) {
             case SPARK_JOB_DETAIL:
                 return doFetchSparkApplicationDetail((String) parameter[0]);
             default:
-                throw new Exception("Not support resourceType :" + resoureType);
+                throw new Exception("Not support resourceType :" + resourceType);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3f7004f1/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/HbaseMetadataBrowseWebResource.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/HbaseMetadataBrowseWebResource.java b/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/HbaseMetadataBrowseWebResource.java
index 68ea552..69be2e5 100644
--- a/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/HbaseMetadataBrowseWebResource.java
+++ b/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/HbaseMetadataBrowseWebResource.java
@@ -73,7 +73,7 @@ public class HbaseMetadataBrowseWebResource {
     private Configuration convert(Map<String, Object> originalConfig) throws Exception {
         Configuration config = new Configuration();
         for (Map.Entry<String, Object> entry : originalConfig.entrySet()) {
-            config.set(entry.getKey().toString(), entry.getValue().toString());
+            config.set(entry.getKey(), entry.getValue().toString());
         }
         return config;
     }
@@ -125,12 +125,11 @@ public class HbaseMetadataBrowseWebResource {
             response.setException(EagleExceptionWrapper.wrap(ex));
         }
         if(tables != null) {
-            String resource = null;
             for (String table : tables) {
-                resource = String.format("%s:%s", namespace, table);
+                String resource = String.format("%s:%s", namespace, table);
                 Set<String> childSensitiveTypes = new HashSet<>();
-                String senstiveType = checkSensitivity(site, resource, childSensitiveTypes);
-                values.add(new HbaseResourceEntity(resource, namespace, table, null, senstiveType, childSensitiveTypes));
+                String sensitiveType = checkSensitivity(site, resource, childSensitiveTypes);
+                values.add(new HbaseResourceEntity(resource, namespace, table, null, sensitiveType, childSensitiveTypes));
             }
         }
         response.setObj(values);
@@ -157,12 +156,11 @@ public class HbaseMetadataBrowseWebResource {
             response.setException(EagleExceptionWrapper.wrap(ex));
         }
         if(columns != null) {
-            String resource = null;
             for (String col : columns) {
-                resource = String.format("%s:%s:%s", namespace, table, col);
+                String resource = String.format("%s:%s:%s", namespace, table, col);
                 Set<String> childSensitiveTypes = new HashSet<>();
-                String senstiveType = checkSensitivity(site, resource, childSensitiveTypes);
-                values.add(new HbaseResourceEntity(resource, namespace, table, col, senstiveType, childSensitiveTypes));
+                String sensitiveType = checkSensitivity(site, resource, childSensitiveTypes);
+                values.add(new HbaseResourceEntity(resource, namespace, table, col, sensitiveType, childSensitiveTypes));
             }
         }
         response.setObj(values);


Mime
View raw message