apex-dev mailing list archives

From Pramod Immaneni <pra...@datatorrent.com>
Subject Re: [Discuss] Design of the python execution operator
Date Tue, 19 Dec 2017 16:19:21 GMT
Hi Ananth,

From your explanation, it looks like the threads allow you to achieve two
things. First, you get an overall timeout: a tuple that doesn't finish
processing within it is flagged as a straggler. Second, a slow tuple doesn't
block processing of subsequent tuples, so you can still process them within
the SLA. By the checkpoint, however, I think you should try to reach a
resolution one way or the other for all the tuples received within the
checkpoint period, or at every window boundary (see idempotency below);
otherwise there is a chance of data loss if the operator restarts. If loss
is acceptable for stragglers, you could let straggler processing continue
beyond the checkpoint boundary and let them finish when they can. You could
support both behaviors with a property. Furthermore, you may not want all
threads stuck with stragglers, because then you are back to square one, so
you may need to stop processing stragglers beyond a certain thread-usage
threshold. Is there a way to interrupt the processing of the engine?
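
To make the timeout and the thread-usage threshold concrete, here is a
minimal sketch in plain Java. It is not the operator from the JIRA: the
class SlaWorkerPool, its fields and its methods are all hypothetical, and a
java.util.function.Function stands in for the python call. Note that
Future.cancel(true) only interrupts the Java worker thread; whether the
embedded interpreter honors that interrupt is exactly the open question
above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical sketch: none of these names come from the proposed operator.
class SlaWorkerPool<I, O> {
  private final ExecutorService workers;
  private final Function<I, O> logic;   // stands in for the python call
  private final long slaMillis;         // per-tuple timeout
  private final int maxStragglers;      // thread-usage threshold
  private final List<Future<O>> stragglers = new ArrayList<>();

  SlaWorkerPool(int threads, long slaMillis, int maxStragglers, Function<I, O> logic) {
    // Daemon threads so that a stuck worker cannot keep the JVM alive.
    this.workers = Executors.newFixedThreadPool(threads, r -> {
      Thread t = new Thread(r);
      t.setDaemon(true);
      return t;
    });
    this.slaMillis = slaMillis;
    this.maxStragglers = maxStragglers;
    this.logic = logic;
  }

  /** Returns the result, or null when the tuple missed the SLA. */
  O process(I tuple) {
    Future<O> f = workers.submit(() -> logic.apply(tuple));
    try {
      return f.get(slaMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      if (inFlightStragglers() >= maxStragglers) {
        f.cancel(true);      // too many threads tied up: interrupt this one
      } else {
        stragglers.add(f);   // let it finish in the background
      }
      return null;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      f.cancel(true);
      return null;
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    }
  }

  /** Number of stragglers that are still occupying a worker thread. */
  int inFlightStragglers() {
    int n = 0;
    for (Future<O> s : stragglers) {
      if (!s.isDone()) {
        n++;
      }
    }
    return n;
  }

  void shutdown() {
    workers.shutdownNow();
  }
}
```

With two threads and maxStragglers set to 1, the first slow tuple is kept
as a straggler and the second one is cancelled, so one thread always stays
available for tuples that can meet the SLA.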

Then there is the question of idempotency. I suspect it would be difficult
to maintain unless, at every window boundary, you wait for processing to
finish for all tuples received during that window. You may provide an option
for relaxing the strict guarantees for the stragglers, as mentioned above.
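
The two behaviors could be sketched as follows. This is only an
illustration under assumptions: the class WindowBoundaryDrain and the
"strict" flag are hypothetical, the endWindow() method is merely named
after the operator callback and does not implement the Apex API, and
CompletableFuture stands in for whatever handle the worker threads return.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: class, field and method names are illustrative only.
class WindowBoundaryDrain<O> {
  // Hypothetical property: true waits for every tuple at the window boundary,
  // false lets unfinished tuples carry over as stragglers.
  private final boolean strict;
  private final List<CompletableFuture<O>> pending = new ArrayList<>();

  WindowBoundaryDrain(boolean strict) {
    this.strict = strict;
  }

  /** Track the future result of a tuple received in the current window. */
  void track(CompletableFuture<O> result) {
    pending.add(result);
  }

  /** Call at the window boundary; returns the results to emit, in arrival order. */
  List<O> endWindow() {
    List<O> emitted = new ArrayList<>();
    Iterator<CompletableFuture<O>> it = pending.iterator();
    while (it.hasNext()) {
      CompletableFuture<O> f = it.next();
      if (strict || f.isDone()) {
        // join() blocks only in strict mode, for results that are not ready
        // yet; it rethrows if the computation failed or was cancelled.
        emitted.add(f.join());
        it.remove();
      }
    }
    return emitted;
  }

  /** Tuples left unresolved after the last window boundary. */
  int carriedOver() {
    return pending.size();
  }
}
```

In strict mode every window emits the results for exactly the tuples it
received, in arrival order, so a replay of the same window should produce
the same output as long as the python logic itself is deterministic. In
relaxed mode the unfinished tuples stay pending and show up in a later
window, which is where the idempotency guarantee is given up.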


On Thu, Dec 14, 2017 at 10:49 AM, Ananth G <ananthg.apex@gmail.com> wrote:

> Hello Pramod,
> Thanks for the comments. I adjusted the title of the JIRA. Here is what I
> was thinking for the worker pool implementation.
> - The main reason (which I forgot to mention in the design points below)
> is that the java embedded engine allows only the thread that created the
> instance to execute the python logic. This is more because of the JNI
> specification itself. Some hints here:
> https://stackoverflow.com/questions/18056347/jni-calling-java-from-c-with-multiple-threads
> and here:
> http://journals.ecs.soton.ac.uk/java/tutorial/native1.1/implementing/sync.html
> - This essentially means that the main operator thread will have to call
> the python code execution logic if the design were otherwise.
> - Since the end user can choose to write any kind of logic, including
> blocking I/O, as part of the implementation, I did not want to stall the
> operator thread for any usage pattern.
> - In fact there is only one overall interpreter in the JVM process space
> and the interpreter thread is just a JNI wrapper around it to account for
> the JNI limitations above.
> - It is for the very same reason that there is an API in the
> implementation to support registering Shared Modules across all of the
> interpreter threads. Use cases for this exist when there is a global
> variable provided by the underlying Python library and loading it multiple
> times can cause issues. Hence the API to register a shared module which
> can be used by all of the interpreter threads.
> - The operator submits to a work request queue and consumes from a
> response queue for each of the interpreter threads. There exists one
> request queue and one response queue per interpreter thread.
> - The stragglers will get drained from the response queue for a previously
> submitted request.
> - The other reason why I chose to implement it this way is for some of
> the use cases that I foresee in the ML scoring scenarios. In fraud
> systems, if I have a strict SLA to score a model, the main thread in the
> operator is not helping me implement this pattern at all. The caller to the
> Apex application will need to proceed if the scoring gets delayed for
> whatever reason. However, the scoring can continue on the interpreter
> thread and can be drained later (the caller did not make use of this
> result, but it can still be persisted for operators consuming from the
> straggler port).
> - There are 3 output ports for this operator. DefaultOutputPort,
> stragglersPort and an errorPort.
> - Some libraries like Tensorflow can become really heavy. Tensorflow
> models can execute a tensorflow DAG as part of a model scoring
> implementation and hence I wanted to take the approach of a worker pool.
> Yes, your point is valid if we wait for the stragglers to complete in a
> given window. The current implementation does not force a wait for all of
> the stragglers to complete. The stragglers are emitted only when there is a
> new tuple that is being processed, i.e. when a new tuple arrives for
> scoring, the straggler response queue is checked for entries and, if there
> are any, the responses are emitted into the stragglerPort. This
> essentially means that there are situations when the straggler port is
> emitting the result for a request submitted in the previous window. This
> also implies that idempotency cannot be guaranteed across runs of the same
> input data. In fact, don't all threaded implementations have this issue, as
> ordering of the results is not guaranteed to be the same even within a
> given window?
> I can enforce a block/drain at the end of the window to force completion,
> based on the feedback.
> Regards,
> Ananth
> > On 15 Dec 2017, at 4:21 am, Pramod Immaneni <pramod@datatorrent.com> wrote:
> >
> > Hi Ananth,
> >
> > Sounds interesting and looks like you have put quite a bit of work into
> > it. Might I suggest changing the title of 2260 to better fit your
> > proposal and implementation, mainly so that there is differentiation
> > from 2261.
> >
> > I wanted to discuss the proposal to use multiple threads in an operator
> > instance. Unless the execution threads are blocking for some sort of I/O,
> > why would it result in a noticeable performance difference compared to
> > processing in the operator thread and running multiple partitions of the
> > operator container local? By running the processing in a separate thread
> > from the operator lifecycle thread you still don't get away from matching
> > the incoming data throughput. The checkpoint will act as the point where
> > backpressure starts to materialize, because the operator has to wait
> > for your background processing to complete to guarantee that all data up
> > to the checkpoint is processed.
> >
> > Thanks
> >
> >
> > On Thu, Dec 14, 2017 at 2:20 AM, Ananth G <ananthg.apex@gmail.com> wrote:
> >
> >> Hello All,
> >>
> >> I would like to submit the design for the Python execution operator
> >> before I raise the pull request so that I can refine the implementation
> >> based on feedback. Could you please provide feedback on the design, if
> >> any, and I will raise the PR accordingly.
> >>
> >> - This operator is for the JIRA ticket raised here:
> >> https://issues.apache.org/jira/browse/APEXMALHAR-2260
> >> - The operator embeds a python interpreter in the operator JVM process
> >> space and is not external to the JVM.
> >> - The implementation proposes the use of Java Embedded Python (JEP),
> >> available here: https://github.com/ninia/jep
> >> - The JEP engine is under the zlib/libpng license. Since this is an
> >> approved license under
> >> https://www.apache.org/legal/resolved.html#category-a I am assuming it
> >> is ok for the community to approve the inclusion of this library.
> >> - Python integration is a messy piece due to the nature of dynamic
> >> libraries. All python libraries need to be natively installed. This also
> >> means we will not be able to bundle python libraries and dependencies as
> >> part of the build into the target JVM container. Hence this operator has
> >> the current limitation that the python binaries must be installed
> >> through an external process on all of the YARN nodes for now.
> >> - The JEP maven dependency jar in the POM is a JNI wrapper around the
> >> dynamic library that is installed externally to the Apex installation
> >> process on all of the YARN nodes.
> >> - Hope to take up https://issues.apache.org/jira/browse/APEXCORE-796
> >> to solve this issue in the future.
> >> - The python operator implementation can be extended to a Py4J based
> >> implementation (as opposed to an in-memory model like JEP) in the future
> >> if required. JEP is the implementation based on an in-memory design
> >> pattern.
> >> - The python operator allows for 4 major API patterns:
> >>    - Execute a method call by accepting parameters to pass to the
> >>      interpreter
> >>    - Execute a python script as given in a file path
> >>    - Evaluate an expression, allowing variables to be passed between
> >>      the java code and the python in-memory interpreter bridge
> >>    - A handy method wherein a series of instructions can be passed in
> >>      one single java call (executed as a sequence of python eval
> >>      instructions under the hood)
> >> - Automatic garbage collection of the variables that are passed from
> >> java code to the in-memory python interpreter
> >> - Support for all major python libraries: Tensorflow, Keras, Scikit,
> >> xgboost. Preliminary tests for these libraries seem to work as per code
> >> here: https://github.com/ananthc/sampleapps/tree/master/apache-apex/apexjvmpython
> >> - The implementation allows for an SLA based execution model, i.e. the
> >> operator is given a chance to execute the python code and, if it does
> >> not complete within a timeout, the operator code returns null.
> >> - A tuple that has become a straggler as per the previous point will
> >> automatically be drained off to a different port so that downstream
> >> operators can still consume the straggler if they want to when the
> >> results arrive.
> >> - Because python is an interpreter, if a previous tuple is still being
> >> processed there is a chance of a back pressure pattern building up very
> >> quickly. Hence this operator works on the concept of a worker pool. The
> >> Python operator uses a configurable number of worker threads, each of
> >> which embeds the Python interpreter within its processing space, i.e. it
> >> is in fact a collection of python in-memory interpreters inside the
> >> Python operator implementation.
> >> - The operator chooses one of the threads at runtime based on their busy
> >> state, thus allowing back-pressure issues to be resolved automatically.
> >> - There is first class support for Numpy in JEP. Java arrays are
> >> convertible to Python Numpy arrays and vice versa and share the same
> >> memory addresses for efficiency reasons.
> >> - The base operator implements dynamic partitioning based on a thread
> >> starvation policy. At each checkpoint, it checks what percentage of the
> >> requests resulted in starved threads and, if the starvation exceeds a
> >> configured percentage, a new instance of the operator is provisioned for
> >> every such instance of the operator.
> >> - The operator provides the notion of a worker execution mode. There are
> >> two worker modes that are passed in each of the above calls from the
> >> user: ALL or ANY. Because the python interpreter is a state based
> >> engine, a newly dynamically partitioned operator might not be in the
> >> exact state of the remaining operators. Hence the operator has this
> >> notion of worker execution mode. Any call (any of the 4 calls mentioned
> >> above) made with the ALL execution mode will be executed on all the
> >> workers of the worker thread pool as well as on the dynamically
> >> partitioned instance whenever such an instance is provisioned.
> >> - The base operator implementation has a method that can be overridden
> >> to implement the logic that needs to be executed for each tuple. The
> >> base operator default implementation is a simple NO-OP.
> >> - The operator automatically picks the least busy thread pool worker,
> >> which has JEP embedded in it, to execute the call.
> >> - The JEP based installation will not support non-CPython modules. All
> >> of the major python libraries are CPython based and hence I believe this
> >> is of lesser concern. If we hit a roadblock when a new python library
> >> that is not CPython based needs to be run, then we could implement the
> >> ApexPythonEngine interface with something like Py4J, which involves
> >> interprocess communication.
> >> - The python operator requires the user to set the library path
> >> java.library.path for the operator to make use of the dynamic libraries
> >> of the corresponding platform. This has to be passed in as a JVM option.
> >> Failing to do so will result in the operator failing to load the
> >> interpreter properly.
> >> - The supported python versions are 2.7, 3.3, 3.4, 3.5 and 3.6.
> >> Numpy >= 1.7 is supported.
> >> - There is no support for virtual environments yet. In case of multiple
> >> python versions on the node, to include the right python version for the
> >> apex operator, ensure that the environment variables and the dynamic
> >> library path are set appropriately. This is a workaround and I hope
> >> APEXCORE-796 will solve this issue as well.
> >>
> >>
> >> Regards,
> >> Ananth
> >>
> >>
