flink-issues mailing list archives

From zentol <...@git.apache.org>
Subject [GitHub] incubator-flink pull request: [FLINK-337] [FLINK-671] Generic Inte...
Date Fri, 14 Nov 2014 14:37:57 GMT
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/202#discussion_r20361477
  
    --- Diff: flink-addons/flink-language-binding/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java ---
    @@ -0,0 +1,376 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
    + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
    + * License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
    + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations under the License.
    + */
    +package org.apache.flink.languagebinding.api.java.python;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +import java.net.URISyntaxException;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.operators.GroupReduceOperator;
    +import org.apache.flink.api.java.operators.SortedGrouping;
    +import org.apache.flink.api.java.operators.UnsortedGrouping;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.languagebinding.api.java.common.PlanBinder;
    +import org.apache.flink.languagebinding.api.java.common.OperationInfo;
    +import org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.PythonOperationInfo;
    +//CHECKSTYLE.OFF: AvoidStarImport - enum/function import
    +import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.PythonOperationInfo.*;
    +import org.apache.flink.languagebinding.api.java.python.functions.*;
    +//CHECKSTYLE.ON: AvoidStarImport
    +import org.apache.flink.languagebinding.api.java.common.streaming.Receiver;
    +import org.apache.flink.languagebinding.api.java.common.streaming.StreamPrinter;
    +import org.apache.flink.runtime.filecache.FileCache;
    +
    +/**
    + * This class allows the execution of a Flink plan written in python.
    + */
    +public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
    +	public static final String FLINK_PYTHON_ID = "flink";
    +	public static final String FLINK_PYTHON_PLAN_NAME = "/plan.py";
    +	public static final String FLINK_PYTHON_EXECUTOR_NAME = "/executor.py";
    +
    +	private static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + "/flink_plan";
    +	private static final String FLINK_PYTHON_REL_LOCAL_PATH = "/resources/python";
    +	private static final String FLINK_DIR = System.getenv("FLINK_ROOT_DIR");
    +
    +	private Process process;
    +
    +	/**
    +	 * Entry point for the execution of a python plan.
    +	 *
    +	 * @param args planPath [package1[packageX[|parameter1[parameterX]]]]
    +	 * @throws Exception
    +	 */
    +	public static void main(String[] args) throws Exception {
    +		PythonPlanBinder binder = new PythonPlanBinder();
    +		binder.go(args);
    +	}
    +
    +	private void go(String[] args) throws Exception {
    +		env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +		int split = 0;
    +		for (int x = 0; x < args.length; x++) {
    +			if (args[x].compareTo("|") == 0) {
    +				split = x;
    +			}
    +		}
    +
    +		prepareFiles(Arrays.copyOfRange(args, 0, split == 0 ? 1 : split));
    +		startPython(Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length));
    +		receivePlan();
    +		distributeFiles(env);
    +
    +		env.execute();
    +		close();
    +	}
    +
    +	//=====Setup========================================================================================================
    +	private void prepareFiles(String... filePaths) throws IOException, URISyntaxException {
    +		prepareFlinkPythonPackage();
    +
    +		String planPath = filePaths[0];
    +		if (planPath.endsWith("/")) {
    +			planPath = planPath.substring(0, planPath.length() - 1);
    +		}
    +		String tmpPlanPath = FLINK_PYTHON_FILE_PATH + FLINK_PYTHON_PLAN_NAME;
    +		clearPath(tmpPlanPath);
    +		FileCache.copy(new Path(planPath), new Path(tmpPlanPath), false);
    +
    +		for (int x = 1; x < filePaths.length; x++) {
    +			copyFile(filePaths[x]);
    +		}
    +	}
    +
    +	private void startPython(String[] args) throws IOException {
    +		sets = new HashMap();
    +		StringBuilder argsBuilder = new StringBuilder();
    +		for (String arg : args) {
    +			argsBuilder.append(" ").append(arg);
    +		}
    +		receiver = new Receiver(null);
    +		receiver.open(null);
    +		process = Runtime.getRuntime().exec("python -B " + FLINK_PYTHON_FILE_PATH + FLINK_PYTHON_PLAN_NAME + argsBuilder.toString());
    +
    +		new StreamPrinter(process.getInputStream()).start();
    +		new StreamPrinter(process.getErrorStream()).start();
    +	}
    +
    +	private void close() throws IOException, URISyntaxException {
    +		FileSystem hdfs = FileSystem.get(new URI(FLINK_HDFS_PATH));
    +		hdfs.delete(new Path(FLINK_HDFS_PATH), true);
    +
    +		FileSystem local = FileSystem.getLocalFileSystem();
    +		local.delete(new Path(FLINK_PYTHON_FILE_PATH), true);
    +		local.delete(new Path(FLINK_TMP_DATA_DIR), true);
    +
    +		try {
    +			receiver.close();
    +		} catch (NullPointerException npe) {
    +			// ignored: the receiver may not have been created if startup failed
    +		}
    +		process.destroy();
    +	}
    +
    +	public static void prepareFlinkPythonPackage() throws IOException, URISyntaxException {
    +		String originalFilePath = FLINK_DIR.substring(0, FLINK_DIR.length() - 7) + FLINK_PYTHON_REL_LOCAL_PATH;
    +		String tempFilePath = FLINK_PYTHON_FILE_PATH;
    +		clearPath(tempFilePath);
    +		FileCache.copy(new Path(originalFilePath), new Path(tempFilePath), false);
    +	}
    +
    +	public static void prepareFlinkPythonPackage(String path) throws IOException {
    +		FileCache.copy(new Path(path), new Path(FLINK_PYTHON_FILE_PATH), true);
    +	}
    +
    +	public static void distributeFiles(ExecutionEnvironment env) throws IOException, URISyntaxException {
    +		clearPath(FLINK_HDFS_PATH);
    +		FileCache.copy(new Path(FLINK_PYTHON_FILE_PATH), new Path(FLINK_HDFS_PATH), true);
    +		env.registerCachedFile(FLINK_HDFS_PATH, FLINK_PYTHON_ID);
    +		clearPath(FLINK_PYTHON_FILE_PATH);
    +	}
    +
    +	private static void clearPath(String path) throws IOException, URISyntaxException {
    +		FileSystem fs = FileSystem.get(new URI(path));
    +		if (fs.exists(new Path(path))) {
    +			fs.delete(new Path(path), true);
    +		}
    +	}
    +
    +	public static String copyFile(String path) throws IOException, URISyntaxException {
    +		if (path.endsWith("/")) {
    +			path = path.substring(0, path.length() - 1);
    +		}
    +		String identifier = path.substring(path.lastIndexOf("/"));
    +		String tmpFilePath = FLINK_PYTHON_FILE_PATH + "/" + identifier;
    +		clearPath(tmpFilePath);
    +		FileCache.copy(new Path(path), new Path(tmpFilePath), true);
    +		return identifier;
    +	}
    +
    +	//=====Plan Binding=================================================================================================
    +	protected class PythonOperationInfo extends OperationInfo {
    +		protected static final int INFO_MODE_UDF_DOUBLE_KEYED_PROJECTION_TYPED = -1;
    +		protected static final int INFO_MODE_UDF_DOUBLE_KEYED_TYPED = 0;
    +		protected static final int INFO_MODE_UDF_DOUBLE_TYPED = 1;
    +		protected static final int INFO_MODE_UDF_SINGLE_TYPED = 2;
    +		protected static final int INFO_MODE_UDF_SINGLE_TYPED_COMBINE = 9;
    +		protected static final int INFO_MODE_UDF = 3;
    +		protected static final int INFO_MODE_GROUP = 4;
    +		protected static final int INFO_MODE_SORT = 5;
    +		protected static final int INFO_MODE_UNION = 6;
    +		protected static final int INFO_MODE_PROJECT = 7;
    +		protected static final int INFO_MODE_UDF_DOUBLE_PROJECTION_TYPED = 8;
    +		protected String operator;
    +		protected String meta;
    +		protected boolean combine;
    +
    +		protected PythonOperationInfo(int mode) throws IOException {
    +			parentID = (Integer) receiver.getRecord();
    +			childID = (Integer) receiver.getRecord();
    +			switch (mode) {
    +				case INFO_MODE_UDF_DOUBLE_KEYED_PROJECTION_TYPED:
    +					keys1 = (Tuple) receiver.getRecord();
    +					keys2 = (Tuple) receiver.getRecord();
    +					otherID = (Integer) receiver.getRecord();
    +					types = receiver.getRecord();
    +					operator = (String) receiver.getRecord();
    +					meta = (String) receiver.getRecord();
    +					projectionKeys1 = (Tuple) receiver.getRecord();
    +					projectionKeys2 = (Tuple) receiver.getRecord();
    +					break;
    +				case INFO_MODE_UDF_DOUBLE_PROJECTION_TYPED:
    +					otherID = (Integer) receiver.getRecord();
    +					types = receiver.getRecord();
    +					operator = (String) receiver.getRecord();
    +					meta = (String) receiver.getRecord();
    +					projectionKeys1 = (Tuple) receiver.getRecord();
    +					projectionKeys2 = (Tuple) receiver.getRecord();
    +					break;
    +				case INFO_MODE_UDF_DOUBLE_KEYED_TYPED:
    +					keys1 = (Tuple) receiver.getRecord();
    +					keys2 = (Tuple) receiver.getRecord();
    +					otherID = (Integer) receiver.getRecord();
    +					types = receiver.getRecord();
    +					operator = (String) receiver.getRecord();
    +					meta = (String) receiver.getRecord();
    +					break;
    +				case INFO_MODE_UDF_DOUBLE_TYPED:
    +					otherID = (Integer) receiver.getRecord();
    +					types = receiver.getRecord();
    +					operator = (String) receiver.getRecord();
    +					meta = (String) receiver.getRecord();
    +					break;
    +				case INFO_MODE_UDF_SINGLE_TYPED:
    +					types = receiver.getRecord();
    +					operator = (String) receiver.getRecord();
    +					meta = (String) receiver.getRecord();
    +					break;
    +				case INFO_MODE_UDF_SINGLE_TYPED_COMBINE:
    +					types = receiver.getRecord();
    +					operator = (String) receiver.getRecord();
    +					meta = (String) receiver.getRecord();
    +					combine = (Boolean) receiver.getRecord();
    +					break;
    +				case INFO_MODE_UDF:
    +					operator = (String) receiver.getRecord();
    +					meta = (String) receiver.getRecord();
    +					break;
    +				case INFO_MODE_GROUP:
    +					keys1 = (Tuple) receiver.getRecord();
    +					break;
    +				case INFO_MODE_SORT:
    +					field = (Integer) receiver.getRecord();
    +					order = (Integer) receiver.getRecord();
    +					break;
    +				case INFO_MODE_UNION:
    +					otherID = (Integer) receiver.getRecord();
    +					break;
    +				case INFO_MODE_PROJECT:
    +					keys1 = (Tuple) receiver.getRecord();
    +					types = receiver.getRecord();
    +					break;
    +			}
    +		}
    +	}
    +
    +	@Override
    +	protected PythonOperationInfo createOperationInfo(String identifier) throws IOException {
    +		switch (Operations.valueOf(identifier.toUpperCase())) {
    +			case COGROUP:
    +				return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_KEYED_TYPED);
    +			case CROSS:
    +				return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_PROJECTION_TYPED);
    +			case CROSS_H:
    +				return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_PROJECTION_TYPED);
    +			case CROSS_T:
    +				return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_PROJECTION_TYPED);
    +			case FILTER:
    +				return new PythonOperationInfo(INFO_MODE_UDF);
    +			case FLATMAP:
    +				return new PythonOperationInfo(INFO_MODE_UDF_SINGLE_TYPED);
    +			case GROUPREDUCE:
    +				return new PythonOperationInfo(INFO_MODE_UDF_SINGLE_TYPED_COMBINE);
    +			case JOIN:
    +				return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_KEYED_PROJECTION_TYPED);
    +			case JOIN_H:
    +				return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_KEYED_PROJECTION_TYPED);
    +			case JOIN_T:
    +				return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_KEYED_PROJECTION_TYPED);
    +			case MAP:
    +				return new PythonOperationInfo(INFO_MODE_UDF_SINGLE_TYPED);
    +			case PROJECTION:
    +				return new PythonOperationInfo(INFO_MODE_PROJECT);
    +			case REDUCE:
    +				return new PythonOperationInfo(INFO_MODE_UDF);
    +			case GROUPBY:
    +				return new PythonOperationInfo(INFO_MODE_GROUP);
    +			case SORT:
    +				return new PythonOperationInfo(INFO_MODE_SORT);
    +			case UNION:
    +				return new PythonOperationInfo(INFO_MODE_UNION);
    +		}
    +		return new PythonOperationInfo(INFO_MODE_UDF_DOUBLE_KEYED_TYPED);
    +	}
    +
    +	@Override
    +	protected DataSet applyCoGroupOperation(DataSet op1, DataSet op2, int[] firstKeys, int[] secondKeys, PythonOperationInfo info) {
    +		return op1.coGroup(op2).where(firstKeys).equalTo(secondKeys).with(new PythonCoGroup(info.operator, info.types, info.meta));
    +	}
    +
    +	public static class PseudoKeySelector<X> implements KeySelector<X, Integer> {
    +		@Override
    +		public Integer getKey(X value) throws Exception {
    +			return 0;
    +		}
    +	}
    +
    +	@Override
    +	protected DataSet applyCrossOperation(DataSet op1, DataSet op2, int mode, PythonOperationInfo info) {
    +		switch (mode) {
    +			case 0:
    +				return op1.join(op2).where(new PseudoKeySelector()).equalTo(new PseudoKeySelector()).with(new PythonCross(info.operator, info.types, info.meta));
    --- End diff --
    
    A Cross is implemented as a join where every pair matches. I don't know the implications of doing it in such a hacky way. (The comparison overhead should be negligible considering the current performance.)
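    
    For illustration, here is a minimal, self-contained sketch of that trick against the 2014-era DataSet API; the wrapper class and sample data are made up for this example and are not part of the patch:
    
        import org.apache.flink.api.common.functions.JoinFunction;
        import org.apache.flink.api.java.DataSet;
        import org.apache.flink.api.java.ExecutionEnvironment;
        import org.apache.flink.api.java.functions.KeySelector;
    
        public class CrossViaJoinExample {
    
            // Same idea as the patch's PseudoKeySelector: every element maps to
            // the same key, so the join matches every pair, i.e. a cartesian product.
            public static class PseudoKeySelector<X> implements KeySelector<X, Integer> {
                @Override
                public Integer getKey(X value) {
                    return 0;
                }
            }
    
            public static void main(String[] args) throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                DataSet<Integer> left = env.fromElements(1, 2);
                DataSet<String> right = env.fromElements("a", "b");
    
                // joining on the constant key yields all four pairs: 1a, 1b, 2a, 2b
                left.join(right)
                        .where(new PseudoKeySelector<Integer>())
                        .equalTo(new PseudoKeySelector<String>())
                        .with(new JoinFunction<Integer, String, String>() {
                            @Override
                            public String join(Integer first, String second) {
                                return first + second;
                            }
                        })
                        .print();
    
                env.execute();
            }
        }
    
    One candidate implication: with a constant key, all records hash to a single key group, so the match phase loses parallelism, whereas a native cross can broadcast one side. That might be the effect worth measuring.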


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
