From commits-return-103375-archive-asf-public=cust-asf.ponee.io@beam.apache.org Fri Sep 13 16:11:49 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 0E77A180658 for ; Fri, 13 Sep 2019 18:11:48 +0200 (CEST) Received: (qmail 18601 invoked by uid 500); 13 Sep 2019 16:11:48 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 18541 invoked by uid 99); 13 Sep 2019 16:11:48 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 13 Sep 2019 16:11:48 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 0A05881585; Fri, 13 Sep 2019 16:11:48 +0000 (UTC) Date: Fri, 13 Sep 2019 16:11:43 +0000 To: "commits@beam.apache.org" Subject: [beam] 01/01: Revert "[7746] Create a more user friendly external transform API" MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: altay@apache.org In-Reply-To: <156839109991.26157.17688239026551119288@gitbox.apache.org> References: <156839109991.26157.17688239026551119288@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: beam X-Git-Refname: refs/heads/revert-9098-py_external_api X-Git-Reftype: branch X-Git-Rev: 971c3e343f87f357a6448c41d2445eebcf44a237 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20190913161148.0A05881585@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. altay pushed a commit to branch revert-9098-py_external_api in repository https://gitbox.apache.org/repos/asf/beam.git commit 971c3e343f87f357a6448c41d2445eebcf44a237 Author: Ahmet Altay AuthorDate: Fri Sep 13 09:10:47 2019 -0700 Revert "[7746] Create a more user friendly external transform API" --- .../construction/expansion/ExpansionService.java | 10 +- .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 68 +++++--- .../beam/sdk/io/kafka/KafkaIOExternalTest.java | 47 ++--- .../apache_beam/io/external/generate_sequence.py | 62 +++++-- sdks/python/apache_beam/io/external/kafka.py | 168 +++++++++++------- sdks/python/apache_beam/transforms/external.py | 189 +-------------------- .../python/apache_beam/transforms/external_test.py | 164 +----------------- .../apache_beam/transforms/external_test_py3.py | 93 ---------- .../apache_beam/transforms/external_test_py37.py | 71 -------- sdks/python/scripts/generate_pydoc.sh | 2 +- sdks/python/scripts/run_mini_py3lint.sh | 21 +-- sdks/python/scripts/run_pylint.sh | 2 +- sdks/python/setup.cfg | 3 - sdks/python/tox.ini | 22 +-- 14 files changed, 240 insertions(+), 682 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java index ae1a3d7..795298a 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java @@ -179,15 +179,11 @@ public class ExpansionService extends ExpansionServiceGrpc.ExpansionServiceImplB } catch (NoSuchMethodException e) { throw new RuntimeException( String.format( - "The configuration class %s is missing a setter %s for %s with type %s", - config.getClass(), - setterName, - fieldName, - coder.getEncodedTypeDescriptor().getType().getTypeName()), + "The configuration class %s is missing a setter %s for %s", + config.getClass(), setterName, fieldName), e); } - method.invoke( - config, coder.decode(entry.getValue().getPayload().newInput(), Coder.Context.NESTED)); + method.invoke(config, coder.decode(entry.getValue().getPayload().newInput())); } } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index ca74ecb..373d4b8 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -65,6 +65,7 @@ import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; @@ -401,22 +402,26 @@ public class KafkaIO { public PTransform>> buildExternal( External.Configuration config) { ImmutableList.Builder listBuilder = ImmutableList.builder(); - for (String topic : config.topics) { - listBuilder.add(topic); + for (byte[] topic : config.topics) { + listBuilder.add(utf8String(topic)); } setTopics(listBuilder.build()); - Class keyDeserializer = resolveClass(config.keyDeserializer); + String keyDeserializerClassName = utf8String(config.keyDeserializer); + Class keyDeserializer = resolveClass(keyDeserializerClassName); setKeyDeserializer(keyDeserializer); setKeyCoder(resolveCoder(keyDeserializer)); - Class valueDeserializer = resolveClass(config.valueDeserializer); + String valueDeserializerClassName = utf8String(config.valueDeserializer); + Class valueDeserializer = resolveClass(valueDeserializerClassName); setValueDeserializer(valueDeserializer); setValueCoder(resolveCoder(valueDeserializer)); Map consumerConfig = new HashMap<>(); - for (KV kv : config.consumerConfig) { - consumerConfig.put(kv.getKey(), kv.getValue()); + for (KV kv : config.consumerConfig) { + String key = utf8String(kv.getKey()); + String value = utf8String(kv.getValue()); + consumerConfig.put(key, value); } // Key and Value Deserializers always have to be in the config. consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getName()); @@ -475,24 +480,24 @@ public class KafkaIO { public static class Configuration { // All byte arrays are UTF-8 encoded strings - private Iterable> consumerConfig; - private Iterable topics; - private String keyDeserializer; - private String valueDeserializer; + private Iterable> consumerConfig; + private Iterable topics; + private byte[] keyDeserializer; + private byte[] valueDeserializer; - public void setConsumerConfig(Iterable> consumerConfig) { + public void setConsumerConfig(Iterable> consumerConfig) { this.consumerConfig = consumerConfig; } - public void setTopics(Iterable topics) { + public void setTopics(Iterable topics) { this.topics = topics; } - public void setKeyDeserializer(String keyDeserializer) { + public void setKeyDeserializer(byte[] keyDeserializer) { this.keyDeserializer = keyDeserializer; } - public void setValueDeserializer(String valueDeserializer) { + public void setValueDeserializer(byte[] valueDeserializer) { this.valueDeserializer = valueDeserializer; } } @@ -1360,21 +1365,24 @@ public class KafkaIO { @Override public PTransform>, PDone> buildExternal( External.Configuration configuration) { - setTopic(configuration.topic); + String topic = utf8String(configuration.topic); + setTopic(topic); Map producerConfig = new HashMap<>(); - for (KV kv : configuration.producerConfig) { - producerConfig.put(kv.getKey(), kv.getValue()); + for (KV kv : configuration.producerConfig) { + String key = utf8String(kv.getKey()); + String value = utf8String(kv.getValue()); + producerConfig.put(key, value); } - Class keySerializer = resolveClass(configuration.keySerializer); - Class valSerializer = resolveClass(configuration.valueSerializer); + Class keySerializer = resolveClass(utf8String(configuration.keySerializer)); + Class valSerializer = resolveClass(utf8String(configuration.valueSerializer)); WriteRecords writeRecords = KafkaIO.writeRecords() .withProducerConfigUpdates(producerConfig) .withKeySerializer(keySerializer) .withValueSerializer(valSerializer) - .withTopic(configuration.topic); + .withTopic(topic); setWriteRecordsTransform(writeRecords); return build(); @@ -1397,24 +1405,24 @@ public class KafkaIO { public static class Configuration { // All byte arrays are UTF-8 encoded strings - private Iterable> producerConfig; - private String topic; - private String keySerializer; - private String valueSerializer; + private Iterable> producerConfig; + private byte[] topic; + private byte[] keySerializer; + private byte[] valueSerializer; - public void setProducerConfig(Iterable> producerConfig) { + public void setProducerConfig(Iterable> producerConfig) { this.producerConfig = producerConfig; } - public void setTopic(String topic) { + public void setTopic(byte[] topic) { this.topic = topic; } - public void setKeySerializer(String keySerializer) { + public void setKeySerializer(byte[] keySerializer) { this.keySerializer = keySerializer; } - public void setValueSerializer(String valueSerializer) { + public void setValueSerializer(byte[] valueSerializer) { this.valueSerializer = valueSerializer; } } @@ -1683,6 +1691,10 @@ public class KafkaIO { String.format("Could not extract the Kafka Deserializer type from %s", deserializer)); } + private static String utf8String(byte[] bytes) { + return new String(bytes, Charsets.UTF_8); + } + private static Class resolveClass(String className) { try { return Class.forName(className); diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java index a7b7f8a..83f2cc7 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java @@ -32,15 +32,17 @@ import org.apache.beam.runners.core.construction.PipelineTranslation; import org.apache.beam.runners.core.construction.ReadTranslation; import org.apache.beam.runners.core.construction.expansion.ExpansionService; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; @@ -74,7 +76,7 @@ public class KafkaIOExternalTest { "topics", ExternalTransforms.ConfigValue.newBuilder() .addCoderUrn("beam:coder:iterable:v1") - .addCoderUrn("beam:coder:string_utf8:v1") + .addCoderUrn("beam:coder:bytes:v1") .setPayload(ByteString.copyFrom(listAsBytes(topics))) .build()) .putConfiguration( @@ -82,20 +84,20 @@ public class KafkaIOExternalTest { ExternalTransforms.ConfigValue.newBuilder() .addCoderUrn("beam:coder:iterable:v1") .addCoderUrn("beam:coder:kv:v1") - .addCoderUrn("beam:coder:string_utf8:v1") - .addCoderUrn("beam:coder:string_utf8:v1") + .addCoderUrn("beam:coder:bytes:v1") + .addCoderUrn("beam:coder:bytes:v1") .setPayload(ByteString.copyFrom(mapAsBytes(consumerConfig))) .build()) .putConfiguration( "key_deserializer", ExternalTransforms.ConfigValue.newBuilder() - .addCoderUrn("beam:coder:string_utf8:v1") + .addCoderUrn("beam:coder:bytes:v1") .setPayload(ByteString.copyFrom(encodeString(keyDeserializer))) .build()) .putConfiguration( "value_deserializer", ExternalTransforms.ConfigValue.newBuilder() - .addCoderUrn("beam:coder:string_utf8:v1") + .addCoderUrn("beam:coder:bytes:v1") .setPayload(ByteString.copyFrom(encodeString(valueDeserializer))) .build()) .build(); @@ -159,7 +161,7 @@ public class KafkaIOExternalTest { .putConfiguration( "topic", ExternalTransforms.ConfigValue.newBuilder() - .addCoderUrn("beam:coder:string_utf8:v1") + .addCoderUrn("beam:coder:bytes:v1") .setPayload(ByteString.copyFrom(encodeString(topic))) .build()) .putConfiguration( @@ -167,20 +169,20 @@ public class KafkaIOExternalTest { ExternalTransforms.ConfigValue.newBuilder() .addCoderUrn("beam:coder:iterable:v1") .addCoderUrn("beam:coder:kv:v1") - .addCoderUrn("beam:coder:string_utf8:v1") - .addCoderUrn("beam:coder:string_utf8:v1") + .addCoderUrn("beam:coder:bytes:v1") + .addCoderUrn("beam:coder:bytes:v1") .setPayload(ByteString.copyFrom(mapAsBytes(producerConfig))) .build()) .putConfiguration( "key_serializer", ExternalTransforms.ConfigValue.newBuilder() - .addCoderUrn("beam:coder:string_utf8:v1") + .addCoderUrn("beam:coder:bytes:v1") .setPayload(ByteString.copyFrom(encodeString(keySerializer))) .build()) .putConfiguration( "value_serializer", ExternalTransforms.ConfigValue.newBuilder() - .addCoderUrn("beam:coder:string_utf8:v1") + .addCoderUrn("beam:coder:bytes:v1") .setPayload(ByteString.copyFrom(encodeString(valueSerializer))) .build()) .build(); @@ -246,30 +248,37 @@ public class KafkaIOExternalTest { } private static byte[] listAsBytes(List stringList) throws IOException { - IterableCoder coder = IterableCoder.of(StringUtf8Coder.of()); + IterableCoder coder = IterableCoder.of(ByteArrayCoder.of()); + List bytesList = + stringList.stream().map(KafkaIOExternalTest::utf8Bytes).collect(Collectors.toList()); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - coder.encode(stringList, baos); + coder.encode(bytesList, baos); return baos.toByteArray(); } private static byte[] mapAsBytes(Map stringMap) throws IOException { - IterableCoder> coder = - IterableCoder.of(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); - List> stringList = + IterableCoder> coder = + IterableCoder.of(KvCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of())); + List> bytesList = stringMap.entrySet().stream() - .map(kv -> KV.of(kv.getKey(), kv.getValue())) + .map(kv -> KV.of(utf8Bytes(kv.getKey()), utf8Bytes(kv.getValue()))) .collect(Collectors.toList()); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - coder.encode(stringList, baos); + coder.encode(bytesList, baos); return baos.toByteArray(); } private static byte[] encodeString(String str) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); - StringUtf8Coder.of().encode(str, baos); + ByteArrayCoder.of().encode(utf8Bytes(str), baos); return baos.toByteArray(); } + private static byte[] utf8Bytes(String str) { + Preconditions.checkNotNull(str, "String must not be null."); + return str.getBytes(Charsets.UTF_8); + } + private static class TestStreamObserver implements StreamObserver { private T result; diff --git a/sdks/python/apache_beam/io/external/generate_sequence.py b/sdks/python/apache_beam/io/external/generate_sequence.py index a17ec7b..0e0b1fd 100644 --- a/sdks/python/apache_beam/io/external/generate_sequence.py +++ b/sdks/python/apache_beam/io/external/generate_sequence.py @@ -17,11 +17,15 @@ from __future__ import absolute_import -from apache_beam.transforms.external import ExternalTransform -from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder +from apache_beam import ExternalTransform +from apache_beam import pvalue +from apache_beam.coders import VarIntCoder +from apache_beam.portability.api.external_transforms_pb2 import ConfigValue +from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload +from apache_beam.transforms import ptransform -class GenerateSequence(ExternalTransform): +class GenerateSequence(ptransform.PTransform): """ An external PTransform which provides a bounded or unbounded stream of integers. @@ -45,19 +49,47 @@ class GenerateSequence(ExternalTransform): Experimental; no backwards compatibility guarantees. """ + URN = 'beam:external:java:generate_sequence:v1' def __init__(self, start, stop=None, elements_per_period=None, max_read_time=None, - expansion_service=None): - super(GenerateSequence, self).__init__( - self.URN, - ImplicitSchemaPayloadBuilder( - { - 'start': start, - 'stop': stop, - 'elements_per_period': elements_per_period, - 'max_read_time': max_read_time, - } - ), - expansion_service) + expansion_service='localhost:8097'): + super(GenerateSequence, self).__init__() + self.start = start + self.stop = stop + self.elements_per_period = elements_per_period + self.max_read_time = max_read_time + self.expansion_service = expansion_service + + def expand(self, pbegin): + if not isinstance(pbegin, pvalue.PBegin): + raise Exception("GenerateSequence must be a root transform") + + coder = VarIntCoder() + coder_urn = ['beam:coder:varint:v1'] + args = { + 'start': + ConfigValue( + coder_urn=coder_urn, + payload=coder.encode(self.start)) + } + if self.stop: + args['stop'] = ConfigValue( + coder_urn=coder_urn, + payload=coder.encode(self.stop)) + if self.elements_per_period: + args['elements_per_period'] = ConfigValue( + coder_urn=coder_urn, + payload=coder.encode(self.elements_per_period)) + if self.max_read_time: + args['max_read_time'] = ConfigValue( + coder_urn=coder_urn, + payload=coder.encode(self.max_read_time)) + + payload = ExternalConfigurationPayload(configuration=args) + return pbegin.apply( + ExternalTransform( + self.URN, + payload.SerializeToString(), + self.expansion_service)) diff --git a/sdks/python/apache_beam/io/external/kafka.py b/sdks/python/apache_beam/io/external/kafka.py index f824515..ed24c00 100644 --- a/sdks/python/apache_beam/io/external/kafka.py +++ b/sdks/python/apache_beam/io/external/kafka.py @@ -37,25 +37,18 @@ from __future__ import absolute_import -import typing - -from past.builtins import unicode - -from apache_beam.transforms.external import ExternalTransform -from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder - -ReadFromKafkaSchema = typing.NamedTuple( - 'ReadFromKafkaSchema', - [ - ('consumer_config', typing.List[typing.Tuple[unicode, unicode]]), - ('topics', typing.List[unicode]), - ('key_deserializer', unicode), - ('value_deserializer', unicode), - ] -) - - -class ReadFromKafka(ExternalTransform): +from apache_beam import ExternalTransform +from apache_beam import pvalue +from apache_beam.coders import BytesCoder +from apache_beam.coders import IterableCoder +from apache_beam.coders import TupleCoder +from apache_beam.coders.coders import LengthPrefixCoder +from apache_beam.portability.api.external_transforms_pb2 import ConfigValue +from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload +from apache_beam.transforms import ptransform + + +class ReadFromKafka(ptransform.PTransform): """ An external PTransform which reads from Kafka and returns a KV pair for each item in the specified Kafka topics. If no Kafka Deserializer for @@ -71,13 +64,11 @@ class ReadFromKafka(ExternalTransform): byte_array_deserializer = 'org.apache.kafka.common.serialization.' \ 'ByteArrayDeserializer' - URN = 'beam:external:java:kafka:read:v1' - def __init__(self, consumer_config, topics, key_deserializer=byte_array_deserializer, value_deserializer=byte_array_deserializer, - expansion_service=None): + expansion_service='localhost:8097'): """ Initializes a read operation from Kafka. @@ -97,32 +88,38 @@ class ReadFromKafka(ExternalTransform): serialization.ByteArrayDeserializer'. :param expansion_service: The address (host:port) of the ExpansionService. """ - super(ReadFromKafka, self).__init__( - self.URN, - NamedTupleBasedPayloadBuilder( - ReadFromKafkaSchema( - consumer_config=list(consumer_config.items()), - topics=topics, - key_deserializer=key_deserializer, - value_deserializer=value_deserializer, - ) - ), - expansion_service - ) - - -WriteToKafkaSchema = typing.NamedTuple( - 'WriteToKafkaSchema', - [ - ('producer_config', typing.List[typing.Tuple[unicode, unicode]]), - ('topic', unicode), - ('key_serializer', unicode), - ('value_serializer', unicode), - ] -) - - -class WriteToKafka(ExternalTransform): + super(ReadFromKafka, self).__init__() + self._urn = 'beam:external:java:kafka:read:v1' + self.consumer_config = consumer_config + self.topics = topics + self.key_deserializer = key_deserializer + self.value_deserializer = value_deserializer + self.expansion_service = expansion_service + + def expand(self, pbegin): + if not isinstance(pbegin, pvalue.PBegin): + raise Exception("ReadFromKafka must be a root transform") + + args = { + 'consumer_config': + _encode_map(self.consumer_config), + 'topics': + _encode_list(self.topics), + 'key_deserializer': + _encode_str(self.key_deserializer), + 'value_deserializer': + _encode_str(self.value_deserializer), + } + + payload = ExternalConfigurationPayload(configuration=args) + return pbegin.apply( + ExternalTransform( + self._urn, + payload.SerializeToString(), + self.expansion_service)) + + +class WriteToKafka(ptransform.PTransform): """ An external PTransform which writes KV data to a specified Kafka topic. If no Kafka Serializer for key/value is provided, then key/value are @@ -135,13 +132,11 @@ class WriteToKafka(ExternalTransform): byte_array_serializer = 'org.apache.kafka.common.serialization.' \ 'ByteArraySerializer' - URN = 'beam:external:java:kafka:write:v1' - def __init__(self, producer_config, topic, key_serializer=byte_array_serializer, value_serializer=byte_array_serializer, - expansion_service=None): + expansion_service='localhost:8097'): """ Initializes a write operation to Kafka. @@ -161,15 +156,62 @@ class WriteToKafka(ExternalTransform): serialization.ByteArraySerializer'. :param expansion_service: The address (host:port) of the ExpansionService. """ - super(WriteToKafka, self).__init__( - self.URN, - NamedTupleBasedPayloadBuilder( - WriteToKafkaSchema( - producer_config=list(producer_config.items()), - topic=topic, - key_serializer=key_serializer, - value_serializer=value_serializer, - ) - ), - expansion_service - ) + super(WriteToKafka, self).__init__() + self._urn = 'beam:external:java:kafka:write:v1' + self.producer_config = producer_config + self.topic = topic + self.key_serializer = key_serializer + self.value_serializer = value_serializer + self.expansion_service = expansion_service + + def expand(self, pvalue): + args = { + 'producer_config': + _encode_map(self.producer_config), + 'topic': + _encode_str(self.topic), + 'key_serializer': + _encode_str(self.key_serializer), + 'value_serializer': + _encode_str(self.value_serializer), + } + + payload = ExternalConfigurationPayload(configuration=args) + return pvalue.apply( + ExternalTransform( + self._urn, + payload.SerializeToString(), + self.expansion_service)) + + +def _encode_map(dict_obj): + kv_list = [(key.encode('utf-8'), val.encode('utf-8')) + for key, val in dict_obj.items()] + coder = IterableCoder(TupleCoder( + [LengthPrefixCoder(BytesCoder()), LengthPrefixCoder(BytesCoder())])) + coder_urns = ['beam:coder:iterable:v1', + 'beam:coder:kv:v1', + 'beam:coder:bytes:v1', + 'beam:coder:bytes:v1'] + return ConfigValue( + coder_urn=coder_urns, + payload=coder.encode(kv_list)) + + +def _encode_list(list_obj): + encoded_list = [val.encode('utf-8') for val in list_obj] + coder = IterableCoder(LengthPrefixCoder(BytesCoder())) + coder_urns = ['beam:coder:iterable:v1', + 'beam:coder:bytes:v1'] + return ConfigValue( + coder_urn=coder_urns, + payload=coder.encode(encoded_list)) + + +def _encode_str(str_obj): + encoded_str = str_obj.encode('utf-8') + coder = LengthPrefixCoder(BytesCoder()) + coder_urns = ['beam:coder:bytes:v1'] + return ConfigValue( + coder_urn=coder_urns, + payload=coder.encode(encoded_str)) diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index fd79fcf..a7e9fb5 100644 --- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -27,18 +27,11 @@ import copy import threading from apache_beam import pvalue -from apache_beam.coders import registry from apache_beam.portability import common_urns from apache_beam.portability.api import beam_expansion_api_pb2 from apache_beam.portability.api import beam_runner_api_pb2 -from apache_beam.portability.api.external_transforms_pb2 import ConfigValue -from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload from apache_beam.runners import pipeline_context from apache_beam.transforms import ptransform -from apache_beam.typehints.native_type_compatibility import convert_to_beam_type -from apache_beam.typehints.trivial_inference import instance_to_type -from apache_beam.typehints.typehints import Union -from apache_beam.typehints.typehints import UnionConstraint # Protect against environments where grpc is not available. # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports @@ -49,170 +42,6 @@ except ImportError: grpc = None # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports -DEFAULT_EXPANSION_SERVICE = 'localhost:8097' - - -def _is_optional_or_none(typehint): - return (type(None) in typehint.union_types - if isinstance(typehint, UnionConstraint) else typehint is type(None)) - - -def _strip_optional(typehint): - if not _is_optional_or_none(typehint): - return typehint - new_types = typehint.union_types.difference({type(None)}) - if len(new_types) == 1: - return list(new_types)[0] - return Union[new_types] - - -def iter_urns(coder, context=None): - yield coder.to_runner_api_parameter(context)[0] - for child in coder._get_component_coders(): - for urn in iter_urns(child, context): - yield urn - - -class PayloadBuilder(object): - """ - Abstract base class for building payloads to pass to ExternalTransform. - """ - - @classmethod - def _config_value(cls, obj, typehint): - """ - Helper to create a ConfigValue with an encoded value. - """ - coder = registry.get_coder(typehint) - urns = list(iter_urns(coder)) - if 'beam:coder:pickled_python:v1' in urns: - raise RuntimeError("Found non-portable coder for %s" % (typehint,)) - return ConfigValue( - coder_urn=urns, - payload=coder.get_impl().encode_nested(obj)) - - def build(self): - """ - :return: ExternalConfigurationPayload - """ - raise NotImplementedError - - def payload(self): - """ - The serialized ExternalConfigurationPayload - - :return: bytes - """ - return self.build().SerializeToString() - - -class SchemaBasedPayloadBuilder(PayloadBuilder): - """ - Base class for building payloads based on a schema that provides - type information for each configuration value to encode. - - Note that if the schema defines a type as Optional, the corresponding value - will be omitted from the encoded payload, and thus the native transform - will determine the default. - """ - - def __init__(self, values, schema): - """ - :param values: mapping of config names to values - :param schema: mapping of config names to types - """ - self._values = values - self._schema = schema - - @classmethod - def _encode_config(cls, values, schema): - result = {} - for key, value in values.items(): - - try: - typehint = schema[key] - except KeyError: - raise RuntimeError("No typehint provided for key %r" % key) - - typehint = convert_to_beam_type(typehint) - - if value is None: - if not _is_optional_or_none(typehint): - raise RuntimeError("If value is None, typehint should be " - "optional. Got %r" % typehint) - # make it easy for user to filter None by default - continue - else: - # strip Optional from typehint so that pickled_python coder is not used - # for known types. - typehint = _strip_optional(typehint) - result[key] = cls._config_value(value, typehint) - return result - - def build(self): - """ - :return: ExternalConfigurationPayload - """ - args = self._encode_config(self._values, self._schema) - return ExternalConfigurationPayload(configuration=args) - - -class ImplicitSchemaPayloadBuilder(SchemaBasedPayloadBuilder): - """ - Build a payload that generates a schema from the provided values. - """ - def __init__(self, values): - schema = {key: instance_to_type(value) for key, value in values.items()} - super(ImplicitSchemaPayloadBuilder, self).__init__(values, schema) - - -class NamedTupleBasedPayloadBuilder(SchemaBasedPayloadBuilder): - """ - Build a payload based on a NamedTuple schema. - """ - def __init__(self, tuple_instance): - """ - :param tuple_instance: an instance of a typing.NamedTuple - """ - super(NamedTupleBasedPayloadBuilder, self).__init__( - values=tuple_instance._asdict(), schema=tuple_instance._field_types) - - -class AnnotationBasedPayloadBuilder(SchemaBasedPayloadBuilder): - """ - Build a payload based on an external transform's type annotations. - - Supported in python 3 only. - """ - def __init__(self, transform, **values): - """ - :param transform: a PTransform instance or class. type annotations will - be gathered from its __init__ method - :param values: values to encode - """ - schema = {k: v for k, v in - transform.__init__.__annotations__.items() - if k in values} - super(AnnotationBasedPayloadBuilder, self).__init__(values, schema) - - -class DataclassBasedPayloadBuilder(SchemaBasedPayloadBuilder): - """ - Build a payload based on an external transform that uses dataclasses. - - Supported in python 3 only. - """ - def __init__(self, transform): - """ - :param transform: a dataclass-decorated PTransform instance from which to - gather type annotations and values - """ - import dataclasses - schema = {field.name: field.type for field in - dataclasses.fields(transform)} - super(DataclassBasedPayloadBuilder, self).__init__( - dataclasses.asdict(transform), schema) - class ExternalTransform(ptransform.PTransform): """ @@ -227,29 +56,15 @@ class ExternalTransform(ptransform.PTransform): _EXPANDED_TRANSFORM_UNIQUE_NAME = 'root' _IMPULSE_PREFIX = 'impulse' - def __init__(self, urn, payload, endpoint=None): - endpoint = endpoint or DEFAULT_EXPANSION_SERVICE + def __init__(self, urn, payload, endpoint): if grpc is None and isinstance(endpoint, str): raise NotImplementedError('Grpc required for external transforms.') # TODO: Start an endpoint given an environment? self._urn = urn - self._payload = payload.payload() \ - if isinstance(payload, PayloadBuilder) \ - else payload + self._payload = payload self._endpoint = endpoint self._namespace = self._fresh_namespace() - def __post_init__(self, expansion_service): - """ - This will only be invoked if ExternalTransform is used as a base class - for a class decorated with dataclasses.dataclass - """ - ExternalTransform.__init__( - self, - self.URN, - DataclassBasedPayloadBuilder(self), - expansion_service) - def default_label(self): return '%s(%s)' % (self.__class__.__name__, self._urn) diff --git a/sdks/python/apache_beam/transforms/external_test.py b/sdks/python/apache_beam/transforms/external_test.py index 6576419..e480f62 100644 --- a/sdks/python/apache_beam/transforms/external_test.py +++ b/sdks/python/apache_beam/transforms/external_test.py @@ -15,7 +15,7 @@ # limitations under the License. # -"""Unit tests for the transform.external classes.""" +"""Unit tests for the transform.util classes.""" from __future__ import absolute_import @@ -23,7 +23,6 @@ import argparse import os import subprocess import sys -import typing import unittest import grpc @@ -33,21 +32,12 @@ from past.builtins import unicode import apache_beam as beam from apache_beam import Pipeline -from apache_beam.coders import FloatCoder -from apache_beam.coders import IterableCoder -from apache_beam.coders import StrUtf8Coder -from apache_beam.coders import TupleCoder -from apache_beam.coders import VarIntCoder from apache_beam.options.pipeline_options import PipelineOptions -from apache_beam.portability.api.external_transforms_pb2 import ConfigValue -from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload from apache_beam.runners.portability import expansion_service from apache_beam.runners.portability.expansion_service_test import FibTransform from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to -from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder -from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder # Protect against environments where apitools library is not available. # pylint: disable=wrong-import-order, wrong-import-position @@ -58,158 +48,6 @@ except ImportError: # pylint: enable=wrong-import-order, wrong-import-position -def get_payload(args): - return ExternalConfigurationPayload(configuration=args) - - -class PayloadBase(object): - values = { - 'integer_example': 1, - 'string_example': u'thing', - 'list_of_strings': [u'foo', u'bar'], - 'optional_kv': (u'key', 1.1), - 'optional_integer': None, - } - - bytes_values = { - 'integer_example': 1, - 'string_example': 'thing', - 'list_of_strings': ['foo', 'bar'], - 'optional_kv': ('key', 1.1), - 'optional_integer': None, - } - - args = { - 'integer_example': ConfigValue( - coder_urn=['beam:coder:varint:v1'], - payload=VarIntCoder() - .get_impl().encode_nested(values['integer_example'])), - 'string_example': ConfigValue( - coder_urn=['beam:coder:string_utf8:v1'], - payload=StrUtf8Coder() - .get_impl().encode_nested(values['string_example'])), - 'list_of_strings': ConfigValue( - coder_urn=['beam:coder:iterable:v1', - 'beam:coder:string_utf8:v1'], - payload=IterableCoder(StrUtf8Coder()) - .get_impl().encode_nested(values['list_of_strings'])), - 'optional_kv': ConfigValue( - coder_urn=['beam:coder:kv:v1', - 'beam:coder:string_utf8:v1', - 'beam:coder:double:v1'], - payload=TupleCoder([StrUtf8Coder(), FloatCoder()]) - .get_impl().encode_nested(values['optional_kv'])), - } - - def get_payload_from_typing_hints(self, values): - """Return ExternalConfigurationPayload based on python typing hints""" - raise NotImplementedError - - def get_payload_from_beam_typehints(self, values): - """Return ExternalConfigurationPayload based on beam typehints""" - raise NotImplementedError - - def test_typing_payload_builder(self): - result = self.get_payload_from_typing_hints(self.values) - expected = get_payload(self.args) - self.assertEqual(result, expected) - - def test_typing_payload_builder_with_bytes(self): - """ - string_utf8 coder will be used even if values are not unicode in python 2.x - """ - result = self.get_payload_from_typing_hints(self.bytes_values) - expected = get_payload(self.args) - self.assertEqual(result, expected) - - def test_typehints_payload_builder(self): - result = self.get_payload_from_beam_typehints(self.values) - expected = get_payload(self.args) - self.assertEqual(result, expected) - - def test_typehints_payload_builder_with_bytes(self): - """ - string_utf8 coder will be used even if values are not unicode in python 2.x - """ - result = self.get_payload_from_beam_typehints(self.bytes_values) - expected = get_payload(self.args) - self.assertEqual(result, expected) - - def test_optional_error(self): - """ - value can only be None if typehint is Optional - """ - with self.assertRaises(RuntimeError): - self.get_payload_from_typing_hints({k: None for k in self.values}) - - -class ExternalTuplePayloadTest(PayloadBase, unittest.TestCase): - - def get_payload_from_typing_hints(self, values): - TestSchema = typing.NamedTuple( - 'TestSchema', - [ - ('integer_example', int), - ('string_example', unicode), - ('list_of_strings', typing.List[unicode]), - ('optional_kv', typing.Optional[typing.Tuple[unicode, float]]), - ('optional_integer', typing.Optional[int]), - ] - ) - - builder = NamedTupleBasedPayloadBuilder(TestSchema(**values)) - return builder.build() - - def get_payload_from_beam_typehints(self, values): - raise unittest.SkipTest("Beam typehints cannot be used with " - "typing.NamedTuple") - - -class ExternalImplicitPayloadTest(unittest.TestCase): - """ - ImplicitSchemaPayloadBuilder works very differently than the other payload - builders - """ - def test_implicit_payload_builder(self): - builder = ImplicitSchemaPayloadBuilder(PayloadBase.values) - result = builder.build() - expected = get_payload(PayloadBase.args) - self.assertEqual(result, expected) - - def test_implicit_payload_builder_with_bytes(self): - values = PayloadBase.bytes_values - builder = ImplicitSchemaPayloadBuilder(values) - result = builder.build() - if sys.version_info[0] < 3: - # in python 2.x bytes coder will be inferred - args = { - 'integer_example': ConfigValue( - coder_urn=['beam:coder:varint:v1'], - payload=VarIntCoder() - .get_impl().encode_nested(values['integer_example'])), - 'string_example': ConfigValue( - coder_urn=['beam:coder:bytes:v1'], - payload=StrUtf8Coder() - .get_impl().encode_nested(values['string_example'])), - 'list_of_strings': ConfigValue( - coder_urn=['beam:coder:iterable:v1', - 'beam:coder:bytes:v1'], - payload=IterableCoder(StrUtf8Coder()) - .get_impl().encode_nested(values['list_of_strings'])), - 'optional_kv': ConfigValue( - coder_urn=['beam:coder:kv:v1', - 'beam:coder:bytes:v1', - 'beam:coder:double:v1'], - payload=TupleCoder([StrUtf8Coder(), FloatCoder()]) - .get_impl().encode_nested(values['optional_kv'])), - } - expected = get_payload(args) - self.assertEqual(result, expected) - else: - expected = get_payload(PayloadBase.args) - self.assertEqual(result, expected) - - @attr('UsesCrossLanguageTransforms') class ExternalTransformTest(unittest.TestCase): diff --git a/sdks/python/apache_beam/transforms/external_test_py3.py b/sdks/python/apache_beam/transforms/external_test_py3.py deleted file mode 100644 index 88fa870..0000000 --- a/sdks/python/apache_beam/transforms/external_test_py3.py +++ /dev/null @@ -1,93 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Unit tests for the transform.external classes.""" - -from __future__ import absolute_import - -import typing -import unittest - -import apache_beam as beam -from apache_beam import typehints -from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload -from apache_beam.transforms.external import AnnotationBasedPayloadBuilder -from apache_beam.transforms.external_test import PayloadBase - - -def get_payload(cls): - payload = ExternalConfigurationPayload() - payload.ParseFromString(cls._payload) - return payload - - -class ExternalAnnotationPayloadTest(PayloadBase, unittest.TestCase): - - def get_payload_from_typing_hints(self, values): - class AnnotatedTransform(beam.ExternalTransform): - URN = 'beam:external:fakeurn:v1' - - def __init__(self, - integer_example: int, - string_example: str, - list_of_strings: typing.List[str], - optional_kv: typing.Optional[typing.Tuple[str, float]] = None, - optional_integer: typing.Optional[int] = None, - expansion_service=None): - super(AnnotatedTransform, self).__init__( - self.URN, - AnnotationBasedPayloadBuilder( - self, - integer_example=integer_example, - string_example=string_example, - list_of_strings=list_of_strings, - optional_kv=optional_kv, - optional_integer=optional_integer, - ), - expansion_service - ) - - return get_payload(AnnotatedTransform(**values)) - - def get_payload_from_beam_typehints(self, values): - class AnnotatedTransform(beam.ExternalTransform): - URN = 'beam:external:fakeurn:v1' - - def __init__(self, - integer_example: int, - string_example: str, - list_of_strings: typehints.List[str], - optional_kv: typehints.Optional[typehints.KV[str, float]] = None, - optional_integer: typehints.Optional[int] = None, - expansion_service=None): - super(AnnotatedTransform, self).__init__( - self.URN, - AnnotationBasedPayloadBuilder( - self, - integer_example=integer_example, - string_example=string_example, - list_of_strings=list_of_strings, - optional_kv=optional_kv, - optional_integer=optional_integer, - ), - expansion_service - ) - - return get_payload(AnnotatedTransform(**values)) - -if __name__ == '__main__': - unittest.main() diff --git a/sdks/python/apache_beam/transforms/external_test_py37.py b/sdks/python/apache_beam/transforms/external_test_py37.py deleted file mode 100644 index ad1ff72..0000000 --- a/sdks/python/apache_beam/transforms/external_test_py37.py +++ /dev/null @@ -1,71 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Unit tests for the transform.external classes.""" - -from __future__ import absolute_import - -import dataclasses -import typing -import unittest - -import apache_beam as beam -from apache_beam import typehints -from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload -from apache_beam.transforms.external_test import PayloadBase - - -def get_payload(cls): - payload = ExternalConfigurationPayload() - payload.ParseFromString(cls._payload) - return payload - - -class ExternalDataclassesPayloadTest(PayloadBase, unittest.TestCase): - - def get_payload_from_typing_hints(self, values): - - @dataclasses.dataclass - class DataclassTransform(beam.ExternalTransform): - URN = 'beam:external:fakeurn:v1' - - integer_example: int - string_example: str - list_of_strings: typing.List[str] - optional_kv: typing.Optional[typing.Tuple[str, float]] = None - optional_integer: typing.Optional[int] = None - expansion_service: dataclasses.InitVar[typing.Optional[str]] = None - - return get_payload(DataclassTransform(**values)) - - def get_payload_from_beam_typehints(self, values): - - @dataclasses.dataclass - class DataclassTransform(beam.ExternalTransform): - URN = 'beam:external:fakeurn:v1' - - integer_example: int - string_example: str - list_of_strings: typehints.List[str] - optional_kv: typehints.Optional[typehints.KV[str, float]] = None - optional_integer: typehints.Optional[int] = None - expansion_service: dataclasses.InitVar[typehints.Optional[str]] = None - - return get_payload(DataclassTransform(**values)) - -if __name__ == '__main__': - unittest.main() diff --git a/sdks/python/scripts/generate_pydoc.sh b/sdks/python/scripts/generate_pydoc.sh index 4a29b7a..d74dc62 100755 --- a/sdks/python/scripts/generate_pydoc.sh +++ b/sdks/python/scripts/generate_pydoc.sh @@ -77,7 +77,7 @@ excluded_patterns=( *_test.py *_test_common.py # TODO(BEAM-7847): Remove this once doc generation can parse Py3 syntax. - *_py3*.py + *py3.py ) python $(type -p sphinx-apidoc) -fMeT -o target/docs/source apache_beam \ diff --git a/sdks/python/scripts/run_mini_py3lint.sh b/sdks/python/scripts/run_mini_py3lint.sh index 0bd7e0e..27ca3ce 100755 --- a/sdks/python/scripts/run_mini_py3lint.sh +++ b/sdks/python/scripts/run_mini_py3lint.sh @@ -38,24 +38,6 @@ set -o pipefail MODULE=apache_beam -PYTHON_MINOR=$(python -c 'import sys; print(sys.version_info[1])') -if [[ "${PYTHON_MINOR}" == 5 ]]; then - EXCLUDED_PY3_FILES=$(find ${MODULE} | grep 'py3[6-9]\.py$') - echo -e "Excluding Py3 files:\n${EXCLUDED_PY3_FILES}" -else - EXCLUDED_PY3_FILES="" -fi - -FILES_TO_IGNORE="" -for file in ${EXCLUDED_PY3_FILES}; do - if test -z "$FILES_TO_IGNORE" - then FILES_TO_IGNORE="$(basename $file)" - else FILES_TO_IGNORE="$FILES_TO_IGNORE, $(basename $file)" - fi -done - -echo -e "Skipping lint for files:\n${FILES_TO_IGNORE}" - usage(){ echo "Usage: $0 [MODULE|--help] # The default MODULE is $MODULE"; } if test $# -gt 0; then @@ -66,5 +48,4 @@ if test $# -gt 0; then fi echo "Running flake8 for module $MODULE:" -flake8 $MODULE --count --select=E9,F821,F822,F823 --show-source --statistics \ - --exclude="${FILES_TO_IGNORE}" +flake8 $MODULE --count --select=E9,F821,F822,F823 --show-source --statistics diff --git a/sdks/python/scripts/run_pylint.sh b/sdks/python/scripts/run_pylint.sh index 27de9a3..608cd96 100755 --- a/sdks/python/scripts/run_pylint.sh +++ b/sdks/python/scripts/run_pylint.sh @@ -62,7 +62,7 @@ apache_beam/portability/api/*pb2*.py PYTHON_MAJOR=$(python -c 'import sys; print(sys.version_info[0])') if [[ "${PYTHON_MAJOR}" == 2 ]]; then - EXCLUDED_PY3_FILES=$(find ${MODULE} | grep 'py3[0-9]*\.py$') + EXCLUDED_PY3_FILES=$(find ${MODULE} | grep 'py3\.py$') echo -e "Excluding Py3 files:\n${EXCLUDED_PY3_FILES}" else EXCLUDED_PY3_FILES="" diff --git a/sdks/python/setup.cfg b/sdks/python/setup.cfg index 361fe14..69a5187 100644 --- a/sdks/python/setup.cfg +++ b/sdks/python/setup.cfg @@ -51,6 +51,3 @@ exclude_lines = [coverage:xml] output = target/site/cobertura/coverage.xml - -[isort] -known_standard_library = dataclasses diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index 6fe8ecf..f792bbf 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -54,28 +54,28 @@ commands_post = [testenv:py27] commands = python apache_beam/examples/complete/autocomplete_test.py - python setup.py nosetests --ignore-files '.*py3\d?\.py$' {posargs} + python setup.py nosetests --ignore-files '.*py3.py$' {posargs} [testenv:py35] setenv = RUN_SKIPPED_PY3_TESTS=0 commands = python apache_beam/examples/complete/autocomplete_test.py - python setup.py nosetests --ignore-files '.*py3[6-9]\.py$' {posargs} + python setup.py nosetests {posargs} [testenv:py36] setenv = RUN_SKIPPED_PY3_TESTS=0 commands = python apache_beam/examples/complete/autocomplete_test.py - python setup.py nosetests --ignore-files '.*py3[7-9]\.py$' {posargs} + python setup.py nosetests {posargs} [testenv:py37] setenv = RUN_SKIPPED_PY3_TESTS=0 commands = python apache_beam/examples/complete/autocomplete_test.py - python setup.py nosetests --ignore-files '.*py3[8-9]\.py$' {posargs} + python setup.py nosetests {posargs} [testenv:py27-cython] # cython tests are only expected to work in linux (2.x and 3.x) @@ -85,7 +85,7 @@ commands = platform = linux2 commands = python apache_beam/examples/complete/autocomplete_test.py - python setup.py nosetests --ignore-files '.*py3\d?\.py$' {posargs} + python setup.py nosetests --ignore-files '.*py3.py$' {posargs} [testenv:py35-cython] # cython tests are only expected to work in linux (2.x and 3.x) @@ -97,7 +97,7 @@ setenv = RUN_SKIPPED_PY3_TESTS=0 commands = python apache_beam/examples/complete/autocomplete_test.py - python setup.py nosetests --ignore-files '.*py3[5-9]\.py$' {posargs} + python setup.py nosetests {posargs} [testenv:py36-cython] # cython tests are only expected to work in linux (2.x and 3.x) @@ -109,7 +109,7 @@ setenv = RUN_SKIPPED_PY3_TESTS=0 commands = python apache_beam/examples/complete/autocomplete_test.py - python setup.py nosetests --ignore-files '.*py3[7-9]\.py$' {posargs} + python setup.py nosetests {posargs} [testenv:py37-cython] # cython tests are only expected to work in linux (2.x and 3.x) @@ -121,13 +121,13 @@ setenv = RUN_SKIPPED_PY3_TESTS=0 commands = python apache_beam/examples/complete/autocomplete_test.py - python setup.py nosetests --ignore-files '.*py3[8-9]\.py$' {posargs} + python setup.py nosetests {posargs} [testenv:py27-gcp] extras = test,gcp commands = python apache_beam/examples/complete/autocomplete_test.py - python setup.py nosetests --ignore-files '.*py3\d?\.py$' {posargs} + python setup.py nosetests --ignore-files '.*py3.py$' {posargs} # Old and new Datastore client unit tests cannot be run in the same process # due to conflicting protobuf modules. # TODO(BEAM-4543): Remove these separate nosetests invocations once the @@ -140,14 +140,14 @@ setenv = RUN_SKIPPED_PY3_TESTS=0 extras = test,gcp commands = - python setup.py nosetests --ignore-files '.*py3[6-9]\.py$' {posargs} + python setup.py nosetests {posargs} [testenv:py37-gcp] setenv = RUN_SKIPPED_PY3_TESTS=0 extras = test,gcp commands = - python setup.py nosetests --ignore-files '.*py3[8-9]\.py$' {posargs} + python setup.py nosetests {posargs} [testenv:py27-lint] deps =