Return-Path: X-Original-To: apmail-ignite-user-archive@minotaur.apache.org Delivered-To: apmail-ignite-user-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 591B918E0D for ; Mon, 4 Apr 2016 12:11:11 +0000 (UTC) Received: (qmail 59928 invoked by uid 500); 4 Apr 2016 12:11:11 -0000 Delivered-To: apmail-ignite-user-archive@ignite.apache.org Received: (qmail 59893 invoked by uid 500); 4 Apr 2016 12:11:11 -0000 Mailing-List: contact user-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: user@ignite.apache.org Delivered-To: mailing list user@ignite.apache.org Received: (qmail 59876 invoked by uid 99); 4 Apr 2016 12:11:11 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 04 Apr 2016 12:11:11 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id A9A5EC05EF for ; Mon, 4 Apr 2016 12:11:10 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.772 X-Spam-Level: * X-Spam-Status: No, score=1.772 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_NONE=-0.0001, SPF_SOFTFAIL=0.972] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id 0m2KORiohE-2 for ; Mon, 4 Apr 2016 12:11:03 +0000 (UTC) Received: from mbob.nabble.com (mbob.nabble.com [162.253.133.15]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with ESMTP id BEDAA5FADC for ; Mon, 4 Apr 2016 12:11:02 +0000 (UTC) Received: from malf.nabble.com (unknown [162.253.133.59]) by mbob.nabble.com (Postfix) with ESMTP id 5DB5E2470B19 for ; Mon, 4 Apr 2016 04:59:15 -0700 (PDT) Date: Mon, 4 Apr 2016 04:57:31 -0700 (PDT) From: F7753 To: user@ignite.apache.org Message-ID: <1459771051170-3894.post@n6.nabble.com> In-Reply-To: References: <1459494216430-3847.post@n6.nabble.com> <1459505806235-3852.post@n6.nabble.com> Subject: Re: How to solve the 22 parameters' limit under scala 2.10 in the case class? MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Transfer-Encoding: 7bit I use two IgniteContext instance in my spark streaming job, one IgniteContext contains a cacheConfiguration, the cacheConfiguration was for the dataframe to join. Here the code is: ----------------------------------------------------------------------------------------------------- package main.scala /** * Created by F7753 on 2016/3/30. */ import kafka.serializer.StringDecoder import org.apache.ignite.cache.CacheMode import org.apache.ignite.cache.query.annotations.QuerySqlField import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.streaming.kafka._ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Seconds, StreamingContext, Time} import org.apache.spark.storage.StorageLevel import org.apache.spark.rdd.RDD import org.apache.spark.sql.Row import org.apache.spark.sql.SQLContext import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType} import org.apache.log4j._ import org.apache.ignite.Ignition import org.apache.ignite.configuration._ import org.apache.ignite.spark.IgniteContext import org.apache.ignite.configuration.CacheConfiguration import scala.annotation.meta.field object Schema { val small = StructType( Array( StructField("field0", StringType), StructField("field2", LongType), StructField("field3", LongType), StructField("field4", LongType), StructField("field5", LongType), StructField("field6", LongType), StructField("field7", StringType), StructField("field8", StringType), StructField("field9", StringType), StructField("field10", StringType), StructField("field11", StringType), StructField("field12", StringType), StructField("field13", StringType), StructField("field14", StringType), StructField("field15", StringType), StructField("field16", StringType), StructField("field17", StringType), StructField("field18", StringType), StructField("field19", StringType)) ) val source = StructType( Array( StructField("field0", LongType), StructField("field1", StringType), StructField("field2", StringType), StructField("field3", StringType), StructField("field4", IntegerType), StructField("field5", IntegerType), StructField("field6", IntegerType), StructField("field7", IntegerType), StructField("field8", IntegerType), StructField("field9", StringType), StructField("field10", StringType), StructField("field11", IntegerType), StructField("field12", StringType), StructField("field13", IntegerType), StructField("field14", StringType), StructField("field15",StringType), StructField("field16", IntegerType), StructField("field17", StringType), StructField("field18", IntegerType), StructField("field19", StringType), StructField("field20", StringType), StructField("field21", StringType), StructField("field22", IntegerType), StructField("field23", IntegerType), StructField("field24", StringType), StructField("field25", IntegerType), StructField("field26", IntegerType), StructField("field27", IntegerType), StructField("field28", IntegerType), StructField("field29", IntegerType), StructField("field30", LongType), StructField("field31", StringType), StructField("field32", LongType), StructField("field33", StringType), StructField("field34", LongType), StructField("field35", StringType), StructField("field36", LongType), StructField("field37", StringType), StructField("field38", IntegerType), StructField("field39", IntegerType), StructField("field40", IntegerType), StructField("field41", LongType)) ) } object SQLContextSingleton { @transient private var instance: SQLContext = _ def getInstance(sparkContext: SparkContext): SQLContext = { if (instance == null) { instance = new SQLContext(sparkContext) } instance } } object StreamingJoin { private final val SMALL = StreamingJoin.getClass.getSimpleName + "SmallTable" private final val SOURCE = StreamingJoin.getClass.getSimpleName + "SourceTable" def checkArgs(args: Array[String]): Unit = { args(0) match { case "Socket" => if(args.length < 4) { System.err.println("Usage: StreamingTest Socket ") System.exit(1) } case _ => System.err.println("Unsurpported source...") System.exit(1) } } def main(args: Array[String]): Unit = { /** Type alias for `QuerySqlField`. */ type ScalarCacheQuerySqlField = QuerySqlField @field checkArgs(args) val scf = new SparkConf().setAppName("StreamingTest") val ssc = new StreamingContext(scf, Seconds(10)) val sqlCtx:SQLContext = SQLContextSingleton.getInstance(ssc.sparkContext) PropertyConfigurator.configure("./conf/log.conf") val smallTableData = SQLContextSingleton.getInstance(ssc.sparkContext).read .format("com.databricks.spark.csv") .option("header", "false") .schema(Schema.zb_test_schema) .load("hdfs://host:9000/fileName.csv") val smallTableDF = sqlCtx.createDataFrame(smallTableData.rdd, classOf[smallTableCache]) // create ignite context (embeded mode) val smallTableContext = new IgniteContext[String, small](ssc.sparkContext, "/home/test/SparkIgniteStreaming/config/example-cache.xml") // small table cache config val smallTableCacheCfg = new CacheConfiguration[String, small](SMALL) smallTableCacheCfg.setIndexedTypes(classOf[BigInt], classOf[small]) // table has "zb_test" name smallTableCacheCfg.setCacheMode(CacheMode.REPLICATED) // ignite small table cache val smallTableCache = smallTableContext.fromCache(smallTableCacheCfg) // smallTableCols rdd save to cache val smallTableCols_rdd = smallTableDF.rdd.map( r => ( r.getAs[String](0), new small( r.getAs[BigInt](0), r.getAs[String](1), r.getAs[String](2), r.getAs[String](3), r.getAs[Int](4), r.getAs[Int](5), r.getAs[Int](6), r.getAs[Int](7), r.getAs[Int](8), r.getAs[String](9), r.getAs[String](10), r.getAs[Int](11), r.getAs[String](12), r.getAs[String](13), r.getAs[String](14), r.getAs[Int](15), r.getAs[String](16), r.getAs[Int](17), r.getAs[String](18), r.getAs[String](19), r.getAs[String](20), r.getAs[Int](21), r.getAs[Int](22), r.getAs[String](23), r.getAs[Int](24), r.getAs[Int](25), r.getAs[Int](26), r.getAs[Int](27), r.getAs[Int](28), r.getAs[BigInt](29), r.getAs[String](30), r.getAs[BigInt](31), r.getAs[String](32), r.getAs[BigInt](33), r.getAs[String](34), r.getAs[BigInt](35), r.getAs[String](36), r.getAs[Int](37), r.getAs[Int](38), r.getAs[Int](39), r.getAs[BigInt](40) ))) smallTableCache.savePairs(smallTableCols_rdd) def creatSourceDStream: DStream[String] = { args(0) match { case "Socket" => ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.OFF_HEAP) } } val linesN = (1 to args(args.length-1).toInt).map(i => creatSourceDStream.flatMap(_.split("\n"))) val lines = ssc.union(linesN) lines.foreachRDD( (rdd: RDD[String], time: Time) => { val sqlCtx = SQLContextSingleton.getInstance(rdd.sparkContext) val rowRdd = rdd.map(_.split(",")).map(row => Row(row(0), row(1).toLong, row(2).toLong, row(3).toLong, row(4).toLong, row(5).toLong, row(6), row(7), row(8), row(9), row(10), row(11), row(12), row(13), row(14), row(15), row(16), row(17), row(18))) val sourceTableColsDF = sqlCtx.createDataFrame(rowRdd, classOf[source]) val sourceTableCols_rdd = sourceTableColsDF.rdd.map( r => ( r.getAs[String](0), new source( r.getAs[String](0), r.getAs[BigInt](1), r.getAs[BigInt](2), r.getAs[BigInt](3), r.getAs[BigInt](4), r.getAs[String](5), r.getAs[String](6), r.getAs[String](7), r.getAs[String](8), r.getAs[String](9), r.getAs[String](10), r.getAs[String](11), r.getAs[String](12), r.getAs[String](13), r.getAs[String](14), r.getAs[String](15), r.getAs[String](16), r.getAs[String](17) ) ) ) // source table cache config val sourceTableCacheCfg = new CacheConfiguration[String, source](SOURCE) sourceTableCacheCfg.setIndexedTypes(classOf[BigInt], classOf[source]) // table has "url_rz" name sourceTableCacheCfg.setCacheMode(CacheMode.PARTITIONED) val streamTableContext = new IgniteContext[String, source](ssc.sparkContext, "/home/test/config/default-config.xml") // ignite source table cache val sourceTableCache = streamTableContext.fromCache(sourceTableCacheCfg) // sourceTableCols rdd save to cache sourceTableCache.savePairs(sourceTableCols_rdd) val query = s""" |select s.fl, s.xz count(*) | from | SourceTable as e, \" """ + SOURCE+ """\".SmallTable as s | where | e.pz_id=s.pz_id | group by | s.fl, s.xz """.stripMargin val res = sourceTableCache.sql(query) println("-----------------------------") println("Time: " + time) println("-----------------------------") res.show(10) }) ssc.start() ssc.awaitTermination() } } ----------------------------------------------------------------------------------------------------- the Schema.scala: ----------------------------------------------------------------------------------------------------- package main.scala import org.apache.ignite.scalar.scalar._ /** * Created by INFI on 2016/3/31. */ class small(@ScalarCacheQuerySqlField field0: String, @ScalarCacheQuerySqlField field1: BigInt, @ScalarCacheQuerySqlField field2: BigInt, @ScalarCacheQuerySqlField field3: BigInt, @ScalarCacheQuerySqlField field4: BigInt, @ScalarCacheQuerySqlField field5: String, @ScalarCacheQuerySqlField field6: String, @ScalarCacheQuerySqlField field7: String, @ScalarCacheQuerySqlField field8: String, @ScalarCacheQuerySqlField field9: String, @ScalarCacheQuerySqlField field10: String, @ScalarCacheQuerySqlField field11: String, @ScalarCacheQuerySqlField field12: String, @ScalarCacheQuerySqlField field13: String, @ScalarCacheQuerySqlField field14: String, @ScalarCacheQuerySqlField field15: String, @ScalarCacheQuerySqlField field16: String, @ScalarCacheQuerySqlField field17: String)extends Serializable { } class source(@ScalarCacheQuerySqlField field0: BigInt, @ScalarCacheQuerySqlField field1: String, @ScalarCacheQuerySqlField field2: String, @ScalarCacheQuerySqlField field3: String, @ScalarCacheQuerySqlField field4: Int, @ScalarCacheQuerySqlField field5: Int, @ScalarCacheQuerySqlField field6: Int, @ScalarCacheQuerySqlField field7: Int, @ScalarCacheQuerySqlField field8: Int, @ScalarCacheQuerySqlField field9: String, @ScalarCacheQuerySqlField field10: String, @ScalarCacheQuerySqlField field11: Int, @ScalarCacheQuerySqlField field12: String, @ScalarCacheQuerySqlField field13: String, @ScalarCacheQuerySqlField field14: String, @ScalarCacheQuerySqlField field15: Int, @ScalarCacheQuerySqlField field16: String, @ScalarCacheQuerySqlField field17: Int, @ScalarCacheQuerySqlField field18: String, @ScalarCacheQuerySqlField field19: String, @ScalarCacheQuerySqlField field20: String, @ScalarCacheQuerySqlField field21: Int, @ScalarCacheQuerySqlField field22: Int, @ScalarCacheQuerySqlField field23: String, @ScalarCacheQuerySqlField field24: Int, @ScalarCacheQuerySqlField field25: Int, @ScalarCacheQuerySqlField field26: Int, @ScalarCacheQuerySqlField field27: Int, @ScalarCacheQuerySqlField field28: Int, @ScalarCacheQuerySqlField field29: BigInt, @ScalarCacheQuerySqlField field30: String, @ScalarCacheQuerySqlField field31: BigInt, @ScalarCacheQuerySqlField field32: String, @ScalarCacheQuerySqlField field33: BigInt, @ScalarCacheQuerySqlField field34: String, @ScalarCacheQuerySqlField field35: BigInt, @ScalarCacheQuerySqlField field36: String, @ScalarCacheQuerySqlField field37: Int, @ScalarCacheQuerySqlField field38: Int, @ScalarCacheQuerySqlField field39: Int, @ScalarCacheQuerySqlField field40: BigInt)extends Serializable { } ----------------------------------------------------------------------------------------------------- and the xml file is : ----------------------------------------------------------------------------------------------------- 127.0.0.1:47500..47509 ----------------------------------------------------------------------------------------------------- the log: ----------------------------------------------------------------------------------------------------- Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 7, nobida144): class org.apache.ignite.IgniteCheckedException: Spring XML configuration path is invalid: /home/test/SparkIgniteStreaming/config/example-cache.xml. Note that this path should be either absolute or a relative local file system path, relative to META-INF in classpath or valid URL to IGNITE_HOME. at org.apache.ignite.internal.util.IgniteUtils.resolveSpringUrl(IgniteUtils.java:3523) at org.apache.ignite.internal.IgnitionEx.loadConfigurations(IgnitionEx.java:643) at org.apache.ignite.internal.IgnitionEx.loadConfiguration(IgnitionEx.java:682) at org.apache.ignite.spark.IgniteContext$$anonfun$$lessinit$greater$2.apply(IgniteContext.scala:88) at org.apache.ignite.spark.IgniteContext$$anonfun$$lessinit$greater$2.apply(IgniteContext.scala:88) at org.apache.ignite.spark.Once.apply(IgniteContext.scala:184) at org.apache.ignite.spark.IgniteContext.ignite(IgniteContext.scala:143) at org.apache.ignite.spark.IgniteRDD$$anonfun$savePairs$1.apply(IgniteRDD.scala:171) at org.apache.ignite.spark.IgniteRDD$$anonfun$savePairs$1.apply(IgniteRDD.scala:170) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.net.MalformedURLException: no protocol: /home/test/SparkIgniteStreaming/config/example-cache.xml at java.net.URL.(URL.java:589) at java.net.URL.(URL.java:486) at java.net.URL.(URL.java:435) at org.apache.ignite.internal.util.IgniteUtils.resolveSpringUrl(IgniteUtils.java:3514) ... 18 more Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918) at org.apache.ignite.spark.IgniteRDD.savePairs(IgniteRDD.scala:170) at main.scala.StreamingJoin$.main(StreamingJoin.scala:242) at main.scala.StreamingJoin.main(StreamingJoin.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: class org.apache.ignite.IgniteCheckedException: Spring XML configuration path is invalid: /home/test/SparkIgniteStreaming/config/example-cache.xml. Note that this path should be either absolute or a relative local file system path, relative to META-INF in classpath or valid URL to IGNITE_HOME. at org.apache.ignite.internal.util.IgniteUtils.resolveSpringUrl(IgniteUtils.java:3523) at org.apache.ignite.internal.IgnitionEx.loadConfigurations(IgnitionEx.java:643) at org.apache.ignite.internal.IgnitionEx.loadConfiguration(IgnitionEx.java:682) at org.apache.ignite.spark.IgniteContext$$anonfun$$lessinit$greater$2.apply(IgniteContext.scala:88) at org.apache.ignite.spark.IgniteContext$$anonfun$$lessinit$greater$2.apply(IgniteContext.scala:88) at org.apache.ignite.spark.Once.apply(IgniteContext.scala:184) at org.apache.ignite.spark.IgniteContext.ignite(IgniteContext.scala:143) at org.apache.ignite.spark.IgniteRDD$$anonfun$savePairs$1.apply(IgniteRDD.scala:171) at org.apache.ignite.spark.IgniteRDD$$anonfun$savePairs$1.apply(IgniteRDD.scala:170) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.net.MalformedURLException: no protocol: /home/test/SparkIgniteStreaming/config/example-cache.xml at java.net.URL.(URL.java:589) at java.net.URL.(URL.java:486) at java.net.URL.(URL.java:435) at org.apache.ignite.internal.util.IgniteUtils.resolveSpringUrl(IgniteUtils.java:3514) ... 18 more ^C[18:09:32] Ignite node stopped OK [uptime=00:00:16:733] ----------------------------------------------------------------------------------------------------- I modified the schema fields' name for some reason, other code are the same with the one I executed. -- View this message in context: http://apache-ignite-users.70518.x6.nabble.com/How-to-solve-the-22-parameters-limit-under-scala-2-10-in-the-case-class-tp3847p3894.html Sent from the Apache Ignite Users mailing list archive at Nabble.com.