hadoop-common-issues mailing list archives

From "asin (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (HADOOP-16021) SequenceFile.createWriter appendIfExists codec cause NullPointerException
Date Thu, 27 Dec 2018 15:58:00 GMT

     [ https://issues.apache.org/jira/browse/HADOOP-16021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

asin updated HADOOP-16021:
--------------------------
    Description: 
 
I want to append data to an existing file. When I use SequenceFile.Writer.appendIfExists, it throws a NullPointerException at org.apache.hadoop.io.SequenceFile$Writer.<init>(SequenceFile.java:1119).

When I remove the appendIfExists option it works, but it overwrites the old file.

When I try to use CompressionType.RECORD or CompressionType.BLOCK, it throws a "not supported" exception.

 
{code:java}
// my code
SequenceFile.Writer writer = SequenceFile.createWriter(conf,
    SequenceFile.Writer.file(path),
    SequenceFile.Writer.keyClass(Text.class),
    SequenceFile.Writer.valueClass(Text.class),
    SequenceFile.Writer.appendIfExists(true));
{code}
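For context, a minimal Spark-free sketch that should reproduce the same behaviour, assuming a local filesystem path as in the report: the first createWriter call succeeds because the file does not exist yet, while the second call takes the append path and, per the report, throws the NullPointerException in the Writer constructor.
{code:java}
// Minimal reproduction sketch (assumed equivalent to the report): write a
// record, close the file, then reopen the same path with appendIfExists(true).
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class AppendRepro {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("D:/tmp-doc/logs/logs.txt"); // local path as in the report

        // First pass: the file does not exist yet, so the writer is created normally.
        SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(path),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(Text.class),
                SequenceFile.Writer.appendIfExists(true));
        writer.append(new Text("k1"), new Text("v1"));
        writer.close();

        // Second pass: the file now exists, so the append branch runs and, per the
        // report, a NullPointerException is thrown from the Writer constructor
        // (SequenceFile.java:1119 in 2.7.3).
        SequenceFile.Writer appender = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(path),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(Text.class),
                SequenceFile.Writer.appendIfExists(true));
        appender.append(new Text("k2"), new Text("v2"));
        appender.close();
    }
}
{code}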
 
{code:java}
// all my code
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class Writer1 implements VoidFunction<Iterator<Tuple2<String, String>>> {
    private static Configuration conf = new Configuration();
    private int MAX_LINE = 3; // small value, for testing

    @Override
    public void call(Iterator<Tuple2<String, String>> iterator) throws Exception {
        int partitionId = TaskContext.get().partitionId();
        int count = 0;
        SequenceFile.Writer writer = null;
        while (iterator.hasNext()) {
            Tuple2<String, String> tp = iterator.next();
            Path path = new Path("D:/tmp-doc/logs/logs.txt");

            if (writer == null) {
                // throws NullPointerException once the file already exists
                writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(path),
                        SequenceFile.Writer.keyClass(Text.class),
                        SequenceFile.Writer.valueClass(Text.class),
                        SequenceFile.Writer.appendIfExists(true));
            }

            writer.append(new Text(tp._1), new Text(tp._2));
            count++;

            if (count > MAX_LINE) {
                // roll the writer: close it and reopen with the same options as above
                IOUtils.closeStream(writer);
                count = 0;
                writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(path),
                        SequenceFile.Writer.keyClass(Text.class),
                        SequenceFile.Writer.valueClass(Text.class),
                        SequenceFile.Writer.appendIfExists(true));
            }
        }
        IOUtils.closeStream(writer); // closeStream is null-safe
    }
}
{code}
The code above is called from the Spark Streaming job below:
{code:java}
import com.xxx.algo.hadoop.Writer1
import com.xxx.algo.utils.Utils
import kafka.serializer.StringDecoder
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}


object KafkaSparkStreamingApp {
  def main(args: Array[String]): Unit = {
    val kafka = "192.168.30.4:9092,192.168.30.5:9092,192.168.30.6:9092"
    val zk = "192.168.30.4:2181,192.168.30.5:2181,192.168.30.6:2181"
    val topics = Set("test.aries.collection.appevent.biz")
    val tag = "biz"
    val durationSeconds = 5000 // note: passed to Durations.milliseconds below
    val conf = new SparkConf()
    conf.setAppName("user-log-consumer")
      .set("spark.serilizer","org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrationRequired", "true")
      .set("spark.defalut.parallelism","2")
      .set("spark.rdd.compress","true")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)
    val session = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    val ssc = new StreamingContext(sc, Durations.milliseconds(durationSeconds))
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> kafka,
      "bootstrap.servers" -> kafka,
      "zookeeper.connect" -> zk,
      "group.id" -> "recommend_stream_spark",
      "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc,
      kafkaParams,
      topics
    )
    val timeFieldName = "log_time"
    stream.foreachRDD(rddMsg => {
      rddMsg.map(msg => {
        val value = msg._2
        val time = Utils.getTime(value, timeFieldName)
        new Tuple2(time + "," + tag, value)
      })
        .toJavaRDD().foreachPartition(new Writer1()) // Writer1 (defined above) is applied to each partition here
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}
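The exact call used with compression is not shown above, so the following is only a hypothetical sketch of what the CompressionType.RECORD / CompressionType.BLOCK attempt presumably looked like (the DefaultCodec choice is illustrative); per the report, this fails with a "not supported" exception.
{code:java}
// Hypothetical sketch only: the exact compressed-append call is not shown in
// the report. Combining appendIfExists(true) with RECORD or BLOCK compression
// is what reportedly fails with a "not supported" exception.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;

public class CompressedAppendSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("D:/tmp-doc/logs/logs.txt"); // local path as in the report

        SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(path),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(Text.class),
                SequenceFile.Writer.appendIfExists(true),
                // BLOCK shown here; RECORD reportedly fails the same way
                SequenceFile.Writer.compression(CompressionType.BLOCK, new DefaultCodec()));
        writer.append(new Text("key"), new Text("value"));
        writer.close();
    }
}
{code}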
More info: [https://stackoverflow.com/questions/53943978/hadoop-sequencefile-createwriter-appendifexists-codec-cause-nullpointerexception]



> SequenceFile.createWriter appendIfExists codec cause NullPointerException
> -------------------------------------------------------------------------
>
>                 Key: HADOOP-16021
>                 URL: https://issues.apache.org/jira/browse/HADOOP-16021
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: common
>    Affects Versions: 2.7.3
>         Environment: windows10, hadoop2.7.3, jdk8
>            Reporter: asin
>            Priority: Major
>              Labels: bug
>         Attachments: 57.png
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org

