From: chesnay@apache.org
To: commits@flink.apache.org
Date: Wed, 20 Jan 2016 13:51:21 -0000
Subject: [2/3] flink git commit: [FLINK-2501] [py] Remove the need to specify types for transformations

http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
new file mode 100644
index 0000000..6c83a61
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api.streaming.util;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.python.api.types.CustomTypeWrapper;
+
+public class SerializationUtils {
+	public static final byte TYPE_BOOLEAN = (byte) 34;
+	public static final byte TYPE_BYTE = (byte) 33;
+	public static final byte TYPE_INTEGER = (byte) 32;
+	public static final byte TYPE_LONG = (byte) 31;
+	public static final byte TYPE_DOUBLE = (byte) 30;
+	public static final byte TYPE_FLOAT = (byte) 29;
+	public static final byte TYPE_STRING = (byte) 28;
+	public static final byte TYPE_BYTES = (byte) 27;
+	public static final byte TYPE_NULL = (byte) 26;
+
+	private enum SupportedTypes {
+		TUPLE, BOOLEAN, BYTE, BYTES, INTEGER, LONG, FLOAT, DOUBLE, STRING, NULL, CUSTOMTYPEWRAPPER
+	}
+
+	public static Serializer getSerializer(Object value) {
+		String className = value.getClass().getSimpleName().toUpperCase();
+		if (className.startsWith("TUPLE")) {
+			className = "TUPLE";
+		}
+		if (className.startsWith("BYTE[]")) {
+			className = "BYTES";
+		}
+		SupportedTypes type = SupportedTypes.valueOf(className);
+		switch (type) {
+			case TUPLE:
+				return new TupleSerializer((Tuple) value);
+			case BOOLEAN:
+				return new BooleanSerializer();
+			case BYTE:
+				return new ByteSerializer();
+			case BYTES:
+				return new BytesSerializer();
+			case INTEGER:
+				return new IntSerializer();
+			case LONG:
+				return new LongSerializer();
+			case STRING:
+				return new StringSerializer();
+			case FLOAT:
+				return new FloatSerializer();
+			case DOUBLE:
+				return new DoubleSerializer();
+			case NULL:
+				return new NullSerializer();
+			case CUSTOMTYPEWRAPPER:
+				return new CustomTypeWrapperSerializer((CustomTypeWrapper) value);
+			default:
+				throw new IllegalArgumentException("Unsupported Type encountered: " + type);
+		}
+	}
+
+	public static abstract class Serializer<IN> {
+		private byte[] typeInfo = null;
+
+		public byte[] serialize(IN value) {
+			if (typeInfo == null) {
+				typeInfo = new byte[getTypeInfoSize()];
+				ByteBuffer typeBuffer = ByteBuffer.wrap(typeInfo);
+				putTypeInfo(typeBuffer);
+			}
+			byte[] bytes = serializeWithoutTypeInfo(value);
+			byte[] total = new byte[typeInfo.length + bytes.length];
+			ByteBuffer.wrap(total).put(typeInfo).put(bytes);
+			return total;
+		}
+
+		public abstract byte[] serializeWithoutTypeInfo(IN value);
+
+		protected abstract void putTypeInfo(ByteBuffer buffer);
+
+		protected int getTypeInfoSize() {
+			return 1;
+		}
+	}
+
+	public static class CustomTypeWrapperSerializer extends Serializer<CustomTypeWrapper> {
+		private final byte type;
+
+		public CustomTypeWrapperSerializer(CustomTypeWrapper value) {
+			this.type = value.getType();
+		}
+
+		@Override
+		public byte[] serializeWithoutTypeInfo(CustomTypeWrapper value) {
+			byte[] result = new byte[4 + value.getData().length];
+			ByteBuffer.wrap(result).putInt(value.getData().length).put(value.getData());
+			return result;
+		}
+
+		@Override
+		public void putTypeInfo(ByteBuffer buffer) {
+			buffer.put(type);
+		}
+	}
+
+	public static class ByteSerializer extends Serializer<Byte> {
+		@Override
+		public byte[] serializeWithoutTypeInfo(Byte value) {
+			return new byte[]{value};
+		}
+
+		@Override
+		public void putTypeInfo(ByteBuffer buffer) {
+			buffer.put(TYPE_BYTE);
+		}
+	}
+
+	public static class BooleanSerializer extends Serializer<Boolean> {
+		@Override
+		public byte[] serializeWithoutTypeInfo(Boolean value) {
+			return new byte[]{value ? (byte) 1 : (byte) 0};
+		}
+
+		@Override
+		public void putTypeInfo(ByteBuffer buffer) {
+			buffer.put(TYPE_BOOLEAN);
+		}
+	}
+
+	public static class IntSerializer extends Serializer<Integer> {
+		@Override
+		public byte[] serializeWithoutTypeInfo(Integer value) {
+			byte[] data = new byte[4];
+			ByteBuffer.wrap(data).putInt(value);
+			return data;
+		}
+
+		@Override
+		public void putTypeInfo(ByteBuffer buffer) {
+			buffer.put(TYPE_INTEGER);
+		}
+	}
+
+	public static class LongSerializer extends Serializer<Long> {
+		@Override
+		public byte[] serializeWithoutTypeInfo(Long value) {
+			byte[] data = new byte[8];
+			ByteBuffer.wrap(data).putLong(value);
+			return data;
+		}
+
+		@Override
+		public void putTypeInfo(ByteBuffer buffer) {
+			buffer.put(TYPE_LONG);
+		}
+	}
+
+	public static class StringSerializer extends Serializer<String> {
+		@Override
+		public byte[] serializeWithoutTypeInfo(String value) {
+			byte[] string = value.getBytes();
+			byte[] data = new byte[4 + string.length];
+			ByteBuffer.wrap(data).putInt(string.length).put(string);
+			return data;
+		}
+
+		@Override
+		public void putTypeInfo(ByteBuffer buffer) {
+			buffer.put(TYPE_STRING);
+		}
+	}
+
+	public static class FloatSerializer extends Serializer<Float> {
+		@Override
+		public byte[] serializeWithoutTypeInfo(Float value) {
+			byte[] data = new byte[4];
+			ByteBuffer.wrap(data).putFloat(value);
+			return data;
+		}
+
+		@Override
+		public void putTypeInfo(ByteBuffer buffer) {
+			buffer.put(TYPE_FLOAT);
+		}
+	}
+
+	public static class DoubleSerializer extends Serializer<Double> {
+		@Override
+		public byte[] serializeWithoutTypeInfo(Double value) {
+			byte[] data = new byte[8];
+			ByteBuffer.wrap(data).putDouble(value);
+			return data;
+		}
+
+		@Override
+		public void putTypeInfo(ByteBuffer buffer) {
+			buffer.put(TYPE_DOUBLE);
+		}
+	}
+
+	public static class NullSerializer extends Serializer<Object> {
+		@Override
+		public byte[] serializeWithoutTypeInfo(Object value) {
+			return new byte[0];
+		}
+
+		@Override
+		public void putTypeInfo(ByteBuffer buffer) {
+			buffer.put(TYPE_NULL);
+		}
+	}
+
+	public static class BytesSerializer extends Serializer<byte[]> {
+		@Override
+		public byte[] serializeWithoutTypeInfo(byte[] value) {
+			byte[] data = new byte[4 + value.length];
+			ByteBuffer.wrap(data).putInt(value.length).put(value);
+			return data;
+		}
+
+		@Override
+		public void putTypeInfo(ByteBuffer buffer) {
+			buffer.put(TYPE_BYTES);
+		}
+	}
+
+	public static class TupleSerializer extends Serializer<Tuple> {
+		private final Serializer[] serializer;
+
+		public TupleSerializer(Tuple value) {
+			serializer = new Serializer[value.getArity()];
+			for (int x = 0; x < serializer.length; x++) {
+				serializer[x] = getSerializer(value.getField(x));
+			}
+		}
+
+		@Override
+		public byte[] serializeWithoutTypeInfo(Tuple value) {
+			ArrayList<byte[]> bits = new ArrayList<>();
+
+			int totalSize = 0;
+			for (int x = 0; x < serializer.length; x++) {
+				byte[] bit = serializer[x].serializeWithoutTypeInfo(value.getField(x));
+				bits.add(bit);
+				totalSize += bit.length;
+			}
+			int pointer = 0;
+			byte[] data = new byte[totalSize];
+			for (byte[] bit : bits) {
+				System.arraycopy(bit, 0, data, pointer, bit.length);
+				pointer += bit.length;
+			}
+			return data;
+		}
+
+		@Override
+		public void putTypeInfo(ByteBuffer buffer) {
+			buffer.put((byte) serializer.length);
+			for (Serializer s : serializer) {
+				s.putTypeInfo(buffer);
+			}
+		}
+
+		@Override
+		public int getTypeInfoSize() {
+			int size = 1;
+			for (Serializer s : serializer) {
+				size += s.getTypeInfoSize();
+			}
+			return size;
+		}
+	}
+}
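
For orientation, here is a small sketch (not part of the commit) of the framing the class above produces: serialize() prepends a type-info header -- one byte per atomic field, plus an arity byte for each tuple -- to the serialized payload, which is what lets the Python side pick deserializers without user-supplied type hints. Only getSerializer() and the nested serializers are from the commit; the wrapper class and main() below are hypothetical.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.python.api.streaming.util.SerializationUtils;
import org.apache.flink.python.api.streaming.util.SerializationUtils.Serializer;

public class FramingSketch {
	public static void main(String[] args) {
		// The serializer is derived from the value itself, not from a declared type.
		Tuple2<Long, String> record = new Tuple2<>(42L, "hello");
		Serializer serializer = SerializationUtils.getSerializer(record);

		// Header: arity byte 0x02, then TYPE_LONG (31) and TYPE_STRING (28),
		// matching the byte values in the Python Constants.py further below.
		// Payload: 8 bytes for the long, then a 4-byte length plus the string's bytes.
		byte[] framed = serializer.serialize(record);
		System.out.println(framed.length); // 3 + 8 + (4 + 5) = 20
	}
}
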
http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Collector.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Collector.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Collector.py index b5674b9..4c0db32 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Collector.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Collector.py @@ -16,11 +16,11 @@ # limitations under the License. ################################################################################ from struct import pack -import sys - from flink.connection.Constants import Types -from flink.plan.Constants import _Dummy + +#=====Compatibility==================================================================================================== +import sys PY2 = sys.version_info[0] == 2 PY3 = sys.version_info[0] == 3 @@ -30,57 +30,128 @@ else: stringtype = str +#=====Collector======================================================================================================== class Collector(object): - def __init__(self, con, env): + def __init__(self, con, env, info): self._connection = con self._serializer = None self._env = env + self._as_array = isinstance(info.types, bytearray) def _close(self): self._connection.send_end_signal() def collect(self, value): - self._serializer = _get_serializer(self._connection.write, value, self._env._types) + self._serializer = ArraySerializer(value, self._env._types) if self._as_array else KeyValuePairSerializer(value, self._env._types) self.collect = self._collect self.collect(value) def _collect(self, value): - self._connection.write(self._serializer.serialize(value)) + serialized_value = self._serializer.serialize(value) + self._connection.write(serialized_value) + + +class PlanCollector(object): + def __init__(self, con, env): + self._connection = con + self._env = env + + def _close(self): + self._connection.send_end_signal() + + def collect(self, value): + type = _get_type_info(value, self._env._types) + serializer = _get_serializer(value, self._env._types) + self._connection.write(b"".join([type, serializer.serialize(value)])) + + +#=====Serializer======================================================================================================= +class Serializer(object): + def serialize(self, value): + pass + + +class KeyValuePairSerializer(Serializer): + def __init__(self, value, custom_types): + self._typeK = [_get_type_info(key, custom_types) for key in value[0]] + self._typeV = _get_type_info(value[1], custom_types) + self._typeK_length = [len(type) for type in self._typeK] + self._typeV_length = len(self._typeV) + self._serializerK = [_get_serializer(key, custom_types) for key in value[0]] + self._serializerV = _get_serializer(value[1], custom_types) + + def serialize(self, value): + bits = [pack(">i", len(value[0]))[3:4]] + for i in range(len(value[0])): + x = self._serializerK[i].serialize(value[0][i]) + bits.append(pack(">i", len(x) + self._typeK_length[i])) + bits.append(self._typeK[i]) + bits.append(x) + v = self._serializerV.serialize(value[1]) + bits.append(pack(">i", len(v) + self._typeV_length)) + bits.append(self._typeV) + bits.append(v) + return b"".join(bits) + +class ArraySerializer(Serializer): + 
def __init__(self, value, custom_types): + self._type = _get_type_info(value, custom_types) + self._type_length = len(self._type) + self._serializer = _get_serializer(value, custom_types) -def _get_serializer(write, value, custom_types): + def serialize(self, value): + serialized_value = self._serializer.serialize(value) + return b"".join([pack(">i", len(serialized_value) + self._type_length), self._type, serialized_value]) + + +def _get_type_info(value, custom_types): if isinstance(value, (list, tuple)): - write(Types.TYPE_TUPLE) - write(pack(">I", len(value))) - return TupleSerializer(write, value, custom_types) + return b"".join([pack(">i", len(value))[3:4], b"".join([_get_type_info(field, custom_types) for field in value])]) + elif value is None: + return Types.TYPE_NULL + elif isinstance(value, stringtype): + return Types.TYPE_STRING + elif isinstance(value, bool): + return Types.TYPE_BOOLEAN + elif isinstance(value, int) or PY2 and isinstance(value, long): + return Types.TYPE_LONG + elif isinstance(value, bytearray): + return Types.TYPE_BYTES + elif isinstance(value, float): + return Types.TYPE_DOUBLE + else: + for entry in custom_types: + if isinstance(value, entry[1]): + return entry[0] + raise Exception("Unsupported Type encountered.") + + +def _get_serializer(value, custom_types): + if isinstance(value, (list, tuple)): + return TupleSerializer(value, custom_types) elif value is None: - write(Types.TYPE_NULL) return NullSerializer() elif isinstance(value, stringtype): - write(Types.TYPE_STRING) return StringSerializer() elif isinstance(value, bool): - write(Types.TYPE_BOOLEAN) return BooleanSerializer() elif isinstance(value, int) or PY2 and isinstance(value, long): - write(Types.TYPE_LONG) return LongSerializer() elif isinstance(value, bytearray): - write(Types.TYPE_BYTES) return ByteArraySerializer() elif isinstance(value, float): - write(Types.TYPE_DOUBLE) return FloatSerializer() else: for entry in custom_types: if isinstance(value, entry[1]): - write(entry[0]) - return CustomTypeSerializer(entry[2]) + return CustomTypeSerializer(entry[0], entry[2]) raise Exception("Unsupported Type encountered.") -class CustomTypeSerializer(object): - def __init__(self, serializer): +class CustomTypeSerializer(Serializer): + def __init__(self, id, serializer): + self._id = id self._serializer = serializer def serialize(self, value): @@ -88,9 +159,9 @@ class CustomTypeSerializer(object): return b"".join([pack(">i",len(msg)), msg]) -class TupleSerializer(object): - def __init__(self, write, value, custom_types): - self.serializer = [_get_serializer(write, field, custom_types) for field in value] +class TupleSerializer(Serializer): + def __init__(self, value, custom_types): + self.serializer = [_get_serializer(field, custom_types) for field in value] def serialize(self, value): bits = [] @@ -99,83 +170,33 @@ class TupleSerializer(object): return b"".join(bits) -class BooleanSerializer(object): +class BooleanSerializer(Serializer): def serialize(self, value): return pack(">?", value) -class FloatSerializer(object): +class FloatSerializer(Serializer): def serialize(self, value): return pack(">d", value) -class LongSerializer(object): +class LongSerializer(Serializer): def serialize(self, value): return pack(">q", value) -class ByteArraySerializer(object): +class ByteArraySerializer(Serializer): def serialize(self, value): value = bytes(value) return pack(">I", len(value)) + value -class StringSerializer(object): +class StringSerializer(Serializer): def serialize(self, value): value = 
value.encode("utf-8") return pack(">I", len(value)) + value -class NullSerializer(object): +class NullSerializer(Serializer): def serialize(self, value): - return b"" - - -class TypedCollector(object): - def __init__(self, con, env): - self._connection = con - self._env = env - - def collect(self, value): - if not isinstance(value, (list, tuple)): - self._send_field(value) - else: - self._connection.write(Types.TYPE_TUPLE) - meta = pack(">I", len(value)) - self._connection.write(bytes([meta[3]]) if PY3 else meta[3]) - for field in value: - self.collect(field) - - def _send_field(self, value): - if value is None: - self._connection.write(Types.TYPE_NULL) - elif isinstance(value, stringtype): - value = value.encode("utf-8") - size = pack(">I", len(value)) - self._connection.write(b"".join([Types.TYPE_STRING, size, value])) - elif isinstance(value, bytes): - size = pack(">I", len(value)) - self._connection.write(b"".join([Types.TYPE_BYTES, size, value])) - elif isinstance(value, bool): - data = pack(">?", value) - self._connection.write(b"".join([Types.TYPE_BOOLEAN, data])) - elif isinstance(value, int) or PY2 and isinstance(value, long): - data = pack(">q", value) - self._connection.write(b"".join([Types.TYPE_LONG, data])) - elif isinstance(value, float): - data = pack(">d", value) - self._connection.write(b"".join([Types.TYPE_DOUBLE, data])) - elif isinstance(value, bytearray): - value = bytes(value) - size = pack(">I", len(value)) - self._connection.write(b"".join([Types.TYPE_BYTES, size, value])) - elif isinstance(value, _Dummy): - self._connection.write(pack(">i", 127)[3:]) - self._connection.write(pack(">i", 0)) - else: - for entry in self._env._types: - if isinstance(value, entry[1]): - self._connection.write(entry[0]) - self._connection.write(CustomTypeSerializer(entry[2]).serialize(value)) - return - raise Exception("Unsupported Type encountered.") \ No newline at end of file + return b"" \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py index 680f495..293f5e9 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py @@ -115,6 +115,8 @@ class BufferingTCPMappedFileConnection(object): self._read_buffer() old_offset = self._input_offset self._input_offset += des_size + if self._input_offset > self._input_size: + raise Exception("BufferUnderFlowException") return self._input[old_offset:self._input_offset] def _read_buffer(self): @@ -140,6 +142,12 @@ class BufferingTCPMappedFileConnection(object): self._input_offset = 0 self._input = b"" + def read_secondary(self, des_size): + return recv_all(self._socket, des_size) + + def write_secondary(self, data): + self._socket.send(data) + class TwinBufferingTCPMappedFileConnection(BufferingTCPMappedFileConnection): def __init__(self, input_file, output_file, port): http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Constants.py 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Constants.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Constants.py index 0ca2232..01a16fa 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Constants.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Constants.py @@ -18,14 +18,15 @@ class Types(object): - TYPE_TUPLE = b'\x0B' - TYPE_BOOLEAN = b'\x0A' - TYPE_BYTE = b'\x09' - TYPE_SHORT = b'\x08' - TYPE_INTEGER = b'\x07' - TYPE_LONG = b'\x06' - TYPE_DOUBLE = b'\x04' - TYPE_FLOAT = b'\x05' - TYPE_STRING = b'\x02' - TYPE_NULL = b'\x00' - TYPE_BYTES = b'\x01' + TYPE_ARRAY = b'\x3F' + TYPE_KEY_VALUE = b'\x3E' + TYPE_VALUE_VALUE = b'\x3D' + TYPE_BOOLEAN = b'\x22' + TYPE_BYTE = b'\x21' + TYPE_INTEGER = b'\x20' + TYPE_LONG = b'\x1F' + TYPE_DOUBLE = b'\x1E' + TYPE_FLOAT = b'\x1D' + TYPE_STRING = b'\x1C' + TYPE_BYTES = b'\x1B' + TYPE_NULL = b'\x1A' http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Iterator.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Iterator.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Iterator.py index 3425cfa..2532194 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Iterator.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Iterator.py @@ -26,6 +26,7 @@ except: from flink.connection.Constants import Types +#=====Iterator========================================================================================================== class ListIterator(defIter.Iterator): def __init__(self, values): super(ListIterator, self).__init__() @@ -76,7 +77,7 @@ class GroupIterator(defIter.Iterator): else: self.cur = None self.empty = True - return tmp + return tmp[1] else: raise StopIteration @@ -93,7 +94,7 @@ class GroupIterator(defIter.Iterator): self.empty = False def _extract_keys(self, x): - return [x[k] for k in self.keys] + return [x[0][k] for k in self.keys] def _extract_keys_id(self, x): return x @@ -175,6 +176,7 @@ class Iterator(defIter.Iterator): self._group = group self._deserializer = None self._env = env + self._size = 0 def __next__(self): return self.next() @@ -184,8 +186,35 @@ class Iterator(defIter.Iterator): def next(self): if self.has_next(): + custom_types = self._env._types + read = self._read if self._deserializer is None: - self._deserializer = _get_deserializer(self._group, self._connection.read, self._env._types) + type = read(1) + if type == Types.TYPE_ARRAY: + key_des = _get_deserializer(read, custom_types) + self._deserializer = ArrayDeserializer(key_des) + return key_des.deserialize(read) + elif type == Types.TYPE_KEY_VALUE: + size = ord(read(1)) + key_des = [] + keys = [] + for _ in range(size): + new_d = _get_deserializer(read, custom_types) + key_des.append(new_d) + keys.append(new_d.deserialize(read)) + val_des = _get_deserializer(read, custom_types) + val = val_des.deserialize(read) + self._deserializer = KeyValueDeserializer(key_des, val_des) + return (tuple(keys), val) + elif type == Types.TYPE_VALUE_VALUE: + des1 
= _get_deserializer(read, custom_types) + field1 = des1.deserialize(read) + des2 = _get_deserializer(read, custom_types) + field2 = des2.deserialize(read) + self._deserializer = ValueValueDeserializer(des1, des2) + return (field1, field2) + else: + raise Exception("Invalid type ID encountered: " + str(ord(type))) return self._deserializer.deserialize(self._read) else: raise StopIteration @@ -197,6 +226,16 @@ class Iterator(defIter.Iterator): self._deserializer = None +class PlanIterator(object): + def __init__(self, con, env): + self._connection = con + self._env = env + + def next(self): + deserializer = _get_deserializer(self._connection.read, self._env._types) + return deserializer.deserialize(self._connection.read) + + class DummyIterator(Iterator): def __init__(self): super(Iterator, self).__init__() @@ -211,12 +250,11 @@ class DummyIterator(Iterator): return False -def _get_deserializer(group, read, custom_types, type=None): - if type is None: - type = read(1, group) - return _get_deserializer(group, read, custom_types, type) - elif type == Types.TYPE_TUPLE: - return TupleDeserializer(read, group, custom_types) +#=====Deserializer====================================================================================================== +def _get_deserializer(read, custom_types): + type = read(1) + if 0 < ord(type) < 26: + return TupleDeserializer([_get_deserializer(read, custom_types) for _ in range(ord(type))]) elif type == Types.TYPE_BYTE: return ByteDeserializer() elif type == Types.TYPE_BYTES: @@ -238,101 +276,125 @@ def _get_deserializer(group, read, custom_types, type=None): else: for entry in custom_types: if type == entry[0]: - return entry[3] - raise Exception("Unable to find deserializer for type ID " + str(type)) + return CustomTypeDeserializer(entry[3]) + raise Exception("Unable to find deserializer for type ID " + str(ord(type))) -class TupleDeserializer(object): - def __init__(self, read, group, custom_types): - size = unpack(">I", read(4, group))[0] - self.deserializer = [_get_deserializer(group, read, custom_types) for _ in range(size)] +class Deserializer(object): + def get_type_info_size(self): + return 1 def deserialize(self, read): - return tuple([s.deserialize(read) for s in self.deserializer]) + pass + +class ArrayDeserializer(Deserializer): + def __init__(self, deserializer): + self._deserializer = deserializer + self._d_skip = deserializer.get_type_info_size() -class ByteDeserializer(object): + def deserialize(self, read): + read(1) #array type + read(self._d_skip) + return self._deserializer.deserialize(read) + + +class KeyValueDeserializer(Deserializer): + def __init__(self, key_deserializer, value_deserializer): + self._key_deserializer = [(k, k.get_type_info_size()) for k in key_deserializer] + self._value_deserializer = value_deserializer + self._value_deserializer_skip = value_deserializer.get_type_info_size() + + def deserialize(self, read): + fields = [] + read(1) #key value type + read(1) #key count + for dk in self._key_deserializer: + read(dk[1]) + fields.append(dk[0].deserialize(read)) + dv = self._value_deserializer + read(self._value_deserializer_skip) + return (tuple(fields), dv.deserialize(read)) + + +class ValueValueDeserializer(Deserializer): + def __init__(self, d1, d2): + self._d1 = d1 + self._d1_skip = self._d1.get_type_info_size() + self._d2 = d2 + self._d2_skip = self._d2.get_type_info_size() + + def deserialize(self, read): + read(1) + read(self._d1_skip) + f1 = self._d1.deserialize(read) + read(self._d2_skip) + f2 = 
self._d2.deserialize(read) + return (f1, f2) + + +class CustomTypeDeserializer(Deserializer): + def __init__(self, deserializer): + self._deserializer = deserializer + + def deserialize(self, read): + read(4) #discard binary size + return self._deserializer.deserialize(read) + + +class TupleDeserializer(Deserializer): + def __init__(self, deserializer): + self._deserializer = deserializer + + def get_type_info_size(self): + return 1 + sum([d.get_type_info_size() for d in self._deserializer]) + + def deserialize(self, read): + return tuple([s.deserialize(read) for s in self._deserializer]) + + +class ByteDeserializer(Deserializer): def deserialize(self, read): return unpack(">c", read(1))[0] -class ByteArrayDeserializer(object): +class ByteArrayDeserializer(Deserializer): def deserialize(self, read): size = unpack(">i", read(4))[0] return bytearray(read(size)) if size else bytearray(b"") -class BooleanDeserializer(object): +class BooleanDeserializer(Deserializer): def deserialize(self, read): return unpack(">?", read(1))[0] -class FloatDeserializer(object): +class FloatDeserializer(Deserializer): def deserialize(self, read): return unpack(">f", read(4))[0] -class DoubleDeserializer(object): +class DoubleDeserializer(Deserializer): def deserialize(self, read): return unpack(">d", read(8))[0] -class IntegerDeserializer(object): +class IntegerDeserializer(Deserializer): def deserialize(self, read): return unpack(">i", read(4))[0] -class LongDeserializer(object): +class LongDeserializer(Deserializer): def deserialize(self, read): return unpack(">q", read(8))[0] -class StringDeserializer(object): +class StringDeserializer(Deserializer): def deserialize(self, read): length = unpack(">i", read(4))[0] return read(length).decode("utf-8") if length else "" -class NullDeserializer(object): - def deserialize(self): +class NullDeserializer(Deserializer): + def deserialize(self, read): return None - - -class TypedIterator(object): - def __init__(self, con, env): - self._connection = con - self._env = env - - def next(self): - read = self._connection.read - type = read(1) - if type == Types.TYPE_TUPLE: - size = unpack(">i", read(4))[0] - return tuple([self.next() for x in range(size)]) - elif type == Types.TYPE_BYTE: - return unpack(">c", read(1))[0] - elif type == Types.TYPE_BYTES: - size = unpack(">i", read(4))[0] - return bytearray(read(size)) if size else bytearray(b"") - elif type == Types.TYPE_BOOLEAN: - return unpack(">?", read(1))[0] - elif type == Types.TYPE_FLOAT: - return unpack(">f", read(4))[0] - elif type == Types.TYPE_DOUBLE: - return unpack(">d", read(8))[0] - elif type == Types.TYPE_INTEGER: - return unpack(">i", read(4))[0] - elif type == Types.TYPE_LONG: - return unpack(">q", read(8))[0] - elif type == Types.TYPE_STRING: - length = unpack(">i", read(4))[0] - return read(length).decode("utf-8") if length else "" - elif type == Types.TYPE_NULL: - return None - else: - for entry in self._env._types: - if type == entry[0]: - return entry[3]() - raise Exception("Unable to find deserializer for type ID " + str(type)) - - http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery10.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery10.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery10.py index cc9e7cf..c107987 100644 
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery10.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery10.py @@ -76,7 +76,7 @@ if __name__ == "__main__": STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING], '\n', '|') \ .project(0,5,6,8) \ .filter(LineitemFilter()) \ - .map(ComputeRevenue(), [INT, FLOAT]) + .map(ComputeRevenue()) nation = env \ .read_csv(sys.argv[4], [INT, STRING, INT, STRING], '\n', '|') \ http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery3.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery3.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery3.py index 3eb72c9..aaa4e55 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery3.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery3.py @@ -87,13 +87,13 @@ if __name__ == "__main__": .join(order) \ .where(0) \ .equal_to(1) \ - .using(CustomerOrderJoin(),[INT, FLOAT, STRING, INT]) + .using(CustomerOrderJoin()) result = customerWithOrder \ .join(lineitem) \ .where(0) \ .equal_to(0) \ - .using(CustomerOrderLineitemJoin(), [INT, FLOAT, STRING, INT]) \ + .using(CustomerOrderLineitemJoin()) \ .group_by(0, 2, 3) \ .reduce(SumReducer()) http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TriangleEnumeration.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TriangleEnumeration.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TriangleEnumeration.py index b1b3ef4..f13cc04 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TriangleEnumeration.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TriangleEnumeration.py @@ -16,7 +16,7 @@ # limitations under the License. 
################################################################################ from flink.plan.Environment import get_environment -from flink.plan.Constants import INT, Order +from flink.plan.Constants import Order from flink.functions.FlatMapFunction import FlatMapFunction from flink.functions.GroupReduceFunction import GroupReduceFunction from flink.functions.ReduceFunction import ReduceFunction @@ -123,27 +123,27 @@ if __name__ == "__main__": (1, 2), (1, 3), (1, 4), (1, 5), (2, 3), (2, 5), (3, 4), (3, 7), (3, 8), (5, 6), (7, 8)) edges_with_degrees = edges \ - .flat_map(EdgeDuplicator(), [INT, INT]) \ + .flat_map(EdgeDuplicator()) \ .group_by(0) \ .sort_group(1, Order.ASCENDING) \ - .reduce_group(DegreeCounter(), [INT, INT, INT, INT]) \ + .reduce_group(DegreeCounter()) \ .group_by(0, 2) \ .reduce(DegreeJoiner()) edges_by_degree = edges_with_degrees \ - .map(EdgeByDegreeProjector(), [INT, INT]) + .map(EdgeByDegreeProjector()) edges_by_id = edges_by_degree \ - .map(EdgeByIdProjector(), [INT, INT]) + .map(EdgeByIdProjector()) triangles = edges_by_degree \ .group_by(0) \ .sort_group(1, Order.ASCENDING) \ - .reduce_group(TriadBuilder(), [INT, INT, INT]) \ + .reduce_group(TriadBuilder()) \ .join(edges_by_id) \ .where(1, 2) \ .equal_to(0, 1) \ - .using(TriadFilter(), [INT, INT, INT]) + .using(TriadFilter()) triangles.output() http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WebLogAnalysis.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WebLogAnalysis.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WebLogAnalysis.py index 676043f..1ea3e78 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WebLogAnalysis.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WebLogAnalysis.py @@ -19,7 +19,7 @@ import sys from datetime import datetime from flink.plan.Environment import get_environment -from flink.plan.Constants import INT, STRING, FLOAT, WriteMode +from flink.plan.Constants import WriteMode from flink.functions.CoGroupFunction import CoGroupFunction from flink.functions.FilterFunction import FilterFunction @@ -54,16 +54,16 @@ if __name__ == "__main__": sys.exit("Usage: ./bin/pyflink.sh WebLogAnalysis ") documents = env \ - .read_csv(sys.argv[1], [STRING, STRING], "\n", "|") \ + .read_csv(sys.argv[1], "\n", "|") \ .filter(DocumentFilter()) \ .project(0) ranks = env \ - .read_csv(sys.argv[2], [INT, STRING, INT], "\n", "|") \ + .read_csv(sys.argv[2], "\n", "|") \ .filter(RankFilter()) visits = env \ - .read_csv(sys.argv[3], [STRING, STRING, STRING, FLOAT, STRING, STRING, STRING, STRING, INT], "\n", "|") \ + .read_csv(sys.argv[3], "\n", "|") \ .project(1,2) \ .filter(VisitFilter()) \ .project(0) @@ -78,7 +78,7 @@ if __name__ == "__main__": .co_group(visits) \ .where(1) \ .equal_to(0) \ - .using(AntiJoinVisits(), [INT, STRING, INT]) + .using(AntiJoinVisits()) result.write_csv(sys.argv[4], '\n', '|', WriteMode.OVERWRITE) http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WordCount.py ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WordCount.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WordCount.py index 71c2e28..2ab724c 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WordCount.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WordCount.py @@ -18,7 +18,6 @@ import sys from flink.plan.Environment import get_environment -from flink.plan.Constants import INT, STRING from flink.functions.FlatMapFunction import FlatMapFunction from flink.functions.GroupReduceFunction import GroupReduceFunction @@ -47,9 +46,9 @@ if __name__ == "__main__": data = env.from_elements("hello","world","hello","car","tree","data","hello") result = data \ - .flat_map(Tokenizer(), (INT, STRING)) \ + .flat_map(Tokenizer()) \ .group_by(1) \ - .reduce_group(Adder(), (INT, STRING), combinable=True) \ + .reduce_group(Adder(), combinable=True) \ if len(sys.argv) == 3: result.write_csv(sys.argv[2]) http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py index 9c55787..4cb337a 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py @@ -25,13 +25,16 @@ class CoGroupFunction(Function.Function): self._keys1 = None self._keys2 = None - def _configure(self, input_file, output_file, port, env): + def _configure(self, input_file, output_file, port, env, info): self._connection = Connection.TwinBufferingTCPMappedFileConnection(input_file, output_file, port) self._iterator = Iterator.Iterator(self._connection, env, 0) self._iterator2 = Iterator.Iterator(self._connection, env, 1) self._cgiter = Iterator.CoGroupIterator(self._iterator, self._iterator2, self._keys1, self._keys2) + self._collector = Collector.Collector(self._connection, env, info) self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector) - self._configure_chain(Collector.Collector(self._connection, env)) + if info.chained_info is not None: + info.chained_info.operator._configure_chain(self.context, self._collector, info.chained_info) + self._collector = info.chained_info.operator def _run(self): collector = self._collector http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py index f874a25..dfe6a28 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py @@ -16,9 +16,9 @@ # limitations under the License. 
################################################################################ from abc import ABCMeta, abstractmethod -import sys from collections import deque from flink.connection import Connection, Iterator, Collector +from flink.connection.Iterator import IntegerDeserializer, StringDeserializer, _get_deserializer from flink.functions import RuntimeContext @@ -30,55 +30,56 @@ class Function(object): self._iterator = None self._collector = None self.context = None - self._chain_operator = None + self._env = None - def _configure(self, input_file, output_file, port, env): + def _configure(self, input_file, output_file, port, env, info): self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port) self._iterator = Iterator.Iterator(self._connection, env) + self._collector = Collector.Collector(self._connection, env, info) self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector) - self._configure_chain(Collector.Collector(self._connection, env)) + self._env = env + if info.chained_info is not None: + info.chained_info.operator._configure_chain(self.context, self._collector, info.chained_info) + self._collector = info.chained_info.operator - def _configure_chain(self, collector): - if self._chain_operator is not None: - self._collector = self._chain_operator - self._collector.context = self.context - self._collector._configure_chain(collector) - self._collector._open() - else: + def _configure_chain(self, context, collector, info): + self.context = context + if info.chained_info is None: self._collector = collector - - def _chain(self, operator): - self._chain_operator = operator + else: + self._collector = info.chained_info.operator + info.chained_info.operator._configure_chain(context, collector, info.chained_info) @abstractmethod def _run(self): pass - def _open(self): - pass - def _close(self): self._collector._close() - self._connection.close() + if self._connection is not None: + self._connection.close() def _go(self): self._receive_broadcast_variables() self._run() def _receive_broadcast_variables(self): - broadcast_count = self._iterator.next() - self._iterator._reset() - self._connection.reset() + con = self._connection + deserializer_int = IntegerDeserializer() + broadcast_count = deserializer_int.deserialize(con.read_secondary) + deserializer_string = StringDeserializer() for _ in range(broadcast_count): - name = self._iterator.next() - self._iterator._reset() - self._connection.reset() + name = deserializer_string.deserialize(con.read_secondary) bc = deque() - while(self._iterator.has_next()): - bc.append(self._iterator.next()) + if con.read_secondary(1) == b"\x01": + serializer_data = _get_deserializer(con.read_secondary, self._env._types) + value = serializer_data.deserialize(con.read_secondary) + bc.append(value) + while con.read_secondary(1) == b"\x01": + con.read_secondary(serializer_data.get_type_info_size()) #skip type info + value = serializer_data.deserialize(con.read_secondary) + bc.append(value) self.context._add_broadcast_variable(name, bc) - self._iterator._reset() - self._connection.reset() http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py index b758c19..340497d 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py @@ -24,21 +24,14 @@ from flink.plan.Constants import Order class GroupReduceFunction(Function.Function): def __init__(self): super(GroupReduceFunction, self).__init__() - self._keys = None - def _configure(self, input_file, output_file, port, env): - self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port) - self._iterator = Iterator.Iterator(self._connection, env) - if self._keys is None: + def _configure(self, input_file, output_file, port, env, info): + super(GroupReduceFunction, self)._configure(input_file, output_file, port, env, info) + if info.key1 is None: self._run = self._run_all_group_reduce else: self._run = self._run_grouped_group_reduce - self._group_iterator = Iterator.GroupIterator(self._iterator, self._keys) - self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector) - self._collector = Collector.Collector(self._connection, env) - - def _set_grouping_keys(self, keys): - self._keys = keys + self._group_iterator = Iterator.GroupIterator(self._iterator, info.key1) def _run(self): pass http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/KeySelectorFunction.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/KeySelectorFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/KeySelectorFunction.py new file mode 100644 index 0000000..a75961f --- /dev/null +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/KeySelectorFunction.py @@ -0,0 +1,28 @@ +# ############################################################################### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + + +class KeySelectorFunction: + def __call__(self, value): + return self.get_key(value) + + def callable(self): + return True + + def get_key(self, value): + pass http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py index 45a22da..95e8b8a 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py @@ -23,21 +23,14 @@ from flink.connection import Connection, Iterator, Collector class ReduceFunction(Function.Function): def __init__(self): super(ReduceFunction, self).__init__() - self._keys = None - def _configure(self, input_file, output_file, port, env): - self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port) - self._iterator = Iterator.Iterator(self._connection, env) - if self._keys is None: + def _configure(self, input_file, output_file, port, env, info): + super(ReduceFunction, self)._configure(input_file, output_file, port, env, info) + if info.key1 is None: self._run = self._run_all_reduce else: self._run = self._run_grouped_reduce - self._group_iterator = Iterator.GroupIterator(self._iterator, self._keys) - self._collector = Collector.Collector(self._connection, env) - self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector) - - def _set_grouping_keys(self, keys): - self._keys = keys + self._group_iterator = Iterator.GroupIterator(self._iterator, info.key1) def _run(self): pass http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py index 0c9fe80..8881463 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Constants.py @@ -63,11 +63,6 @@ import sys PY2 = sys.version_info[0] == 2 PY3 = sys.version_info[0] == 3 - -class _Dummy(object): - pass - - if PY2: BOOL = True INT = 1 @@ -75,11 +70,17 @@ if PY2: FLOAT = 2.5 STRING = "type" BYTES = bytearray(b"byte") - CUSTOM = _Dummy() elif PY3: BOOL = True INT = 1 FLOAT = 2.5 STRING = "type" BYTES = bytearray(b"byte") - CUSTOM = _Dummy() + + +def _createKeyValueTypeInfo(keyCount): + return (tuple([BYTES for _ in range(keyCount)]), BYTES) + + +def _createArrayTypeInfo(): + return BYTES http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py index 25ec8b8..eda8d02 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py @@ -15,11 +15,10 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ -import inspect import copy import types as TYPES -from flink.plan.Constants import _Identifier, WriteMode, STRING +from flink.plan.Constants import _Identifier, WriteMode, _createKeyValueTypeInfo, _createArrayTypeInfo from flink.plan.OperationInfo import OperationInfo from flink.functions.CoGroupFunction import CoGroupFunction from flink.functions.FilterFunction import FilterFunction @@ -30,52 +29,33 @@ from flink.functions.JoinFunction import JoinFunction from flink.functions.MapFunction import MapFunction from flink.functions.MapPartitionFunction import MapPartitionFunction from flink.functions.ReduceFunction import ReduceFunction +from flink.functions.KeySelectorFunction import KeySelectorFunction -def deduct_output_type(dataset): - skip = set([_Identifier.GROUP, _Identifier.SORT, _Identifier.UNION]) - source = set([_Identifier.SOURCE_CSV, _Identifier.SOURCE_TEXT, _Identifier.SOURCE_VALUE]) - default = set([_Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST, _Identifier.JOINT, _Identifier.JOINH, _Identifier.JOIN]) - - while True: - dataset_type = dataset.identifier - if dataset_type in skip: - dataset = dataset.parent - continue - if dataset_type in source: - if dataset_type == _Identifier.SOURCE_TEXT: - return STRING - if dataset_type == _Identifier.SOURCE_VALUE: - return dataset.values[0] - if dataset_type == _Identifier.SOURCE_CSV: - return dataset.types - if dataset_type == _Identifier.PROJECTION: - return tuple([deduct_output_type(dataset.parent)[k] for k in dataset.keys]) - if dataset_type in default: - if dataset.operator is not None: #udf-join/cross - return dataset.types - if len(dataset.projections) == 0: #defaultjoin/-cross - return (deduct_output_type(dataset.parent), deduct_output_type(dataset.other)) - else: #projectjoin/-cross - t1 = deduct_output_type(dataset.parent) - t2 = deduct_output_type(dataset.other) - out_type = [] - for prj in dataset.projections: - if len(prj[1]) == 0: #projection on non-tuple dataset - if prj[0] == "first": - out_type.append(t1) - else: - out_type.append(t2) - else: #projection on tuple dataset - for key in prj[1]: - if prj[0] == "first": - out_type.append(t1[key]) - else: - out_type.append(t2[key]) - return tuple(out_type) - return dataset.types - - -class Set(object): + +class Stringify(MapFunction): + def map(self, value): + if isinstance(value, (tuple, list)): + return "(" + b", ".join([self.map(x) for x in value]) + ")" + else: + return str(value) + + +class CsvStringify(MapFunction): + def __init__(self, f_delim): + super(CsvStringify, self).__init__() + self.delim = f_delim + + def map(self, value): + return self.delim.join([self._map(field) for field in value]) + + def _map(self, value): + if isinstance(value, (tuple, list)): + return "(" + b", ".join([self.map(x) for x in value]) + ")" + else: + return str(value) + + +class DataSet(object): def __init__(self, env, info): self._env = env self._info = info @@ -86,6 +66,9 @@ class Set(object): """ Writes a DataSet to the standard output stream (stdout). 
""" + self.map(Stringify())._output(to_error) + + def _output(self, to_error): child = OperationInfo() child.identifier = _Identifier.SINK_PRINT child.parent = self._info @@ -100,6 +83,9 @@ class Set(object): :param path: he path pointing to the location the text file is written to. :param write_mode: OutputFormat.WriteMode value, indicating whether files should be overwritten """ + return self.map(Stringify())._write_text(path, write_mode) + + def _write_text(self, path, write_mode): child = OperationInfo() child.identifier = _Identifier.SINK_TEXT child.parent = self._info @@ -116,6 +102,9 @@ class Set(object): :param path: The path pointing to the location the CSV file is written to. :param write_mode: OutputFormat.WriteMode value, indicating whether files should be overwritten """ + return self.map(CsvStringify(field_delimiter))._write_csv(path, line_delimiter, field_delimiter, write_mode) + + def _write_csv(self, path, line_delimiter, field_delimiter, write_mode): child = OperationInfo() child.identifier = _Identifier.SINK_CSV child.path = path @@ -126,7 +115,7 @@ class Set(object): self._info.sinks.append(child) self._env._sinks.append(child) - def reduce_group(self, operator, types, combinable=False): + def reduce_group(self, operator, combinable=False): """ Applies a GroupReduce transformation. @@ -136,7 +125,6 @@ class Set(object): emit any number of output elements including none. :param operator: The GroupReduceFunction that is applied on the DataSet. - :param types: The type of the resulting DataSet. :return:A GroupReduceOperator that represents the reduced DataSet. """ if isinstance(operator, TYPES.FunctionType): @@ -148,17 +136,12 @@ class Set(object): child.identifier = _Identifier.GROUPREDUCE child.parent = self._info child.operator = operator - child.types = types + child.types = _createArrayTypeInfo() child.name = "PythonGroupReduce" self._info.children.append(child) self._env._sets.append(child) return child_set - -class ReduceSet(Set): - def __init__(self, env, info): - super(ReduceSet, self).__init__(env, info) - def reduce(self, operator): """ Applies a Reduce transformation on a non-grouped DataSet. @@ -179,16 +162,11 @@ class ReduceSet(Set): child.parent = self._info child.operator = operator child.name = "PythonReduce" - child.types = deduct_output_type(self._info) + child.types = _createArrayTypeInfo() self._info.children.append(child) self._env._sets.append(child) return child_set - -class DataSet(ReduceSet): - def __init__(self, env, info): - super(DataSet, self).__init__(env, info) - def project(self, *fields): """ Applies a Project transformation on a Tuple DataSet. @@ -201,14 +179,7 @@ class DataSet(ReduceSet): :return: The projected DataSet. """ - child = OperationInfo() - child_set = DataSet(self._env, child) - child.identifier = _Identifier.PROJECTION - child.parent = self._info - child.keys = fields - self._info.children.append(child) - self._env._sets.append(child) - return child_set + return self.map(lambda x: tuple([x[key] for key in fields])) def group_by(self, *keys): """ @@ -223,6 +194,9 @@ class DataSet(ReduceSet): :param keys: One or more field positions on which the DataSet will be grouped. :return:A Grouping on which a transformation needs to be applied to obtain a transformed DataSet. 
""" + return self.map(lambda x: x)._group_by(keys) + + def _group_by(self, keys): child = OperationInfo() child_chain = [] child_set = UnsortedGrouping(self._env, child, child_chain) @@ -251,9 +225,8 @@ class DataSet(ReduceSet): other_set._info.children.append(child) child_set = CoGroupOperatorWhere(self._env, child) child.identifier = _Identifier.COGROUP - child.parent = self._info - child.other = other_set._info - self._info.children.append(child) + child.parent_set = self + child.other_set = other_set return child_set def cross(self, other_set): @@ -323,14 +296,13 @@ class DataSet(ReduceSet): child.identifier = _Identifier.FILTER child.parent = self._info child.operator = operator - child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__) child.name = "PythonFilter" - child.types = deduct_output_type(self._info) + child.types = _createArrayTypeInfo() self._info.children.append(child) self._env._sets.append(child) return child_set - def flat_map(self, operator, types): + def flat_map(self, operator): """ Applies a FlatMap transformation on a DataSet. @@ -338,7 +310,6 @@ class DataSet(ReduceSet): Each FlatMapFunction call can return any number of elements including none. :param operator: The FlatMapFunction that is called for each element of the DataSet. - :param types: The type of the resulting DataSet. :return:A FlatMapOperator that represents the transformed DataSe """ if isinstance(operator, TYPES.FunctionType): @@ -350,8 +321,7 @@ class DataSet(ReduceSet): child.identifier = _Identifier.FLATMAP child.parent = self._info child.operator = operator - child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__) - child.types = types + child.types = _createArrayTypeInfo() child.name = "PythonFlatMap" self._info.children.append(child) self._env._sets.append(child) @@ -398,14 +368,11 @@ class DataSet(ReduceSet): child = OperationInfo() child_set = JoinOperatorWhere(self._env, child) child.identifier = identifier - child.parent = self._info - child.other = other_set._info - self._info.children.append(child) - other_set._info.children.append(child) - self._env._sets.append(child) + child.parent_set = self + child.other_set = other_set return child_set - def map(self, operator, types): + def map(self, operator): """ Applies a Map transformation on a DataSet. @@ -413,7 +380,6 @@ class DataSet(ReduceSet): Each MapFunction call returns exactly one element. :param operator: The MapFunction that is called for each element of the DataSet. - :param types: The type of the resulting DataSet :return:A MapOperator that represents the transformed DataSet """ if isinstance(operator, TYPES.FunctionType): @@ -425,14 +391,13 @@ class DataSet(ReduceSet): child.identifier = _Identifier.MAP child.parent = self._info child.operator = operator - child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__) - child.types = types + child.types = _createArrayTypeInfo() child.name = "PythonMap" self._info.children.append(child) self._env._sets.append(child) return child_set - def map_partition(self, operator, types): + def map_partition(self, operator): """ Applies a MapPartition transformation on a DataSet. @@ -444,7 +409,6 @@ class DataSet(ReduceSet): sees is non deterministic and depends on the degree of parallelism of the operation. :param operator: The MapFunction that is called for each element of the DataSet. 
- :param types: The type of the resulting DataSet :return:A MapOperator that represents the transformed DataSet """ if isinstance(operator, TYPES.FunctionType): @@ -456,8 +420,7 @@ class DataSet(ReduceSet): child.identifier = _Identifier.MAPPARTITION child.parent = self._info child.operator = operator - child.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__) - child.types = types + child.types = _createArrayTypeInfo() child.name = "PythonMapPartition" self._info.children.append(child) self._env._sets.append(child) @@ -506,7 +469,10 @@ class Grouping(object): info.id = env._counter env._counter += 1 - def reduce_group(self, operator, types, combinable=False): + def _finalize(self): + pass + + def reduce_group(self, operator, combinable=False): """ Applies a GroupReduce transformation. @@ -516,21 +482,21 @@ class Grouping(object): emit any number of output elements including none. :param operator: The GroupReduceFunction that is applied on the DataSet. - :param types: The type of the resulting DataSet. :return:A GroupReduceOperator that represents the reduced DataSet. """ + self._finalize() if isinstance(operator, TYPES.FunctionType): f = operator operator = GroupReduceFunction() operator.reduce = f - operator._set_grouping_keys(self._child_chain[0].keys) child = OperationInfo() child_set = OperatorSet(self._env, child) child.identifier = _Identifier.GROUPREDUCE child.parent = self._info child.operator = operator - child.types = types + child.types = _createArrayTypeInfo() child.name = "PythonGroupReduce" + child.key1 = self._child_chain[0].keys self._info.children.append(child) self._env._sets.append(child) @@ -573,26 +539,96 @@ class UnsortedGrouping(Grouping): :param operator:The ReduceFunction that is applied on the DataSet. :return:A ReduceOperator that represents the reduced DataSet. 
""" - operator._set_grouping_keys(self._child_chain[0].keys) - for i in self._child_chain: - self._env._sets.append(i) + self._finalize() + if isinstance(operator, TYPES.FunctionType): + f = operator + operator = ReduceFunction() + operator.reduce = f child = OperationInfo() child_set = OperatorSet(self._env, child) child.identifier = _Identifier.REDUCE child.parent = self._info child.operator = operator child.name = "PythonReduce" - child.types = deduct_output_type(self._info) + child.types = _createArrayTypeInfo() + child.key1 = self._child_chain[0].keys self._info.children.append(child) self._env._sets.append(child) return child_set + def _finalize(self): + grouping = self._child_chain[0] + keys = grouping.keys + f = None + if isinstance(keys[0], TYPES.FunctionType): + f = lambda x: (keys[0](x),) + if isinstance(keys[0], KeySelectorFunction): + f = lambda x: (keys[0].get_key(x),) + if f is None: + f = lambda x: tuple([x[key] for key in keys]) + + grouping.parent.operator.map = lambda x: (f(x), x) + grouping.parent.types = _createKeyValueTypeInfo(len(keys)) + grouping.keys = tuple([i for i in range(len(grouping.keys))]) + class SortedGrouping(Grouping): def __init__(self, env, info, child_chain): super(SortedGrouping, self).__init__(env, info, child_chain) + def _finalize(self): + grouping = self._child_chain[0] + sortings = self._child_chain[1:] + + #list of used index keys to prevent duplicates and determine final index + index_keys = set() + + if not isinstance(grouping.keys[0], (TYPES.FunctionType, KeySelectorFunction)): + index_keys = index_keys.union(set(grouping.keys)) + + #list of sorts using indices + index_sorts = [] + #list of sorts using functions + ksl_sorts = [] + for s in sortings: + if not isinstance(s.field, (TYPES.FunctionType, KeySelectorFunction)): + index_keys.add(s.field) + index_sorts.append(s) + else: + ksl_sorts.append(s) + + used_keys = sorted(index_keys) + #all data gathered + + #construct list of extractor lambdas + lambdas = [] + i = 0 + for key in used_keys: + lambdas.append(lambda x, k=key: x[k]) + i += 1 + if isinstance(grouping.keys[0], (TYPES.FunctionType, KeySelectorFunction)): + lambdas.append(grouping.keys[0]) + for ksl_op in ksl_sorts: + lambdas.append(ksl_op.field) + + grouping.parent.operator.map = lambda x: (tuple([l(x) for l in lambdas]), x) + grouping.parent.types = _createKeyValueTypeInfo(len(lambdas)) + #modify keys + ksl_offset = len(used_keys) + if not isinstance(grouping.keys[0], (TYPES.FunctionType, KeySelectorFunction)): + grouping.keys = tuple([used_keys.index(key) for key in grouping.keys]) + else: + grouping.keys = (ksl_offset,) + ksl_offset += 1 + + for iop in index_sorts: + iop.field = used_keys.index(iop.field) + + for kop in ksl_sorts: + kop.field = ksl_offset + ksl_offset += 1 + class CoGroupOperatorWhere(object): def __init__(self, env, info): @@ -609,6 +645,18 @@ class CoGroupOperatorWhere(object): :param fields: The indexes of the Tuple fields of the first co-grouped DataSets that should be used as keys. :return: An incomplete CoGroup transformation. 
""" + f = None + if isinstance(fields[0], TYPES.FunctionType): + f = lambda x: (fields[0](x),) + if isinstance(fields[0], KeySelectorFunction): + f = lambda x: (fields[0].get_key(x),) + if f is None: + f = lambda x: tuple([x[key] for key in fields]) + + new_parent_set = self._info.parent_set.map(lambda x: (f(x), x)) + new_parent_set._info.types = _createKeyValueTypeInfo(len(fields)) + self._info.parent = new_parent_set._info + self._info.parent.children.append(self._info) self._info.key1 = fields return CoGroupOperatorTo(self._env, self._info) @@ -628,6 +676,18 @@ class CoGroupOperatorTo(object): :param fields: The indexes of the Tuple fields of the second co-grouped DataSet that should be used as keys. :return: An incomplete CoGroup transformation. """ + f = None + if isinstance(fields[0], TYPES.FunctionType): + f = lambda x: (fields[0](x),) + if isinstance(fields[0], KeySelectorFunction): + f = lambda x: (fields[0].get_key(x),) + if f is None: + f = lambda x: tuple([x[key] for key in fields]) + + new_other_set = self._info.other_set.map(lambda x: (f(x), x)) + new_other_set._info.types = _createKeyValueTypeInfo(len(fields)) + self._info.other = new_other_set._info + self._info.other.children.append(self._info) self._info.key2 = fields return CoGroupOperatorUsing(self._env, self._info) @@ -637,7 +697,7 @@ class CoGroupOperatorUsing(object): self._env = env self._info = info - def using(self, operator, types): + def using(self, operator): """ Finalizes a CoGroup transformation. @@ -645,7 +705,6 @@ class CoGroupOperatorUsing(object): Each CoGroupFunction call returns an arbitrary number of keys. :param operator: The CoGroupFunction that is called for all groups of elements with identical keys. - :param types: The type of the resulting DataSet. :return:An CoGroupOperator that represents the co-grouped result DataSet. """ if isinstance(operator, TYPES.FunctionType): @@ -653,11 +712,12 @@ class CoGroupOperatorUsing(object): operator = CoGroupFunction() operator.co_group = f new_set = OperatorSet(self._env, self._info) + self._info.key1 = tuple([x for x in range(len(self._info.key1))]) + self._info.key2 = tuple([x for x in range(len(self._info.key2))]) operator._keys1 = self._info.key1 operator._keys2 = self._info.key2 self._info.operator = operator - self._info.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__) - self._info.types = types + self._info.types = _createArrayTypeInfo() self._info.name = "PythonCoGroup" self._env._sets.append(self._info) return new_set @@ -679,7 +739,19 @@ class JoinOperatorWhere(object): :return:An incomplete Join transformation. """ - self._info.key1 = fields + f = None + if isinstance(fields[0], TYPES.FunctionType): + f = lambda x: (fields[0](x),) + if isinstance(fields[0], KeySelectorFunction): + f = lambda x: (fields[0].get_key(x),) + if f is None: + f = lambda x: tuple([x[key] for key in fields]) + + new_parent_set = self._info.parent_set.map(lambda x: (f(x), x)) + new_parent_set._info.types = _createKeyValueTypeInfo(len(fields)) + self._info.parent = new_parent_set._info + self._info.parent.children.append(self._info) + self._info.key1 = tuple([x for x in range(len(fields))]) return JoinOperatorTo(self._env, self._info) @@ -698,81 +770,115 @@ class JoinOperatorTo(object): :param fields:The indexes of the Tuple fields of the second join DataSet that should be used as keys. :return:An incomplete Join Transformation. 
""" - self._info.key2 = fields + f = None + if isinstance(fields[0], TYPES.FunctionType): + f = lambda x: (fields[0](x),) + if isinstance(fields[0], KeySelectorFunction): + f = lambda x: (fields[0].get_key(x),) + if f is None: + f = lambda x: tuple([x[key] for key in fields]) + + new_other_set = self._info.other_set.map(lambda x: (f(x), x)) + new_other_set._info.types = _createKeyValueTypeInfo(len(fields)) + self._info.other = new_other_set._info + self._info.other.children.append(self._info) + self._info.key2 = tuple([x for x in range(len(fields))]) + self._env._sets.append(self._info) return JoinOperator(self._env, self._info) -class JoinOperatorProjection(DataSet): +class Projector(DataSet): def __init__(self, env, info): - super(JoinOperatorProjection, self).__init__(env, info) + super(Projector, self).__init__(env, info) def project_first(self, *fields): """ - Initiates a ProjectJoin transformation. + Initiates a Project transformation. - Projects the first join input. - If the first join input is a Tuple DataSet, fields can be selected by their index. - If the first join input is not a Tuple DataSet, no parameters should be passed. + Projects the first input. + If the first input is a Tuple DataSet, fields can be selected by their index. + If the first input is not a Tuple DataSet, no parameters should be passed. :param fields: The indexes of the selected fields. - :return: An incomplete JoinProjection. + :return: An incomplete Projection. """ - self._info.projections.append(("first", fields)) + for field in fields: + self._info.projections.append((0, field)) + self._info.operator.map = lambda x : tuple([x[side][index] for side, index in self._info.projections]) return self def project_second(self, *fields): """ - Initiates a ProjectJoin transformation. + Initiates a Project transformation. - Projects the second join input. - If the second join input is a Tuple DataSet, fields can be selected by their index. - If the second join input is not a Tuple DataSet, no parameters should be passed. + Projects the second input. + If the second input is a Tuple DataSet, fields can be selected by their index. + If the second input is not a Tuple DataSet, no parameters should be passed. :param fields: The indexes of the selected fields. - :return: An incomplete JoinProjection. + :return: An incomplete Projection. """ - self._info.projections.append(("second", fields)) + for field in fields: + self._info.projections.append((1, field)) + self._info.operator.map = lambda x : tuple([x[side][index] for side, index in self._info.projections]) return self -class JoinOperator(DataSet): - def __init__(self, env, info): - super(JoinOperator, self).__init__(env, info) +class Projectable: + def __init__(self): + pass def project_first(self, *fields): """ - Initiates a ProjectJoin transformation. + Initiates a Project transformation. - Projects the first join input. - If the first join input is a Tuple DataSet, fields can be selected by their index. - If the first join input is not a Tuple DataSet, no parameters should be passed. + Projects the first input. + If the first input is a Tuple DataSet, fields can be selected by their index. + If the first input is not a Tuple DataSet, no parameters should be passed. :param fields: The indexes of the selected fields. - :return: An incomplete JoinProjection. + :return: An incomplete Projection. 
""" - return JoinOperatorProjection(self._env, self._info).project_first(*fields) + return Projectable._createProjector(self._env, self._info).project_first(*fields) def project_second(self, *fields): """ - Initiates a ProjectJoin transformation. + Initiates a Project transformation. - Projects the second join input. - If the second join input is a Tuple DataSet, fields can be selected by their index. - If the second join input is not a Tuple DataSet, no parameters should be passed. + Projects the second input. + If the second input is a Tuple DataSet, fields can be selected by their index. + If the second input is not a Tuple DataSet, no parameters should be passed. :param fields: The indexes of the selected fields. - :return: An incomplete JoinProjection. + :return: An incomplete Projection. """ - return JoinOperatorProjection(self._env, self._info).project_second(*fields) + return Projectable._createProjector(self._env, self._info).project_second(*fields) + + @staticmethod + def _createProjector(env, info): + child = OperationInfo() + child_set = Projector(env, child) + child.identifier = _Identifier.MAP + child.operator = MapFunction() + child.parent = info + child.types = _createArrayTypeInfo() + child.name = "Projector" + info.children.append(child) + env._sets.append(child) + return child_set + - def using(self, operator, types): +class JoinOperator(DataSet, Projectable): + def __init__(self, env, info): + super(JoinOperator, self).__init__(env, info) + + def using(self, operator): """ Finalizes a Join transformation. Applies a JoinFunction to each pair of joined elements. Each JoinFunction call returns exactly one element. :param operator:The JoinFunction that is called for each pair of joined elements. - :param types: :return:An Set that represents the joined result DataSet. """ if isinstance(operator, TYPES.FunctionType): @@ -780,84 +886,23 @@ class JoinOperator(DataSet): operator = JoinFunction() operator.join = f self._info.operator = operator - self._info.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__) - self._info.types = types + self._info.types = _createArrayTypeInfo() self._info.name = "PythonJoin" - self._env._sets.append(self._info) + self._info.uses_udf = True return OperatorSet(self._env, self._info) -class CrossOperatorProjection(DataSet): - def __init__(self, env, info): - super(CrossOperatorProjection, self).__init__(env, info) - - def project_first(self, *fields): - """ - Initiates a ProjectCross transformation. - - Projects the first join input. - If the first join input is a Tuple DataSet, fields can be selected by their index. - If the first join input is not a Tuple DataSet, no parameters should be passed. - - :param fields: The indexes of the selected fields. - :return: An incomplete CrossProjection. - """ - self._info.projections.append(("first", fields)) - return self - - def project_second(self, *fields): - """ - Initiates a ProjectCross transformation. - - Projects the second join input. - If the second join input is a Tuple DataSet, fields can be selected by their index. - If the second join input is not a Tuple DataSet, no parameters should be passed. - - :param fields: The indexes of the selected fields. - :return: An incomplete CrossProjection. 
- """ - self._info.projections.append(("second", fields)) - return self - - -class CrossOperator(DataSet): +class CrossOperator(DataSet, Projectable): def __init__(self, env, info): super(CrossOperator, self).__init__(env, info) - def project_first(self, *fields): - """ - Initiates a ProjectCross transformation. - - Projects the first join input. - If the first join input is a Tuple DataSet, fields can be selected by their index. - If the first join input is not a Tuple DataSet, no parameters should be passed. - - :param fields: The indexes of the selected fields. - :return: An incomplete CrossProjection. - """ - return CrossOperatorProjection(self._env, self._info).project_first(*fields) - - def project_second(self, *fields): - """ - Initiates a ProjectCross transformation. - - Projects the second join input. - If the second join input is a Tuple DataSet, fields can be selected by their index. - If the second join input is not a Tuple DataSet, no parameters should be passed. - - :param fields: The indexes of the selected fields. - :return: An incomplete CrossProjection. - """ - return CrossOperatorProjection(self._env, self._info).project_second(*fields) - - def using(self, operator, types): + def using(self, operator): """ Finalizes a Cross transformation. Applies a CrossFunction to each pair of joined elements. Each CrossFunction call returns exactly one element. :param operator:The CrossFunction that is called for each pair of joined elements. - :param types: The type of the resulting DataSet. :return:An Set that represents the joined result DataSet. """ if isinstance(operator, TYPES.FunctionType): @@ -865,7 +910,7 @@ class CrossOperator(DataSet): operator = CrossFunction() operator.cross = f self._info.operator = operator - self._info.meta = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__) - self._info.types = types + self._info.types = _createArrayTypeInfo() self._info.name = "PythonCross" + self._info.uses_udf = True return OperatorSet(self._env, self._info)