Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 11CC3200BA3 for ; Thu, 20 Oct 2016 08:39:02 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1071B160ADB; Thu, 20 Oct 2016 06:39:02 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 0967F160AF2 for ; Thu, 20 Oct 2016 08:39:00 +0200 (CEST) Received: (qmail 52340 invoked by uid 500); 20 Oct 2016 06:38:59 -0000 Mailing-List: contact dev-help@pig.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pig.apache.org Delivered-To: mailing list dev@pig.apache.org Received: (qmail 51969 invoked by uid 500); 20 Oct 2016 06:38:59 -0000 Delivered-To: apmail-hadoop-pig-dev@hadoop.apache.org Received: (qmail 51915 invoked by uid 99); 20 Oct 2016 06:38:59 -0000 Received: from arcas.apache.org (HELO arcas) (140.211.11.28) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Oct 2016 06:38:59 +0000 Received: from arcas.apache.org (localhost [127.0.0.1]) by arcas (Postfix) with ESMTP id AE8542C4C73 for ; Thu, 20 Oct 2016 06:38:59 +0000 (UTC) Date: Thu, 20 Oct 2016 06:38:59 +0000 (UTC) From: "liyunzhang_intel (JIRA)" To: pig-dev@hadoop.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Comment Edited] (PIG-4920) Fail to use Javascript UDF in spark yarn client mode MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 archived-at: Thu, 20 Oct 2016 06:39:02 -0000 [ https://issues.apache.org/jira/browse/PIG-4920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15590984#comment-15590984 ] liyunzhang_intel edited comment on PIG-4920 at 10/20/16 6:38 AM: ----------------------------------------------------------------- [~mohitsabharwal]: upload PIG-4920_6.patch,please help review. modification: 1. add more comments on SparkEngineConf#writeObject here explain more on following code: Here following code is {code} if (!UDFContext.getUDFContext().isUDFConfEmpty()) { ... } else { out.writeObject(this.properties.getProperty(SPARK_UDFCONTEXT_UDFCONFS)); out.writeObject(this.properties.getProperty(SPARK_UDFCONTEXT_CLIENTSYSPROPS)); } {code} There are *2* threads will call SparkEngineConf#writeObject *main* thread: SparkLauncher#initialize-> SparkUtil#newJobConf-> ObjectSerializer#serialize-> SparkEngineConf#writeObject in main thread. UDFContext is not empty *dag-scheduler-event-loop* thread: DAGScheduler.submitMissingTasks->JavaSerializationStream.writeObject in dag-scheduler-event-loop: UDFContext is empty because UDFContext#getUDFContext is a thread local variable but at that time this.properties.getProperty(SPARK_UDFCONTEXT_UDFCONFS) is not empty because we initialize them in main thread. the stacktrace of dag-scheduler-event-loop thread is like following: {code} dag-scheduler-event-loop@10914 daemon, prio=5, in group 'main', status: 'RUNNING' at org.apache.pig.backend.hadoop.executionengine.spark.SparkEngineConf.writeObject(SparkEngineConf.java:84) at sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468) at sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468) at sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1003) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1607) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) {code} was (Author: kellyzly): [~mohitsabharwal]: upload PIG-4920_6.patch, modification: 1. add more comments on SparkEngineConf#writeObject here explain more on following code: Here following code is {code} if (!UDFContext.getUDFContext().isUDFConfEmpty()) { ... } else { out.writeObject(this.properties.getProperty(SPARK_UDFCONTEXT_UDFCONFS)); out.writeObject(this.properties.getProperty(SPARK_UDFCONTEXT_CLIENTSYSPROPS)); } {code} There are *2* threads will call SparkEngineConf#writeObject *main* thread: SparkLauncher#initialize-> SparkUtil#newJobConf-> ObjectSerializer#serialize-> SparkEngineConf#writeObject in main thread. UDFContext is not empty *dag-scheduler-event-loop* thread: DAGScheduler.submitMissingTasks->JavaSerializationStream.writeObject in dag-scheduler-event-loop: UDFContext is empty because UDFContext#getUDFContext is a thread local variable but at that time this.properties.getProperty(SPARK_UDFCONTEXT_UDFCONFS) is not empty because we initialize them in main thread. the stacktrace of dag-scheduler-event-loop thread is like following: {code} dag-scheduler-event-loop@10914 daemon, prio=5, in group 'main', status: 'RUNNING' at org.apache.pig.backend.hadoop.executionengine.spark.SparkEngineConf.writeObject(SparkEngineConf.java:84) at sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468) at sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468) at sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1003) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1607) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) {code} > Fail to use Javascript UDF in spark yarn client mode > ---------------------------------------------------- > > Key: PIG-4920 > URL: https://issues.apache.org/jira/browse/PIG-4920 > Project: Pig > Issue Type: Sub-task > Components: spark > Reporter: liyunzhang_intel > Fix For: spark-branch > > Attachments: PIG-4920.patch, PIG-4920_2.patch, PIG-4920_3.patch, PIG-4920_4.patch, PIG-4920_5.patch, PIG-4920_6.patch > > > udf.pig > {code} > register '/home/zly/prj/oss/merge.pig/pig/bin/udf.js' using javascript as myfuncs; > A = load './passwd' as (a0:chararray, a1:chararray); > B = foreach A generate myfuncs.helloworld(); > store B into './udf.out'; > {code} > udf.js > {code} > helloworld.outputSchema = "word:chararray"; > function helloworld() { > return 'Hello, World'; > } > > complex.outputSchema = "word:chararray"; > function complex(word){ > return {word:word}; > } > {code} > run udf.pig in spark local mode(export SPARK_MASTER="local"), it successfully. > run udf.pig in spark yarn client mode(export SPARK_MASTER="yarn-client"), it fails and error message like following: > {noformat} > Caused by: java.lang.reflect.InvocationTargetException > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:408) > at org.apache.pig.impl.PigContext.instantiateFuncFromSpec(PigContext.java:744) > ... 84 more > Caused by: java.lang.ExceptionInInitializerError > at org.apache.pig.scripting.js.JsScriptEngine.getInstance(JsScriptEngine.java:87) > at org.apache.pig.scripting.js.JsFunction.(JsFunction.java:173) > ... 89 more > Caused by: java.lang.IllegalStateException: could not get script path from UDFContext > at org.apache.pig.scripting.js.JsScriptEngine$Holder.(JsScriptEngine.java:69) > ... 91 more > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)