spark-user mailing list archives

From "lucas.gary@gmail.com" <lucas.g...@gmail.com>
Subject Re: How to pass sparkSession from driver to executor
Date Thu, 21 Sep 2017 15:44:17 GMT
I'm not sure what you're doing.  But I have in the past used Spark to
consume a manifest file and then execute a .mapPartitions on the result,
like this:


import json
import time

import boto3


def map_key_to_event(s3_events_data_lake):

    def _map_key_to_event(event_key_list, s3_client=None):
        print("Events in list")
        start = time.time()

        return_list = []

        # A test stub client can be injected via the s3_client argument.
        if s3_client is None:
            s3_client = boto3.Session().client('s3')

        for event_key in event_key_list:
            try:
                response = s3_client.get_object(Bucket=s3_events_data_lake,
                                                Key=event_key)
                contents = response['Body'].read().decode('utf-8')
                entity = json.loads(contents)
                entity["Message"] = json.loads(entity["Message"])
                event_type = entity["Message"]["type"]
                # json.dumps here because Spark doesn't have a good json datatype.
                return_list.append((event_type, json.dumps(entity)))
            except Exception:
                print("Key: {k} did not yield a valid object".format(k=event_key))

        end = time.time()
        print('time elapsed:')
        print(end - start)

        return return_list

    return _map_key_to_event


map_func = map_key_to_event(s3_events_data_lake)
pkeys = spark.sparkContext.parallelize(full_list_for_time_slice, 32)
print("partitions: ")
print(pkeys.getNumPartitions())
events = pkeys.mapPartitions(map_func)





In this case I'm loading heterogeneous JSON files with wildly different
schemas, then saving them out as one Parquet file per event type (i.e.
turning one big heterogeneous dump into numerous smaller homogeneous dumps).
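
As a rough sketch of that save step (the output path and column names here
are hypothetical, not from my actual job), the (event_type, json_string)
pairs can be turned into a DataFrame and written out partitioned by type:

```
from pyspark.sql import Row

# events is the RDD of (event_type, json_string) pairs from mapPartitions above.
events_df = events.map(lambda kv: Row(event_type=kv[0], payload=kv[1])).toDF()

# partitionBy writes one sub-directory per distinct event_type value.
events_df.write.partitionBy("event_type").parquet("s3://my-bucket/events/")
```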

I'm sure this isn't the only or even best way to do it.

The underlying issue is that you're trying to violate the programming
model.  The model in this case consists of telling the driver what you want
and then having the executors go do it.
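
For instance, here's a minimal PySpark sketch of staying inside that model
(the fileName column is assumed from your snippet below): collect the small
list of file names to the driver and let the driver issue the read, instead
of calling spark.read inside foreach:

```
# Pull the (small) list of paths back to the driver...
file_names = [row.fileName for row in df.select("fileName").collect()]

# ...and let the driver tell the executors what to read.
# spark.read.json accepts a list of paths.
combined = spark.read.json(file_names)
```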

SparkContext is a driver-level abstraction; it kind of doesn't make sense
in the executor context. The executor is acting on behalf of the driver and
shouldn't need a back reference to it; you'd end up with some interesting
execution graphs.

This is a common pattern in Spark as far as I can tell, i.e. calling a map
and then doing something with the items in the executor, either computing
or enriching. My case above is a bit weird, and I'm not certain it's the
right mechanism, in that I'm literally taking a manifest file and turning
it into 'n' actual records.

Also, if you're going to be constructing a connection string / JDBC call /
S3 client... you really don't want to use a straight .map(func). You'd end
up instantiating a connection for every single record; .mapPartitions lets
you pay that cost once per partition, as sketched below.
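
A hedged sketch of what I mean (do_something and the input rdd are
placeholders, not real API):

```
import boto3

def enrich_partition(rows):
    # One client per partition, reused for every record in it.
    s3_client = boto3.Session().client('s3')
    for row in rows:
        yield do_something(s3_client, row)  # do_something is hypothetical

enriched = rdd.mapPartitions(enrich_partition)
```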

Hope this is somewhat helpful.

Gary

On 21 September 2017 at 06:31, Weichen Xu <weichen.xu@databricks.com> wrote:

> Spark does not allow executor code to use `sparkSession`.
> But I think you can move all the json files into one directory, and then run:
>
> ```
> spark.read.json("/path/to/jsonFileDir")
> ```
> But if you want to get the filename at the same time, you can use
> ```
> spark.sparkContext.wholeTextFiles("/path/to/jsonFileDir")...
> ```
>
> On Thu, Sep 21, 2017 at 9:18 PM, Riccardo Ferrari <ferrarir@gmail.com>
> wrote:
>
>> Depends on your use case; however, broadcasting
>> <https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#broadcast-variables>
>> could be a better option.
>>
>> On Thu, Sep 21, 2017 at 2:03 PM, Chackravarthy Esakkimuthu <
>> chaku.mitcs@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I want to know how to pass sparkSession from driver to executor.
>>>
>>> I have a spark program (batch job) which does the following:
>>>
>>> #################
>>>
>>> val spark = SparkSession.builder().appName("SampleJob")
>>>   .config("spark.master", "local")
>>>   .getOrCreate()
>>>
>>> val df = ... // dataframe which has a list of file names (hdfs)
>>>
>>> df.foreach { fileName =>
>>>
>>>       *spark.read.json(fileName)*
>>>
>>>       ...... some logic here....
>>> }
>>>
>>> #################
>>>
>>>
>>> *spark.read.json(fileName) --- this fails as it runs in the executor. When
>>> I put it outside foreach, i.e. in the driver, it works.*
>>>
>>> I am trying to use spark (sparkSession) in the executor, but it is not
>>> visible outside the driver. I still want to read hdfs files inside foreach;
>>> how do I do it?
>>>
>>> Can someone help with this?
>>>
>>> Thanks,
>>> Chackra
>>>
>>
>>
>
