spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Abel Coronado Iruegas <acoronadoirue...@gmail.com>
Subject Re: Example of Geoprocessing with Spark
Date Fri, 19 Sep 2014 02:02:14 GMT
Now I have a better version, but the problem is that saveAsTextFile
does not finish the job; the HDFS output directory contains only a partial
temporary file. Can someone tell me what is wrong?

Thanks !!

/** Spark job that tags every point of a CSV file with the state and
  * municipality codes of the polygon it falls in.
  *
  * Each input row is expected to be `usuario, lat, lon, date`. The output row
  * is the unchanged input row followed by three extra columns: the timestamp
  * as epoch milliseconds, the state code and the municipality code. Every
  * value that cannot be computed is written as "0".
  *
  * The lookup is two-staged: the point is first matched against the features
  * of `IndiceRecortado.shp` (see [[geoDataExternal]]); attribute 1 of the
  * matching feature is then used as an index to narrow down the municipality
  * polygons that have to be tested (see [[geoDataMun]]).
  */
object SimpleApp {

  import scala.util.control.NonFatal

  private val CsvPath = "hdfs://m01/user/acoronado/mov/movilidad_64mb.csv"
  private val OutputPath = "hdfs://m01/user/acoronado/mov/resultados_movilidad_60.csv"

  /** Pattern of the `date` column once a trailing "Z" has been replaced by "+0000". */
  private val DatePattern = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"

  /** (state code, municipality code) written when no polygon matches. */
  private val SinClave = ("0", "0")

  /** Runs the job: reads the CSV, enriches every line and writes the result.
    *
    * @param args command line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Csv Clipper")
    val sc = new SparkContext(conf)
    try {
      // The input RDD is consumed by a single action, so it is not cached.
      val csv = sc.textFile(CsvPath)

      // SimpleDateFormat is costly to build and not thread-safe, so one
      // instance is created per partition instead of one per record.
      val clipPoints = csv.mapPartitions { lines =>
        val fmt = new SimpleDateFormat(DatePattern)
        lines.map(line => clipLine(line, fmt))
      }

      clipPoints.saveAsTextFile(OutputPath)
      println("Spark Clip Exito!!!")
    } finally {
      sc.stop()
    }
  }

  /** Builds the output row for one input row.
    *
    * A row that does not have exactly four comma-separated fields is passed
    * through with "0" in the three added columns instead of failing the task
    * with a `MatchError`.
    *
    * @param line raw CSV row
    * @param fmt  date parser owned by the calling partition
    * @return `line` followed by `,time,cve_est,cve_mun`
    */
  private def clipLine(line: String, fmt: SimpleDateFormat): String =
    line.split(",").map(_.trim) match {
      case Array(_, lat, lon, date) =>
        val (cveEst, cveMun) = locate(lon, lat)
        Seq(line, epochMillis(date, fmt), cveEst, cveMun).mkString(",")
      case _ =>
        Seq(line, "0", "0", "0").mkString(",")
    }

  /** Converts the `date` column to epoch milliseconds, or "0" when it cannot be parsed. */
  private def epochMillis(date: String, fmt: SimpleDateFormat): String =
    try fmt.parse(date.replaceAll("Z$", "+0000")).getTime.toString
    catch { case NonFatal(_) => "0" }

  /** Finds the (state code, municipality code) of the polygon containing a point.
    *
    * @param lon longitude as text
    * @param lat latitude as text
    * @return attributes 1 and 2 of the first intersecting municipality feature,
    *         or ("0", "0") when the coordinates are not numeric or nothing intersects
    */
  private def locate(lon: String, lat: String): (String, String) = {
    val coords =
      try Some((lon.toDouble, lat.toDouble))
      catch { case _: NumberFormatException => None }

    coords.flatMap { case (x, y) =>
      val punto = Point(x, y)
      for {
        celda <- geoDataExternal.inMemory.find(f => f.geometry intersects punto)
        mun <- geoDataMun.get(celda.getAttribute(1).toString).find(f => f.geometry intersects punto)
      } yield (mun.getAttribute(1).toString, mun.getAttribute(2).toString)
    }.getOrElse(SinClave)
  }

  /** Municipality polygons, grouped by the index listed in `indice_espacial.csv`.
    *
    * Everything is computed once, when the object is first accessed, so the
    * per-record work in the job is a map lookup plus the geometry tests.
    */
  object geoDataMun {

    private val shp = Shapefile("/geoData/MunicipiosLatLon.shp")

    /** All municipality features, materialised once. */
    val features = shp.getFeatures.toIterable.toVector

    /** Rows of the index file as (index, (state code, municipality code)). */
    val result = {
      val src = scala.io.Source.fromFile("/geoData/indice_espacial.csv")
      // toList runs inside the try so the file is fully read before it is closed.
      try src.getLines().filter(_.trim.nonEmpty).map(parseIndexLine).toList
      finally src.close()
    }

    /** index -> codes of the municipalities listed under that index.
      *
      * Built with `map` rather than `mapValues`: the latter returns a view
      * that re-applies its function on every lookup.
      */
    val mapaIx = result.groupBy(_._1).map { case (ix, filas) => ix -> filas.map(_._2).toSet }

    /** index -> municipality features listed under that index, in shapefile order. */
    private val featuresByIndex = {
      val keyed = features.map(f => ((f.getAttribute(1).toString, f.getAttribute(2).toString), f))
      mapaIx.map { case (ix, claves) =>
        ix -> keyed.collect { case (clave, f) if claves.contains(clave) => f }
      }
    }

    /** Parses one `cve_edo, cve_mun, index` row of the index file.
      *
      * Blank rows are filtered out before this is called; any other malformed
      * row aborts initialisation.
      */
    private def parseIndexLine(line: String): (Int, (String, String)) = {
      val campos = line.split(",").map(_.trim)
      require(campos.length >= 3, "indice_espacial.csv: expected cve_edo,cve_mun,index but got: " + line)
      (campos(2).toInt, (campos(0), campos(1)))
    }

    /** Municipality features listed under `index`.
      *
      * @param index index as text; must be an integer literal
      * @return the candidate features, empty when the index is not in the
      *         index file (previously this raised `NoSuchElementException`)
      */
    def get(index: String) =
      // Fully qualified, like the original's immutable.List: presumably the
      // file imports java.util._, which would shadow the short name.
      featuresByIndex.getOrElse(index.toInt, scala.collection.immutable.Vector.empty)
  }

  /** Features of `IndiceRecortado.shp`, used as the first lookup stage. */
  object geoDataExternal {

    private val shp = Shapefile("/geoData/IndiceRecortado.shp")

    val features = shp.getFeatures

    /** The same features copied into an immutable in-memory sequence.
      *
      * NOTE(review): traversing the FeatureCollection directly presumably goes
      * back to the shapefile for every record; searching this copy avoids
      * depending on that. Confirm against the geoscript/GeoTools version in use.
      */
    val inMemory = features.toIterable.toVector

    /** The underlying feature collection, kept for existing callers. */
    def get: FeatureCollection[org.geoscript.feature.Schema, org.geoscript.feature.Feature] =
      features
  }
}


The driver log is:

14/09/18 19:27:55 ERROR EndpointWriter: AssociationError [akka.tcp://
sparkWorker@axaxaxa-cloudera-s05.xxxnetworks.com:44895] -> [akka.tcp://
sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]: Error
[Association failed with [akka.tcp://
sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]] [

akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]

Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: axaxaxa-cloudera-s05.xxxnetworks.com/10.5.96.42:43942

]

14/09/18 19:27:55 ERROR EndpointWriter: AssociationError [akka.tcp://
sparkWorker@axaxaxa-cloudera-s05.xxxnetworks.com:44895] -> [akka.tcp://
sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]: Error
[Association failed with [akka.tcp://
sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]] [

akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]

Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: axaxaxa-cloudera-s05.xxxnetworks.com/10.5.96.42:43942

]

14/09/18 19:27:55 ERROR EndpointWriter: AssociationError [akka.tcp://
sparkWorker@axaxaxa-cloudera-s05.xxxnetworks.com:44895] -> [akka.tcp://
sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]: Error
[Association failed with [akka.tcp://
sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]] [

akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]

Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: axaxaxa-cloudera-s05.xxxnetworks.com/10.5.96.42:43942



On Mon, Sep 15, 2014 at 1:30 PM, Abel Coronado Iruegas <
acoronadoiruegas@gmail.com> wrote:

> Here is an example of working code that takes a CSV with lat/lon points and
> intersects them with polygons of the municipalities of Mexico, generating a new
> version of the file with new attributes.
>
> Do you think it could be improved?
>
> Thanks.
>
> The Code:
>
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> import org.apache.spark.SparkConf
> import org.geoscript.feature._
> import org.geoscript.geometry._
> import org.geoscript.geometry.builder._
> import com.vividsolutions.jts._
> import org.geoscript.layer.Shapefile
> import org.geotools.feature.FeatureCollection
> import java.text._
> import java.util._
>
> object SimpleApp { // Spark job: appends time, state code and municipality code to every CSV line
>         def main(args: Array[String]){
>                 val conf = new SparkConf().setAppName("Csv Clipper")
>                 val sc = new SparkContext(conf)
>                 val csvPath =
> "hdfs://x01/user/acoronado/mov/movilidad.csv" //70 Millions of rows
>                 val csv = sc.textFile(csvPath)
>                 val clipPoints = csv.map({line: String => // produces one output line per input line
>                                                val Array(usuario, lat,
> lon, date) = line.split(",").map(_.trim)
>                                                val punto =
> Point(lon.toDouble,lat.toDouble)
>                                                val existe =
> geoData.get.filter(f => f.geometry intersects punto) // Geospatial operation
>                                                var cve_est = "0"
>                                                var cve_mun = "0"
>                                                var time = "0" // all three stay "0" unless a feature intersects the point
>                                                if(!existe.isEmpty){
>                                                   val f = existe.take(1) // only the first intersecting feature is used
>                                                   val ff = f.toList(0)
>                                                   cve_est =
> ff.getAttribute(1).toString //State Code
>                                                   cve_mun =
> ff.getAttribute(2).toString  // Municipality Code
>                                                   time = (new
> SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")).parse(date.replaceAll("Z$",
> "+0000")).getTime().toString()
>                                                }
>
>  line+","+time+","+cve_est+","+cve_mun
>                                            })
>
> clipPoints.coalesce(1,true).saveAsTextFile("hdfs://m01/user/acoronado/mov/mov_all.csv") // coalesce(1, true) shuffles all rows into a single partition before writing
>                 println("Spark Clip Exito!!!")
>         }
>         object geoData { // shapefile features read by the map closure above through geoData.get
>             private val estatal =
> Shapefile("/geoData/MunicipiosLatLon.shp") //This directory exist in all
> the nodes.
>             private val estatalColl = estatal.getFeatures
>             def
> get:FeatureCollection[org.geoscript.feature.Schema,org.geoscript.feature.Feature]
> = estatalColl
>         }
> }
>

Mime
View raw message