spark-user mailing list archives

From Pinela <pin...@gmail.com>
Subject Re: SparkStreaming variable scope
Date Wed, 09 Dec 2015 22:02:36 GMT
Hey Bryan,

Thanks for the answer ;) I knew it was a basic Python/Spark-noob thing :)

This also worked:

def getRowID():
    return datetime.now().strftime("%Y%m%d%H%M%S")


and then calling getRowID() inside the lambda, because the function itself
gets sent to the executors, right?
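
For reference, here is roughly how it plugs in now (just a sketch of the one
changed line; the rest is as in my original code below):

    outrdd = words.map(lambda x: (str(1), [getRowID(), "cf1desc", "col1", x]))

This way getRowID() is called per record on the executors, instead of once on
the driver.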

Thanks again for the quick reply :)

All the best and Happy Holidays.
Jpinela.



On Wed, Dec 9, 2015 at 8:22 PM, Bryan Cutler <cutlerb@gmail.com> wrote:

> rowid from your code is a variable in the driver, so it will be evaluated
> once and then only the value is sent to words.map.  You probably want to
> have rowid be a lambda itself, so that it will get the value at the time it
> is evaluated. For example, if I have the following:
>
> >>> data = sc.parallelize([1,2,3])
> >>> from datetime import datetime
> >>> rowid = lambda: datetime.now().strftime("%Y%m%d%H%M%S")
> >>> data.map(lambda x: (rowid(), x))
> >>> mdata = data.map(lambda x: (rowid(), x))
> >>> mdata.collect()
> [('20151209121532', 1), ('20151209121532', 2), ('20151209121532', 3)]
> >>> mdata.collect()
> [('20151209121540', 1), ('20151209121540', 2), ('20151209121540', 3)]
>
> Here rowid is evaluated each time an action, i.e. collect, is called on
> the RDD.
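>
> For contrast, a minimal sketch of your original situation in the same
> session (fixed_rowid is just an illustrative name): a plain string computed
> on the driver is captured as a value, so every collect shows the timestamp
> from when it was assigned:
>
> >>> fixed_rowid = datetime.now().strftime("%Y%m%d%H%M%S")  # evaluated once, on the driver
> >>> fixed = data.map(lambda x: (fixed_rowid, x))
> >>> fixed.collect()  # same timestamp on this and every later collect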
>
> On Wed, Dec 9, 2015 at 10:23 AM, jpinela <pinela@gmail.com> wrote:
>
>> Hi guys,
>> I am sure this is a simple question, but I can't find it in the docs
>> anywhere.
>> This reads from Flume and writes to HBase (as you can see), but it has a
>> variable scope problem (I believe).
>> I have the following code:
>>
>> from pyspark.streaming import StreamingContext
>> from pyspark.streaming.flume import FlumeUtils
>> from datetime import datetime
>>
>> ssc = StreamingContext(sc, 5)
>> conf = {"hbase.zookeeper.quorum": "ubuntu3",
>>         "hbase.mapred.outputtable": "teste2",
>>         "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
>>         "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
>>         "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
>>
>> keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
>> valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
>>
>> lines = FlumeUtils.createStream(ssc, 'ubuntu3', 9997)
>> words = lines.map(lambda line: line[1])
>> rowid = datetime.now().strftime("%Y%m%d%H%M%S")
>> outrdd = words.map(lambda x: (str(1), [rowid, "cf1desc", "col1", x]))
>> print("ok 1")
>> outrdd.pprint()
>>
>> outrdd.foreachRDD(lambda x: x.saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv))
>>
>> ssc.start()
>> ssc.awaitTermination()
>>
>> The issue is that the rowid variable is always the value from when the
>> streaming began.
>> How can I get around this? I tried a function, an application, nothing
>> worked.
>> Thank you.
>> jp
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/SparkStreaming-variable-scope-tp25652.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
