spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Brad Cox <bradj...@gmail.com>
Subject Infinite recursion in createDataFrame for avro types
Date Sun, 10 Apr 2016 16:51:00 GMT
I'm getting a StackOverflowError from inside the createDataFrame call in the example below. It originates in Spark's Scala type-inference code for Java classes (JavaTypeInference), which ends up calling itself in an infinite loop.

final EventParser parser = new EventParser();
JavaRDD<Event> eventRDD = sc.textFile(path)
        .map(new Function<String, Event>() {
            @Override
            public Event call(String line) throws Exception {
                Event event = parser.parse(line);
                log.info("event: " + event.toString());
                return event;
            }
        });
log.info("eventRDD: " + eventRDD.toDebugString());

DataFrame df = sqlContext.createDataFrame(eventRDD, Event.class);
df.show();

The bottom of the stack trace looks like this:

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:102)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:104)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:102)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
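
My current guess at the cause (a reading of the stack trace, not a confirmed diagnosis): the Avro-generated Event class exposes its schema as a bean property of type org.apache.avro.Schema, and Schema is self-referential, so a reflective walk over "every property of every property type" never bottoms out. A minimal illustration of the cycle:

import org.apache.avro.Schema;

// Every Avro-generated SpecificRecord carries its schema as a bean property
// (getSchema(), also reachable through the generated static accessor):
Schema schema = Event.getClassSchema();
// A record Schema holds Fields, and each Field holds another Schema, so
// type inference that recurses into property types can chase this forever.
Schema nested = schema.getFields().get(0).schema();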

This looks similar to the bug reported in http://apache-spark-developers-list.1001551.n3.nabble.com/Stackoverflow-in-createDataFrame-td11791.html,
but I'm using Spark 1.4.1, which is later than the release in which that bug was reportedly fixed.

The Event class is generated by Avro from the avsc below. It does contain double and long fields,
which have been reported as causing problems, but replacing double with string doesn't change
the symptoms.

{
    "namespace": "mynamespace",
    "type": "record",
    "name": "Event",
    "fields": [
        { "name": "ts", "type": "double", "doc": "Timestamp"},
        { "name": "uid", "type": "string", "doc": "Unique ID of Connection"},
        { "name": "idorigh", "type": "string", "doc": "Originating endpoint’s IP address (AKA ORIG)"},
        { "name": "idorigp", "type": "int", "doc": "Originating endpoint’s TCP/UDP port (or ICMP code)"},
        { "name": "idresph", "type": "string", "doc": "Responding endpoint’s IP address (AKA RESP)"},
        { "name": "idrespp", "type": "int", "doc": "Responding endpoint’s TCP/UDP port (or ICMP code)"},
        { "name": "proto", "type": "string", "doc": "Transport layer protocol of connection"},
        { "name": "service", "type": "string", "doc": "Dynamically detected application protocol, if any"},
        { "name": "duration", "type": "double", "doc": "Time of last packet seen – time of first packet seen"},
        { "name": "origbytes", "type": "int", "doc": "Originator payload bytes; from sequence numbers if TCP"},
        { "name": "respbytes", "type": "int", "doc": "Responder payload bytes; from sequence numbers if TCP"},
        { "name": "connstate", "type": "string", "doc": "Connection state (see conn.log:conn_state table)"},
        { "name": "localorig", "type": "boolean", "doc": "If conn originated locally T; if remotely F."},
        { "name": "localresp", "type": "boolean", "doc": "empty, always unset"},
        { "name": "missedbytes", "type": "int", "doc": "Number of missing bytes in content gaps"},
        { "name": "history", "type": "string", "doc": "Connection state history (see conn.log:history table)"},
        { "name": "origpkts", "type": [ "int", "null"], "doc": "Number of ORIG packets"},
        { "name": "origipbytes", "type": [ "int", "null"], "doc": "Number of ORIG IP bytes (via IP total_length header field)"},
        { "name": "resppkts", "type": [ "int", "null"], "doc": "Number of RESP packets"},
        { "name": "respipbytes", "type": [ "int", "null"], "doc": "Number of RESP IP bytes (via IP total_length header field)"},
        { "name": "tunnelparents", "type": [ "string", "null"], "doc": "If tunneled, connection UID of encapsulating parent(s)"},
        { "name": "origcc", "type": ["string", "null"], "doc": "ORIG GeoIP Country Code"},
        { "name": "respcc", "type": ["string", "null"], "doc": "RESP GeoIP Country Code"}
    ]
}
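
For what it's worth, one workaround that should sidestep the reflection path entirely is to declare the StructType by hand and convert each Event to a Row. A minimal sketch; the getter names (getTs(), getUid(), ...) are my assumption based on Avro's usual naming convention, only the first few columns are shown, and string fields are converted with toString() since Avro may hand back CharSequence/Utf8 rather than String:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Declare the schema explicitly instead of letting Spark infer it from Event.class.
StructType schema = DataTypes.createStructType(new StructField[] {
    DataTypes.createStructField("ts",      DataTypes.DoubleType,  false),
    DataTypes.createStructField("uid",     DataTypes.StringType,  false),
    DataTypes.createStructField("idorigh", DataTypes.StringType,  false),
    DataTypes.createStructField("idorigp", DataTypes.IntegerType, false)
    // ... remaining fields follow the same pattern
});

// Flatten each Avro Event into a Row with the same column order as the schema.
JavaRDD<Row> rowRDD = eventRDD.map(new Function<Event, Row>() {
    @Override
    public Row call(Event e) {
        return RowFactory.create(
                e.getTs(),
                e.getUid().toString(),
                e.getIdorigh().toString(),
                e.getIdorigp());
    }
});

DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
df.show();

That said, I'd rather not maintain the schema in two places, so a fix for the inference itself would be ideal.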

Could someone please advise? Thanks!

Also posted at: https://stackoverflow.com/questions/36532237/infinite-recursion-in-createdataframe-for-avro-types

Dr. Brad J. Cox    Cell: 703-594-1883 Skype: dr.brad.cox