kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From liy...@apache.org
Subject [1/3] kylin git commit: KYLIN-2170 fix cleanup() in mapper and reducer [Forced Update!]
Date Tue, 08 Nov 2016 12:30:42 GMT
Repository: kylin
Updated Branches:
  refs/heads/yang21-hbase1.x a3fd7d5e4 -> 3bfe52498 (forced update)


KYLIN-2170 fix cleanup() in mapper and reducer


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3b07c26a
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3b07c26a
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3b07c26a

Branch: refs/heads/yang21-hbase1.x
Commit: 3b07c26a0e633d3d367bce52cd7496cd1e860d0f
Parents: 1720f1e
Author: Li Yang <liyang@apache.org>
Authored: Tue Nov 8 18:45:59 2016 +0800
Committer: Li Yang <liyang@apache.org>
Committed: Tue Nov 8 18:46:21 2016 +0800

----------------------------------------------------------------------
 .../mr/steps/FactDistinctColumnsReducer.java    | 39 ++++++------
 .../mr/steps/FactDistinctHiveColumnsMapper.java | 34 ++++++-----
 .../engine/mr/steps/InMemCuboidMapper.java      | 24 +++++---
 .../steps/RowKeyDistributionCheckerMapper.java  | 12 ++--
 .../cardinality/ColumnCardinalityMapper.java    | 22 ++++---
 .../cardinality/ColumnCardinalityReducer.java   | 39 ++++++------
 .../hbase/steps/RangeKeyDistributionMapper.java | 10 ++-
 .../steps/RangeKeyDistributionReducer.java      | 64 +++++++++++---------
 8 files changed, 137 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/3b07c26a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index a7b2e56..a9c5d4b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -162,25 +162,28 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
 
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
-
-        if (isStatistics == false) {
-            if (!outputTouched || colValues.size() > 0) {
-                outputDistinctValues(col, colValues, context);
-                colValues.clear();
-            }
-        } else {
-            //output the hll info;
-            long grandTotal = 0;
-            for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) {
-                grandTotal += hll.getCountEstimate();
+        try {
+            if (isStatistics == false) {
+                if (!outputTouched || colValues.size() > 0) {
+                    outputDistinctValues(col, colValues, context);
+                    colValues.clear();
+                }
+            } else {
+                //output the hll info;
+                long grandTotal = 0;
+                for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) {
+                    grandTotal += hll.getCountEstimate();
+                }
+                double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
+                
+                int mapperNumber = baseCuboidRowCountInMappers.size();
+    
+                writeMapperAndCuboidStatistics(context); // for human check
+                CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), //
+                        cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio);
             }
-            double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
-            
-            int mapperNumber = baseCuboidRowCountInMappers.size();
-
-            writeMapperAndCuboidStatistics(context); // for human check
-            CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), //
-                    cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio);
+        } catch (Throwable ex) {
+            logger.error("", ex);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/3b07c26a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
index 5e278f8..2154bc6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
@@ -158,22 +158,26 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
 
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
-        if (collectStatistics) {
-            ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
-            // output each cuboid's hll to reducer, key is 0 - cuboidId
-            HyperLogLogPlusCounter hll;
-            for (int i = 0; i < cuboidIds.length; i++) {
-                hll = allCuboidsHLL[i];
-
-                keyBuffer.clear();
-                keyBuffer.put(MARK_FOR_HLL); // one byte
-                keyBuffer.putLong(cuboidIds[i]);
-                outputKey.set(keyBuffer.array(), 0, keyBuffer.position());
-                hllBuf.clear();
-                hll.writeRegisters(hllBuf);
-                outputValue.set(hllBuf.array(), 0, hllBuf.position());
-                context.write(outputKey, outputValue);
+        try {
+            if (collectStatistics) {
+                ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+                // output each cuboid's hll to reducer, key is 0 - cuboidId
+                HyperLogLogPlusCounter hll;
+                for (int i = 0; i < cuboidIds.length; i++) {
+                    hll = allCuboidsHLL[i];
+    
+                    keyBuffer.clear();
+                    keyBuffer.put(MARK_FOR_HLL); // one byte
+                    keyBuffer.putLong(cuboidIds[i]);
+                    outputKey.set(keyBuffer.array(), 0, keyBuffer.position());
+                    hllBuf.clear();
+                    hll.writeRegisters(hllBuf);
+                    outputValue.set(hllBuf.array(), 0, hllBuf.position());
+                    context.write(outputKey, outputValue);
+                }
             }
+        } catch (Throwable ex) {
+            ex.printStackTrace();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3b07c26a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index 1d90d01..dac93cb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -133,18 +133,22 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
     protected void cleanup(Context context) throws IOException, InterruptedException {
         logger.info("Totally handled " + counter + " records!");
 
-        while (!future.isDone()) {
-            if (queue.offer(Collections.<String> emptyList(), 1, TimeUnit.SECONDS)) {
-                break;
-            }
-        }
-
         try {
-            future.get();
-        } catch (Exception e) {
-            throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
+            while (!future.isDone()) {
+                if (queue.offer(Collections.<String> emptyList(), 1, TimeUnit.SECONDS)) {
+                    break;
+                }
+            }
+    
+            try {
+                future.get();
+            } catch (Exception e) {
+                throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
+            }
+            queue.clear();
+        } catch (Throwable ex) {
+            logger.error("", ex);
         }
-        queue.clear();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3b07c26a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
index fa2ff73..21e97a3 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
@@ -77,10 +77,14 @@ public class RowKeyDistributionCheckerMapper extends KylinMapper<Text, Text, Tex
 
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
-        LongWritable outputValue = new LongWritable();
-        for (Entry<Text, Long> kv : resultMap.entrySet()) {
-            outputValue.set(kv.getValue());
-            context.write(kv.getKey(), outputValue);
+        try {
+            LongWritable outputValue = new LongWritable();
+            for (Entry<Text, Long> kv : resultMap.entrySet()) {
+                outputValue.set(kv.getValue());
+                context.write(kv.getKey(), outputValue);
+            }
+        } catch (Throwable ex) {
+            ex.printStackTrace();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/3b07c26a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
index f27bee3..8c624e3 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
@@ -96,15 +96,19 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab
 
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
-        Iterator<Integer> it = hllcMap.keySet().iterator();
-        ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
-        while (it.hasNext()) {
-            int key = it.next();
-            HyperLogLogPlusCounter hllc = hllcMap.get(key);
-            buf.clear();
-            hllc.writeRegisters(buf);
-            buf.flip();
-            context.write(new IntWritable(key), new BytesWritable(buf.array(), buf.limit()));
+        try {
+            Iterator<Integer> it = hllcMap.keySet().iterator();
+            ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+            while (it.hasNext()) {
+                int key = it.next();
+                HyperLogLogPlusCounter hllc = hllcMap.get(key);
+                buf.clear();
+                hllc.writeRegisters(buf);
+                buf.flip();
+                context.write(new IntWritable(key), new BytesWritable(buf.array(), buf.limit()));
+            }
+        } catch (Throwable ex) {
+            ex.printStackTrace();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/3b07c26a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
index 858d84c..2551af3 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
@@ -69,24 +69,27 @@ public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWri
 
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
-        List<Integer> keys = new ArrayList<Integer>();
-        Iterator<Integer> it = hllcMap.keySet().iterator();
-        while (it.hasNext()) {
-            keys.add(it.next());
+        try {
+            List<Integer> keys = new ArrayList<Integer>();
+            Iterator<Integer> it = hllcMap.keySet().iterator();
+            while (it.hasNext()) {
+                keys.add(it.next());
+            }
+            Collections.sort(keys);
+            it = keys.iterator();
+            while (it.hasNext()) {
+                int key = it.next();
+                HyperLogLogPlusCounter hllc = hllcMap.get(key);
+                ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+                buf.clear();
+                hllc.writeRegisters(buf);
+                buf.flip();
+                context.write(new IntWritable(key), new LongWritable(hllc.getCountEstimate()));
+                // context.write(new Text("ErrorRate_" + key), new
+                // LongWritable((long)hllc.getErrorRate()));
+            }
+        } catch (Throwable ex) {
+            ex.printStackTrace();
         }
-        Collections.sort(keys);
-        it = keys.iterator();
-        while (it.hasNext()) {
-            int key = it.next();
-            HyperLogLogPlusCounter hllc = hllcMap.get(key);
-            ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
-            buf.clear();
-            hllc.writeRegisters(buf);
-            buf.flip();
-            context.write(new IntWritable(key), new LongWritable(hllc.getCountEstimate()));
-            // context.write(new Text("ErrorRate_" + key), new
-            // LongWritable((long)hllc.getErrorRate()));
-        }
-
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/3b07c26a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java
index c2190fb..c82d58d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java
@@ -62,9 +62,13 @@ public class RangeKeyDistributionMapper extends KylinMapper<Text, Text, Text, Lo
 
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
-        if (lastKey != null) {
-            outputValue.set(bytesRead);
-            context.write(lastKey, outputValue);
+        try {
+            if (lastKey != null) {
+                outputValue.set(bytesRead);
+                context.write(lastKey, outputValue);
+            }
+        } catch (Throwable ex) {
+            ex.printStackTrace();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/3b07c26a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
index a4b7956..e9918d4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
@@ -97,37 +97,41 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable
 
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
-        int nRegion = Math.round((float) gbPoints.size() / cut);
-        nRegion = Math.max(minRegionCount, nRegion);
-        nRegion = Math.min(maxRegionCount, nRegion);
-
-        int gbPerRegion = gbPoints.size() / nRegion;
-        gbPerRegion = Math.max(1, gbPerRegion);
-
-        if (hfileSizeGB <= 0) {
-            hfileSizeGB = gbPerRegion;
-        }
-        int hfilePerRegion = (int) (gbPerRegion / hfileSizeGB);
-        hfilePerRegion = Math.max(1, hfilePerRegion);
-
-        System.out.println(nRegion + " regions");
-        System.out.println(gbPerRegion + " GB per region");
-        System.out.println(hfilePerRegion + " hfile per region");
-
-        Path hfilePartitionFile = new Path(output + "/part-r-00000_hfile");
-        SequenceFile.Writer hfilePartitionWriter = new SequenceFile.Writer(hfilePartitionFile.getFileSystem(context.getConfiguration()), context.getConfiguration(), hfilePartitionFile, ImmutableBytesWritable.class, NullWritable.class);
-        int hfileCountInOneRegion = 0;
-        for (int i = hfileSizeGB; i < gbPoints.size(); i += hfileSizeGB) {
-            hfilePartitionWriter.append(new ImmutableBytesWritable(gbPoints.get(i).getBytes()), NullWritable.get());
-            if (++hfileCountInOneRegion >= hfilePerRegion) {
-                Text key = gbPoints.get(i);
-                outputValue.set(i);
-                System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
-                context.write(key, outputValue);
-
-                hfileCountInOneRegion = 0;
+        try {
+            int nRegion = Math.round((float) gbPoints.size() / cut);
+            nRegion = Math.max(minRegionCount, nRegion);
+            nRegion = Math.min(maxRegionCount, nRegion);
+    
+            int gbPerRegion = gbPoints.size() / nRegion;
+            gbPerRegion = Math.max(1, gbPerRegion);
+    
+            if (hfileSizeGB <= 0) {
+                hfileSizeGB = gbPerRegion;
+            }
+            int hfilePerRegion = (int) (gbPerRegion / hfileSizeGB);
+            hfilePerRegion = Math.max(1, hfilePerRegion);
+    
+            System.out.println(nRegion + " regions");
+            System.out.println(gbPerRegion + " GB per region");
+            System.out.println(hfilePerRegion + " hfile per region");
+    
+            Path hfilePartitionFile = new Path(output + "/part-r-00000_hfile");
+            SequenceFile.Writer hfilePartitionWriter = new SequenceFile.Writer(hfilePartitionFile.getFileSystem(context.getConfiguration()), context.getConfiguration(), hfilePartitionFile, ImmutableBytesWritable.class, NullWritable.class);
+            int hfileCountInOneRegion = 0;
+            for (int i = hfileSizeGB; i < gbPoints.size(); i += hfileSizeGB) {
+                hfilePartitionWriter.append(new ImmutableBytesWritable(gbPoints.get(i).getBytes()), NullWritable.get());
+                if (++hfileCountInOneRegion >= hfilePerRegion) {
+                    Text key = gbPoints.get(i);
+                    outputValue.set(i);
+                    System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
+                    context.write(key, outputValue);
+    
+                    hfileCountInOneRegion = 0;
+                }
             }
+            hfilePartitionWriter.close();
+        } catch (Throwable ex) {
+            logger.error("", ex);
         }
-        hfilePartitionWriter.close();
     }
 }


Mime
View raw message