hadoop-common-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "lqjacklee (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (HADOOP-16021) SequenceFile.createWriter appendIfExists codec cause NullPointerException
Date Tue, 01 Jan 2019 11:56:00 GMT

    [ https://issues.apache.org/jira/browse/HADOOP-16021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16731579#comment-16731579
] 

lqjacklee commented on HADOOP-16021:
------------------------------------

[~xinkenny] Thanks ,I will try to reproduce it .

> SequenceFile.createWriter appendIfExists codec cause NullPointerException
> -------------------------------------------------------------------------
>
>                 Key: HADOOP-16021
>                 URL: https://issues.apache.org/jira/browse/HADOOP-16021
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: common
>    Affects Versions: 2.7.3
>         Environment: windows10 or Linux-centos , hadoop2.7.3, jdk8
>            Reporter: asin
>            Priority: Major
>              Labels: bug
>         Attachments: 055.png, 62.png, CompressionType.BLOCK-Not supported-error log.txt,
CompressionType.NONE-NullPointerException-error log.txt
>
>
>  
>  I want append the data in a file , when i use SequenceFile.appendIfExists , it throw
NullPointerException at at org.apache.hadoop.io.SequenceFile$Writer.(SequenceFile.java:1119)
> when i remove the 'appendIfExists', it works, but it will cover old file.
>  
> when i try use CompressionType.RECORD or CompressionType.BLOCK throw "not support" exception
>  
> {code:java}
> // my code
> SequenceFile.Writer writer = null; 
> writer = SequenceFile.createWriter(conf, 
>     SequenceFile.Writer.file(path), 
>     SequenceFile.Writer.keyClass(Text.class), 
>     SequenceFile.Writer.valueClass(Text.class), 
>     SequenceFile.Writer.appendIfExists(true) );
> {code}
>  
> {code:java}
> // all my code
> public class Writer1 implements VoidFunction<Iterator<Tuple2<String, String>>>
{
>     private static Configuration conf = new Configuration();
>     private int MAX_LINE = 3; // little num,for test
>     @Override
>     public void call(Iterator<Tuple2<String, String>> iterator) throws Exception
{
>         int partitionId = TaskContext.get().partitionId();
>         int count = 0;
>         SequenceFile.Writer writer = null;
>         while (iterator.hasNext()) {
>             Tuple2<String, String> tp = iterator.next();
>             Path path = new Path("D:/tmp-doc/logs/logs.txt");
>             if (writer == null)
>                 writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(path),
>                         SequenceFile.Writer.keyClass(Text.class),
>                         SequenceFile.Writer.valueClass(Text.class),
>                         SequenceFile.Writer.appendIfExists(true)
>                         );
>             writer.append(new Text(tp._1), new Text(tp._2));
>             count++;
>             if (count > MAX_LINE) {
>                 IOUtils.closeStream(writer);
>                 count = 0;
>                 writer = SequenceFile.createWriter(... // same as above
>             }
>         }
>         if (count > 0) {
>             IOUtils.closeStream(writer);
>         }
>         IOUtils.closeStream(writer);
>     }
> }
> {code}
>  // above code call by below
> {code:java}
> import com.xxx.algo.hadoop.Writer1
> import com.xxx.algo.utils.Utils
> import kafka.serializer.StringDecoder
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.streaming.kafka.KafkaUtils
> import org.apache.spark.streaming.{Durations, StreamingContext}
> import org.apache.spark.{SparkConf, SparkContext}
> object KafkaSparkStreamingApp {
>   def main(args: Array[String]): Unit = {
>     val kafka = "192.168.30.4:9092,192.168.30.5:9092,192.168.30.6:9092"
>     val zk = "192.168.30.4:2181,192.168.30.5:2181,192.168.30.6:2181"
>     val topics = Set("test.aries.collection.appevent.biz")
>     val tag = "biz"
>     val durationSeconds = 5000
>     val conf = new SparkConf()
>     conf.setAppName("user-log-consumer")
>       .set("spark.serilizer","org.apache.spark.serializer.KryoSerializer")
>       .set("spark.kryo.registrationRequired", "true")
>       .set("spark.defalut.parallelism","2")
>       .set("spark.rdd.compress","true")
>       .setMaster("local[2]")
>     val sc = new SparkContext(conf)
>     val session = SparkSession.builder()
>       .config(conf)
>       .getOrCreate()
>     val ssc = new StreamingContext(sc, Durations.milliseconds(durationSeconds))
>     val kafkaParams = Map[String, String](
>       "metadata.broker.list" -> kafka,
>       "bootstrap.servers" -> kafka,
>       "zookeeper.connect" -> zk,
>       "group.id" -> "recommend_stream_spark",
>       "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
>       "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
>       "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
>     )
>     val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>       ssc,
>       kafkaParams,
>       topics
>     )
>     val timeFieldName = "log_time"
>     stream.foreachRDD(rddMsg => {
>       rddMsg.map(msg => {
>         val value = msg._2
>         val time = Utils.getTime(value, timeFieldName)
>         new Tuple2(time + "," + tag, value)
>       })
>         .toJavaRDD().foreachPartition(new Writer1()) // here
>     })
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
> {code}
> {{more info see:[https://stackoverflow.com/questions/53943978/hadoop-sequencefile-createwriter-appendifexists-codec-cause-nullpointerexception]}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org


Mime
View raw message