beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [1/2] beam git commit: [BEAM-2431] Add experimental python rpc direct runner
Date Wed, 09 Aug 2017 06:12:51 GMT
Repository: beam
Updated Branches:
  refs/heads/master 9ed2cf41f -> fb85d84dc


[BEAM-2431] Add experimental python rpc direct runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/76db0aa3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/76db0aa3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/76db0aa3

Branch: refs/heads/master
Commit: 76db0aa30c632296a6a882c012f9da2d21f775b5
Parents: 9ed2cf4
Author: Sourabh Bajaj <sourabhbajaj@google.com>
Authored: Wed Aug 2 10:49:48 2017 -0700
Committer: Ahmet Altay <altay@google.com>
Committed: Tue Aug 8 23:12:30 2017 -0700

----------------------------------------------------------------------
 .../runners/experimental/__init__.py            |  16 +++
 .../experimental/python_rpc_direct/__init__.py  |  22 ++++
 .../python_rpc_direct_runner.py                 | 111 +++++++++++++++++++
 .../experimental/python_rpc_direct/server.py    | 111 +++++++++++++++++++
 sdks/python/apache_beam/runners/job/__init__.py |  16 +++
 sdks/python/apache_beam/runners/job/manager.py  |  52 +++++++++
 sdks/python/apache_beam/runners/job/utils.py    |  32 ++++++
 sdks/python/apache_beam/runners/runner.py       |   6 +
 8 files changed, 366 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/76db0aa3/sdks/python/apache_beam/runners/experimental/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/experimental/__init__.py b/sdks/python/apache_beam/runners/experimental/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/runners/experimental/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

http://git-wip-us.apache.org/repos/asf/beam/blob/76db0aa3/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py
new file mode 100644
index 0000000..5d14030
--- /dev/null
+++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""This is the experimental direct runner for testing the Job API: it sends
+a Runner API pipeline proto over the Job API and runs it on the other side.
+"""
+
+from apache_beam.runners.experimental.python_rpc_direct.python_rpc_direct_runner import PythonRPCDirectRunner

http://git-wip-us.apache.org/repos/asf/beam/blob/76db0aa3/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
new file mode 100644
index 0000000..247ce1f
--- /dev/null
+++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
@@ -0,0 +1,111 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A runner implementation that submits a job for remote execution.
+"""
+
+import logging
+import random
+import string
+
+import grpc
+
+from apache_beam.portability.api import beam_job_api_pb2
+from apache_beam.runners.job import utils as job_utils
+from apache_beam.runners.job.manager import DockerRPCManager
+from apache_beam.runners.runner import PipelineResult
+from apache_beam.runners.runner import PipelineRunner
+
+
+__all__ = ['PythonRPCDirectRunner']
+
+
+class PythonRPCDirectRunner(PipelineRunner):
+  """Executes a single pipeline on the local machine inside a container."""
+
+  # A list of PTransformOverride objects to be applied before running a pipeline
+  # using DirectRunner.
+  # Currently this only works for overrides where the input and output types do
+  # not change.
+  # For internal SDK use only. This should not be updated by Beam pipeline
+  # authors.
+  _PTRANSFORM_OVERRIDES = []
+
+  def __init__(self):
+    self._cache = None
+
+  def run(self, pipeline):
+    """Remotely executes entire pipeline or parts reachable from node."""
+
+    # Performing configured PTransform overrides.
+    pipeline.replace_all(PythonRPCDirectRunner._PTRANSFORM_OVERRIDES)
+
+    # Start the RPC co-process
+    manager = DockerRPCManager()
+
+    # Submit the job to the RPC co-process
+    jobName = ('Job-' +
+               ''.join(random.choice(string.ascii_uppercase) for _ in range(6)))
+    options = {k: v for k, v in pipeline._options.get_all_options().iteritems()
+               if v is not None}
+
+    try:
+      response = manager.service.run(beam_job_api_pb2.SubmitJobRequest(
+          pipeline=pipeline.to_runner_api(),
+          pipelineOptions=job_utils.dict_to_struct(options),
+          jobName=jobName))
+
+      logging.info('Submitted a job with id: %s', response.jobId)
+
+      # Return the result object that references the manager instance
+      result = PythonRPCDirectPipelineResult(response.jobId, manager)
+      return result
+    except grpc.RpcError:
+      logging.error('Failed to run the job with name: %s', jobName)
+      raise
+
+
+class PythonRPCDirectPipelineResult(PipelineResult):
+  """Represents the state of a pipeline run on the Dataflow service."""
+
+  def __init__(self, job_id, job_manager):
+    self.job_id = job_id
+    self.manager = job_manager
+
+  @property
+  def state(self):
+    return self.manager.service.getState(
+        beam_job_api_pb2.GetJobStateRequest(jobId=self.job_id))
+
+  def wait_until_finish(self):
+    messages_request = beam_job_api_pb2.JobMessagesRequest(jobId=self.job_id)
+    for message in self.manager.service.getMessageStream(messages_request):
+      if message.HasField('stateResponse'):
+        logging.info(
+            'Current state of job: %s',
+            beam_job_api_pb2.JobState.JobStateType.Name(
+                message.stateResponse.state))
+      else:
+        logging.info('Message %s', message.messageResponse)
+    logging.info('Job with id: %s in terminal state now.', self.job_id)
+
+  def cancel(self):
+    return self.manager.service.cancel(
+        beam_job_api_pb2.CancelJobRequest(jobId=self.job_id))
+
+  def metrics(self):
+    raise NotImplementedError

http://git-wip-us.apache.org/repos/asf/beam/blob/76db0aa3/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
new file mode 100644
index 0000000..3addf92
--- /dev/null
+++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
@@ -0,0 +1,111 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A Job API server that runs submitted pipelines with the DirectRunner.
+"""
+from concurrent import futures
+import time
+import uuid
+
+import grpc
+
+from apache_beam.portability.api import beam_job_api_pb2
+from apache_beam.portability.api import beam_job_api_pb2_grpc
+from apache_beam.pipeline import Pipeline
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.runners.runner import PipelineState
+
+_ONE_DAY_IN_SECONDS = 60 * 60 * 24
+
+
+class JobService(beam_job_api_pb2_grpc.JobServiceServicer):
+
+  def __init__(self):
+    self.jobs = {}
+
+  def run(self, request, context):
+    job_id = uuid.uuid4().get_hex()
+    pipeline_result = Pipeline.from_runner_api(
+        request.pipeline,
+        'DirectRunner',
+        PipelineOptions()).run()
+    self.jobs[job_id] = pipeline_result
+    return beam_job_api_pb2.SubmitJobResponse(jobId=job_id)
+
+  def getState(self, request, context):
+    pipeline_result = self.jobs[request.jobId]
+    return beam_job_api_pb2.GetJobStateResponse(
+        state=self._map_state_to_jobState(pipeline_result.state))
+
+  def cancel(self, request, context):
+    pipeline_result = self.jobs[request.jobId]
+    pipeline_result.cancel()
+    return beam_job_api_pb2.CancelJobResponse(
+        state=self._map_state_to_jobState(pipeline_result.state))
+
+  def getMessageStream(self, request, context):
+    pipeline_result = self.jobs[request.jobId]
+    pipeline_result.wait_until_finish()
+    yield beam_job_api_pb2.JobMessagesResponse(
+        stateResponse=beam_job_api_pb2.GetJobStateResponse(
+            state=self._map_state_to_jobState(pipeline_result.state)))
+
+  def getStateStream(self, request, context):
+    context.set_details('Not Implemented for direct runner!')
+    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+    return
+
+  @staticmethod
+  def _map_state_to_jobState(state):
+    if state == PipelineState.UNKNOWN:
+      return beam_job_api_pb2.JobState.UNKNOWN
+    elif state == PipelineState.STOPPED:
+      return beam_job_api_pb2.JobState.STOPPED
+    elif state == PipelineState.RUNNING:
+      return beam_job_api_pb2.JobState.RUNNING
+    elif state == PipelineState.DONE:
+      return beam_job_api_pb2.JobState.DONE
+    elif state == PipelineState.FAILED:
+      return beam_job_api_pb2.JobState.FAILED
+    elif state == PipelineState.CANCELLED:
+      return beam_job_api_pb2.JobState.CANCELLED
+    elif state == PipelineState.UPDATED:
+      return beam_job_api_pb2.JobState.UPDATED
+    elif state == PipelineState.DRAINING:
+      return beam_job_api_pb2.JobState.DRAINING
+    elif state == PipelineState.DRAINED:
+      return beam_job_api_pb2.JobState.DRAINED
+    else:
+      raise ValueError('Unknown pipeline state')
+
+
+def serve():
+  server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+  beam_job_api_pb2_grpc.add_JobServiceServicer_to_server(JobService(), server)
+
+  server.add_insecure_port('[::]:50051')
+  server.start()
+
+  try:
+    while True:
+      time.sleep(_ONE_DAY_IN_SECONDS)
+  except KeyboardInterrupt:
+    server.stop(0)
+
+
+if __name__ == '__main__':
+  # Entry point when launched as a module, as DockerRPCManager does with
+  # `python -m apache_beam.runners.experimental.python_rpc_direct.server`.
+  serve()

http://git-wip-us.apache.org/repos/asf/beam/blob/76db0aa3/sdks/python/apache_beam/runners/job/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/job/__init__.py b/sdks/python/apache_beam/runners/job/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/runners/job/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

http://git-wip-us.apache.org/repos/asf/beam/blob/76db0aa3/sdks/python/apache_beam/runners/job/manager.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/job/manager.py b/sdks/python/apache_beam/runners/job/manager.py
new file mode 100644
index 0000000..4d88a11
--- /dev/null
+++ b/sdks/python/apache_beam/runners/job/manager.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An object that controls the Job API co-process.
+"""
+
+import logging
+import subprocess
+import time
+
+import grpc
+
+from apache_beam.portability.api import beam_job_api_pb2_grpc
+
+
+class DockerRPCManager(object):
+  """A native co-process to start a contianer that speaks the JobApi
+  """
+  def __init__(self, run_command=None):
+    # TODO(BEAM-2431): Change this to a docker container from a command.
+    self.process = subprocess.Popen(
+        ['python',
+         '-m',
+         'apache_beam.runners.experimental.python_rpc_direct.server'])
+
+    self.channel = grpc.insecure_channel('localhost:50051')
+    self.service = beam_job_api_pb2_grpc.JobServiceStub(self.channel)
+
+    # Sleep for 2 seconds for process to start completely
+    # This is just for the co-process and would be removed
+    # once we migrate to docker.
+    time.sleep(2)
+
+  def __del__(self):
+    """Terminate the co-process when the manager is GC'ed
+    """
+    logging.info('Shutting the co-process')
+    self.process.terminate()

http://git-wip-us.apache.org/repos/asf/beam/blob/76db0aa3/sdks/python/apache_beam/runners/job/utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/job/utils.py b/sdks/python/apache_beam/runners/job/utils.py
new file mode 100644
index 0000000..84c727f
--- /dev/null
+++ b/sdks/python/apache_beam/runners/job/utils.py
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utility functions for converting between Python dicts and protobuf
+Structs, as used by the Job API.
+"""
+
+import json
+
+from google.protobuf import json_format
+from google.protobuf import struct_pb2
+
+
+def dict_to_struct(dict_obj):
+  return json_format.Parse(json.dumps(dict_obj), struct_pb2.Struct())
+
+
+def struct_to_dict(struct_obj):
+  return json.loads(json_format.MessageToJson(struct_obj))

http://git-wip-us.apache.org/repos/asf/beam/blob/76db0aa3/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index af00d8f..7ce9a03 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -41,7 +41,11 @@ _DIRECT_RUNNER_PATH = 'apache_beam.runners.direct.direct_runner.'
 _DATAFLOW_RUNNER_PATH = (
     'apache_beam.runners.dataflow.dataflow_runner.')
 _TEST_RUNNER_PATH = 'apache_beam.runners.test.'
+_PYTHON_RPC_DIRECT_RUNNER = (
+    'apache_beam.runners.experimental.python_rpc_direct.'
+    'python_rpc_direct_runner.')
 
+_KNOWN_PYTHON_RPC_DIRECT_RUNNER = ('PythonRPCDirectRunner',)
 _KNOWN_DIRECT_RUNNERS = ('DirectRunner', 'EagerRunner')
 _KNOWN_DATAFLOW_RUNNERS = ('DataflowRunner',)
 _KNOWN_TEST_RUNNERS = ('TestDataflowRunner',)
@@ -51,6 +55,8 @@ _RUNNER_MAP.update(_get_runner_map(_KNOWN_DIRECT_RUNNERS,
                                    _DIRECT_RUNNER_PATH))
 _RUNNER_MAP.update(_get_runner_map(_KNOWN_DATAFLOW_RUNNERS,
                                    _DATAFLOW_RUNNER_PATH))
+_RUNNER_MAP.update(_get_runner_map(_KNOWN_PYTHON_RPC_DIRECT_RUNNER,
+                                   _PYTHON_RPC_DIRECT_RUNNER))
 _RUNNER_MAP.update(_get_runner_map(_KNOWN_TEST_RUNNERS,
                                    _TEST_RUNNER_PATH))
 


Mime
View raw message