kylin-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KYLIN-3446) Convert to HFile in spark reports ZK connection refused
Date Mon, 20 Aug 2018 11:10:00 GMT

    [ https://issues.apache.org/jira/browse/KYLIN-3446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16585792#comment-16585792 ]

ASF GitHub Bot commented on KYLIN-3446:
---------------------------------------

shaofengshi closed pull request #195: KYLIN-3446 Connect to HBase out of Spark
URL: https://github.com/apache/kylin/pull/195
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
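
In short, the change moves all HBase-side work (creating the HTable handle and calling HFileOutputFormat2.configureIncrementalLoadMap) into the CreateHTableJob step, which then serializes the resulting job configuration to an hbase-conf.xml file on HDFS; the Spark step (SparkCubeHFile) only reads that file back, so it no longer has to reach HBase or ZooKeeper from inside Spark. A minimal sketch of the producer side is shown below; the class name and the output path are placeholders for illustration, not code from this PR.

{code:java}
// Illustrative sketch only: persist a resolved Hadoop/HBase job configuration to HDFS
// so that a later step can reuse it without contacting HBase or ZooKeeper.
// WriteJobConfSketch and the output path are placeholders, not taken from the PR.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WriteJobConfSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();   // in Kylin this carries the HBase/MR settings
        conf.set("dfs.replication", "3");           // example setting, mirroring the PR's intent for HFiles

        FileSystem fs = FileSystem.get(conf);
        Path out = new Path("/tmp/kylin/hbase-conf.xml");
        try (FSDataOutputStream os = fs.create(out, true)) {
            conf.writeXml(os);                      // dump every key/value pair as Hadoop configuration XML
        }
    }
}
{code}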


diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 68aa1728d5..3cc123922a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -27,11 +27,15 @@
 
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
@@ -46,6 +50,7 @@
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.engine.mr.common.CuboidShardUtil;
+import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,6 +68,7 @@
     CubeDesc cubeDesc = null;
     String segmentID = null;
     String cuboidModeName = null;
+    String hbaseConfPath = null;
     KylinConfig kylinConfig;
     Path partitionFilePath;
 
@@ -74,6 +80,7 @@ public int run(String[] args) throws Exception {
         options.addOption(OPTION_SEGMENT_ID);
         options.addOption(OPTION_PARTITION_FILE_PATH);
         options.addOption(OPTION_CUBOID_MODE);
+        options.addOption(OPTION_DICT_PATH);
         parseOptions(options, args);
 
         partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH));
@@ -85,11 +92,12 @@ public int run(String[] args) throws Exception {
         kylinConfig = cube.getConfig();
         segmentID = getOptionValue(OPTION_SEGMENT_ID);
         cuboidModeName = getOptionValue(OPTION_CUBOID_MODE);
+        hbaseConfPath = getOptionValue(OPTION_DICT_PATH);
         CubeSegment cubeSegment = cube.getSegmentById(segmentID);
 
         byte[][] splitKeys;
         Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment, kylinConfig).getCuboidSizeMap();
-        
+
         // for cube planner, will keep cuboidSizeMap unchanged if cube planner is disabled
         Set<Long> buildingCuboids = cube.getCuboidsByMode(cuboidModeName);
         if (buildingCuboids != null && !buildingCuboids.isEmpty()) {
@@ -104,14 +112,36 @@ public int run(String[] args) throws Exception {
             }
             cuboidSizeMap = optimizedCuboidSizeMap;
         }
-        
+
         splitKeys = getRegionSplitsFromCuboidStatistics(cuboidSizeMap, kylinConfig, cubeSegment,
                 partitionFilePath.getParent());
 
         CubeHTableUtil.createHTable(cubeSegment, splitKeys);
+        createHBaseConnection(cubeSegment);
         return 0;
     }
 
+    private void createHBaseConnection(CubeSegment cubeSegment) throws Exception {
+
+        Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
+        HadoopUtil.healSickConfig(hbaseConf);
+        Job job = new Job(hbaseConf, cubeSegment.getStorageLocationIdentifier());
+        job.getConfiguration().set("spark.hadoop.dfs.replication", "3"); // HFile, replication=3
+        HTable table = new HTable(hbaseConf, cubeSegment.getStorageLocationIdentifier());
+        HFileOutputFormat2.configureIncrementalLoadMap(job, table);
+
+        logger.info("Saving HBase configuration to " + hbaseConfPath);
+        FileSystem fs = HadoopUtil.getWorkingFileSystem();
+        FSDataOutputStream out = null;
+        try {
+            out = fs.create(new Path(hbaseConfPath));
+            job.getConfiguration().writeXml(out);
+        } catch (IOException e) {
+            throw new ExecuteException("write hbase configuration failed");
+        } finally {
+            out.close();
+        }
+    }
 
     //one region for one shard
     private static byte[][] getSplitsByRegionCount(int regionCount) {
@@ -124,7 +154,9 @@ public int run(String[] args) throws Exception {
         return result;
     }
 
-    public static byte[][] getRegionSplitsFromCuboidStatistics(final Map<Long, Double> cubeSizeMap, final KylinConfig kylinConfig, final CubeSegment cubeSegment, final Path hfileSplitsOutputFolder) throws IOException {
+    public static byte[][] getRegionSplitsFromCuboidStatistics(final Map<Long, Double> cubeSizeMap,
+            final KylinConfig kylinConfig, final CubeSegment cubeSegment, final Path hfileSplitsOutputFolder)
+            throws IOException {
 
         final CubeDesc cubeDesc = cubeSegment.getCubeDesc();
         float cut = cubeDesc.getConfig().getKylinHBaseRegionCut();
@@ -157,7 +189,8 @@ public int run(String[] args) throws Exception {
             }
 
             if (nRegion != original) {
-                logger.info("Region count is adjusted from " + original + " to " + nRegion + " to help random sharding");
+                logger.info(
+                        "Region count is adjusted from " + original + " to " + nRegion + " to help random sharding");
             }
         }
 
@@ -188,10 +221,13 @@ public int run(String[] args) throws Exception {
                 }
 
                 if (shardNum > nRegion) {
-                    logger.info(String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions, reduce to %d", cuboidId, estimatedSize, shardNum, nRegion));
+                    logger.info(
+                            String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions, reduce to %d",
+                                    cuboidId, estimatedSize, shardNum, nRegion));
                     shardNum = nRegion;
                 } else {
-                    logger.info(String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions", cuboidId, estimatedSize, shardNum));
+                    logger.info(String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions", cuboidId,
+                            estimatedSize, shardNum));
                 }
 
                 cuboidShards.put(cuboidId, (short) shardNum);
@@ -204,7 +240,8 @@ public int run(String[] args) throws Exception {
             }
 
             for (int i = 0; i < nRegion; ++i) {
-                logger.info(String.format("Region %d's estimated size is %.2f MB, accounting for %.2f percent", i, regionSizes[i], 100.0 * regionSizes[i] / totalSizeInM));
+                logger.info(String.format("Region %d's estimated size is %.2f MB, accounting for %.2f percent", i,
+                        regionSizes[i], 100.0 * regionSizes[i] / totalSizeInM));
             }
 
             CuboidShardUtil.saveCuboidShards(cubeSegment, cuboidShards, nRegion);
@@ -222,7 +259,8 @@ public int run(String[] args) throws Exception {
                 if (size >= mbPerRegion || (size + cubeSizeMap.get(cuboidId)) >= mbPerRegion * 1.2) {
                     // if the size already bigger than threshold, or it will exceed by 20%, cut for next region
                     regionSplit.add(cuboidId);
-                    logger.info("Region " + regionIndex + " will be " + size + " MB, contains cuboids < " + cuboidId + " (" + cuboidCount + ") cuboids");
+                    logger.info("Region " + regionIndex + " will be " + size + " MB, contains cuboids < " + cuboidId
+                            + " (" + cuboidCount + ") cuboids");
                     size = 0;
                     cuboidCount = 0;
                     regionIndex++;
@@ -240,7 +278,8 @@ public int run(String[] args) throws Exception {
         }
     }
 
-    protected static void saveHFileSplits(final List<HashMap<Long, Double>> innerRegionSplits, int mbPerRegion, final Path outputFolder, final KylinConfig kylinConfig) throws IOException {
+    protected static void saveHFileSplits(final List<HashMap<Long, Double>> innerRegionSplits, int mbPerRegion,
+            final Path outputFolder, final KylinConfig kylinConfig) throws IOException {
 
         if (outputFolder == null) {
             logger.warn("outputFolder for hfile split file is null, skip inner region split");
@@ -300,7 +339,8 @@ protected static void saveHFileSplits(final List<HashMap<Long, Double>> innerReg
                     logger.info(String.format("Region %d's hfile %d size is %.2f mb", i, j, accumulatedSize));
                     byte[] split = new byte[RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN];
                     BytesUtil.writeUnsigned(i, split, 0, RowConstants.ROWKEY_SHARDID_LEN);
-                    System.arraycopy(Bytes.toBytes(cuboid), 0, split, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
+                    System.arraycopy(Bytes.toBytes(cuboid), 0, split, RowConstants.ROWKEY_SHARDID_LEN,
+                            RowConstants.ROWKEY_CUBOIDID_LEN);
                     splits.add(split);
                     accumulatedSize = 0;
                     j++;
@@ -310,11 +350,15 @@ protected static void saveHFileSplits(final List<HashMap<Long, Double>> innerReg
 
         }
 
-        SequenceFile.Writer hfilePartitionWriter = SequenceFile.createWriter(hbaseConf, SequenceFile.Writer.file(hfilePartitionFile), SequenceFile.Writer.keyClass(RowKeyWritable.class), SequenceFile.Writer.valueClass(NullWritable.class));
+        SequenceFile.Writer hfilePartitionWriter = SequenceFile.createWriter(hbaseConf,
+                SequenceFile.Writer.file(hfilePartitionFile), SequenceFile.Writer.keyClass(RowKeyWritable.class),
+                SequenceFile.Writer.valueClass(NullWritable.class));
 
         for (int i = 0; i < splits.size(); i++) {
             //when we compare the rowkey, we compare the row firstly.
-            hfilePartitionWriter.append(new RowKeyWritable(KeyValue.createFirstOnRow(splits.get(i)).createKeyOnly(false).getKey()), NullWritable.get());
+            hfilePartitionWriter.append(
+                    new RowKeyWritable(KeyValue.createFirstOnRow(splits.get(i)).createKeyOnly(false).getKey()),
+                    NullWritable.get());
         }
         hfilePartitionWriter.close();
     }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
index 4fda1398cd..dfe7d0cefd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
@@ -59,8 +59,10 @@ public HadoopShellExecutable createCreateHTableStep(String jobId, CuboidModeEnum
         StringBuilder cmd = new StringBuilder();
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
         appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION,
+                getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE, cuboidMode.toString());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_DICT_PATH, getJobWorkingDir(jobId) + "/hbase-conf.xml");
 
         createHtableStep.setJobParams(cmd.toString());
         createHtableStep.setJobClass(CreateHTableJob.class);
@@ -69,7 +71,8 @@ public HadoopShellExecutable createCreateHTableStep(String jobId, CuboidModeEnum
     }
 
     // TODO make it abstract
-    public MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegment> mergingSegments, String jobID, Class<? extends AbstractHadoopJob> clazz) {
+    public MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegment> mergingSegments,
+            String jobID, Class<? extends AbstractHadoopJob> clazz) {
         final List<String> mergingCuboidPaths = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
             mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
@@ -86,7 +89,8 @@ public MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeS
         appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
         appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, formattedPath);
         appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
-        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+                "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
 
         mergeCuboidDataStep.setMapReduceParams(cmd.toString());
         mergeCuboidDataStep.setMapReduceJobClass(clazz);
@@ -148,8 +152,10 @@ public MergeGCStep createOptimizeGCStep() {
     }
 
     public List<String> getMergingHTables() {
-        final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg);
-        Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge, target segment " + seg);
+        final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization())
+                .getMergingSegments((CubeSegment) seg);
+        Preconditions.checkState(mergingSegments.size() > 1,
+                "there should be more than 2 segments to merge, target segment " + seg);
         final List<String> mergingHTables = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
             mergingHTables.add(merging.getStorageLocationIdentifier());
@@ -158,8 +164,10 @@ public MergeGCStep createOptimizeGCStep() {
     }
 
     public List<String> getMergingHDFSPaths() {
-        final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg);
-        Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge, target segment " + seg);
+        final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization())
+                .getMergingSegments((CubeSegment) seg);
+        Preconditions.checkState(mergingSegments.size() > 1,
+                "there should be more than 2 segments to merge, target segment " + seg);
         final List<String> mergingHDFSPaths = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
             mergingHDFSPaths.add(getJobWorkingDir(merging.getLastBuildJobID()));
@@ -180,11 +188,13 @@ public MergeGCStep createOptimizeGCStep() {
     }
 
     public String getHFilePath(String jobId) {
-        return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/hfile/");
+        return HBaseConnection.makeQualifiedPathInHBaseCluster(
+                getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/hfile/");
     }
 
     public String getRowkeyDistributionOutputPath(String jobId) {
-        return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/rowkey_stats");
+        return HBaseConnection.makeQualifiedPathInHBaseCluster(
+                getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/rowkey_stats");
     }
 
     public void addOptimizeGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
index 622a0e8891..e599bfce03 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
@@ -48,6 +48,7 @@ public AbstractExecutable createConvertCuboidToHfileStep(String jobId) {
         sparkExecutable.setParam(SparkCubeHFile.OPTION_OUTPUT_PATH.getOpt(), getHFilePath(jobId));
         sparkExecutable.setParam(SparkCubeHFile.OPTION_PARTITION_FILE_PATH.getOpt(),
                 getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
+        sparkExecutable.setParam(SparkCubeHFile.OPTION_WORKING_PATH.getOpt(), getJobWorkingDir(jobId));
 
         sparkExecutable.setJobId(jobId);
 
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index fd8459fd51..32a4cd4119 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -17,7 +17,6 @@
 */
 package org.apache.kylin.storage.hbase.steps;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -30,12 +29,11 @@
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -58,7 +56,6 @@
 import org.apache.kylin.engine.spark.KylinSparkJobListener;
 import org.apache.kylin.engine.spark.SparkUtil;
 import org.apache.kylin.measure.MeasureCodec;
-import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.spark.Partitioner;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -91,6 +88,8 @@
             .isRequired(true).withDescription("Cuboid files PATH").create(BatchConstants.ARG_INPUT);
     public static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName(BatchConstants.ARG_PARTITION)
             .hasArg().isRequired(true).withDescription("Partition file path.").create(BatchConstants.ARG_PARTITION);
+    public static final Option OPTION_WORKING_PATH = OptionBuilder.withArgName(BatchConstants.ARG_DICT_PATH).hasArg()
+            .isRequired(true).withDescription("Job working path").create(BatchConstants.ARG_DICT_PATH);
 
     private Options options;
 
@@ -102,6 +101,7 @@ public SparkCubeHFile() {
         options.addOption(OPTION_META_URL);
         options.addOption(OPTION_OUTPUT_PATH);
         options.addOption(OPTION_PARTITION_FILE_PATH);
+        options.addOption(OPTION_WORKING_PATH);
     }
 
     @Override
@@ -117,6 +117,7 @@ protected void execute(OptionsHelper optionsHelper) throws Exception {
         final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
         final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
         final Path partitionFilePath = new Path(optionsHelper.getOptionValue(OPTION_PARTITION_FILE_PATH));
+        final String workingPath = optionsHelper.getOptionValue(OPTION_WORKING_PATH);
 
         Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"), KeyValueCreator.class,
                 KeyValue.class, RowKeyWritable.class };
@@ -171,17 +172,15 @@ protected void execute(OptionsHelper optionsHelper) throws Exception {
         }
 
         logger.info("There are " + keys.size() + " split keys, totally " + (keys.size() +
1) + " hfiles");
-        Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
-        HadoopUtil.healSickConfig(hbaseConf);
-        Job job = new Job(hbaseConf, cubeSegment.getStorageLocationIdentifier());
-        job.getConfiguration().set("spark.hadoop.dfs.replication", "3"); // HFile, replication=3
-        HTable table = new HTable(hbaseConf, cubeSegment.getStorageLocationIdentifier());
-        try {
-            HFileOutputFormat2.configureIncrementalLoadMap(job, table);
-        } catch (IOException ioe) {
-            // this can be ignored.
-            logger.debug(ioe.getMessage(), ioe);
-        }
+
+        //HBase conf
+        String hbasePath = workingPath + "/hbase-conf.xml";
+        FSDataInputStream confInput = fs.open(new Path(hbasePath));
+        logger.info("Loading HBase configuration from:" + hbasePath);
+
+        Configuration hbaseJobConf = new Configuration();
+        hbaseJobConf.addResource(confInput);
+        Job job = new Job(hbaseJobConf, cubeSegment.getStorageLocationIdentifier());
 
         FileOutputFormat.setOutputPath(job, new Path(outputPath));
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Convert to HFile in spark reports ZK connection refused
> -------------------------------------------------------
>
>                 Key: KYLIN-3446
>                 URL: https://issues.apache.org/jira/browse/KYLIN-3446
>             Project: Kylin
>          Issue Type: Bug
>          Components: Spark Engine
>            Reporter: Shaofeng SHI
>            Assignee: Yichen Zhou
>            Priority: Major
>             Fix For: v2.5.0
>
>
> {code:java}
> to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
> 2018-07-12 18:51:21,001 INFO [Scheduler 1109292714 Job 62f42193-20ff-4ca9-b898-52978a473bce-864] spark.SparkExecutable:38 : 18/07/12 18:51:21 WARN zookeeper.ClientCnxn: Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
> 2018-07-12 18:51:21,002 INFO [Scheduler 1109292714 Job 62f42193-20ff-4ca9-b898-52978a473bce-864] spark.SparkExecutable:38 : java.net.ConnectException: Connection refused
> 2018-07-12 18:51:21,002 INFO [Scheduler 1109292714 Job 62f42193-20ff-4ca9-b898-52978a473bce-864] spark.SparkExecutable:38 : at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> 2018-07-12 18:51:21,002 INFO [Scheduler 1109292714 Job 62f42193-20ff-4ca9-b898-52978a473bce-864] spark.SparkExecutable:38 : at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> 2018-07-12 18:51:21,002 INFO [Scheduler 1109292714 Job 62f42193-20ff-4ca9-b898-52978a473bce-864] spark.SparkExecutable:38 : at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
> 2018-07-12 18:51:21,002 INFO [Scheduler 1109292714 Job 62f42193-20ff-4ca9-b898-52978a473bce-864] spark.SparkExecutable:38 : at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
