Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 86129200BC9 for ; Sat, 12 Nov 2016 03:28:38 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 84A5B160AF6; Sat, 12 Nov 2016 02:28:38 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 921C2160B14 for ; Sat, 12 Nov 2016 03:28:36 +0100 (CET) Received: (qmail 18540 invoked by uid 500); 12 Nov 2016 02:28:35 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 18521 invoked by uid 99); 12 Nov 2016 02:28:35 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 12 Nov 2016 02:28:35 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 02E04C7D7A for ; Sat, 12 Nov 2016 02:28:35 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id k2VY3jvvxzeH for ; Sat, 12 Nov 2016 02:28:25 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by 
mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 4FEEE5F246 for ; Sat, 12 Nov 2016 02:28:20 +0000 (UTC) Received: (qmail 14605 invoked by uid 99); 12 Nov 2016 02:28:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 12 Nov 2016 02:28:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 55A3DEEE32; Sat, 12 Nov 2016 02:28:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.incubator.apache.org Date: Sat, 12 Nov 2016 02:28:19 -0000 Message-Id: <243dc18a98a4482dbe87c9edfad4e7e8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [01/39] incubator-beam git commit: BEAM-261 Apex runner PoC archived-at: Sat, 12 Nov 2016 02:28:38 -0000 Repository: incubator-beam Updated Branches: refs/heads/master e2c21599d -> 7d069a65b http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java new file mode 100644 index 0000000..efb69ee --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.apex.translators.utils; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StandardCoder; + +public interface ApexStreamTuple +{ + /** + * Gets the value of the tuple + * + * @return + */ + T getValue(); + + /** + * Plain tuple class + * + * @param + */ + class DataTuple implements ApexStreamTuple + { + private T value; + + public static DataTuple of(T value) { + return new DataTuple<>(value); + } + + private DataTuple(T value) + { + this.value = value; + } + + @Override + public T getValue() + { + return value; + } + + public void setValue(T value) + { + this.value = value; + } + + @Override + public String toString() + { + return value.toString(); + } + + } + + /** + * Tuple that includes a timestamp + * + * @param + */ + class TimestampedTuple extends DataTuple + { + private long timestamp; + + public TimestampedTuple(long timestamp, T value) + { + super(value); + this.timestamp = timestamp; + } + + public long getTimestamp() + { + return timestamp; + } + + public void setTimestamp(long timestamp) + { 
+ this.timestamp = timestamp; + } + } + + /** + * Tuple that represents a watermark + * + * @param + */ + class WatermarkTuple extends TimestampedTuple + { + public static WatermarkTuple of(long timestamp) { + return new WatermarkTuple<>(timestamp); + } + + protected WatermarkTuple(long timestamp) + { + super(timestamp, null); + } + + @Override + public String toString() + { + return "[Watermark " + getTimestamp() + "]"; + } + } + + /** + * Coder for {@link ApexStreamTuple}. + */ + public static class ApexStreamTupleCoder extends StandardCoder> { + private static final long serialVersionUID = 1L; + final Coder valueCoder; + + public static ApexStreamTupleCoder of(Coder valueCoder) { + return new ApexStreamTupleCoder<>(valueCoder); + } + + protected ApexStreamTupleCoder(Coder valueCoder) { + this.valueCoder = checkNotNull(valueCoder); + } + + @Override + public void encode(ApexStreamTuple value, OutputStream outStream, Context context) + throws CoderException, IOException { + if (value instanceof WatermarkTuple) { + outStream.write(1); + new DataOutputStream(outStream).writeLong(((WatermarkTuple)value).getTimestamp()); + } else { + outStream.write(0); + valueCoder.encode(value.getValue(), outStream, context); + } + } + + @Override + public ApexStreamTuple decode(InputStream inStream, Context context) + throws CoderException, IOException + { + int b = inStream.read(); + if (b == 1) { + return new WatermarkTuple(new DataInputStream(inStream).readLong()); + } else { + return new DataTuple(valueCoder.decode(inStream, context)); + } + } + + @Override + public List> getCoderArguments() + { + return Arrays.>asList(valueCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException + { + verifyDeterministic( + this.getClass().getSimpleName() + " requires a deterministic valueCoder", + valueCoder); + } + + /** + * Returns the value coder. 
+ */ + public Coder getValueCoder() { + return valueCoder; + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java new file mode 100644 index 0000000..c18765b --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.apex.translators.utils; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.Context; + +import com.datatorrent.api.StreamCodec; +import com.datatorrent.netlet.util.Slice; +import com.google.common.base.Throwables; + +/** + * The Apex {@link StreamCodec} adapter for using Beam {@link Coder}. + */ +public class CoderAdapterStreamCodec implements StreamCodec, Serializable { + + private static final long serialVersionUID = 1L; + private final Coder coder; + + public CoderAdapterStreamCodec(Coder coder) { + this.coder = coder; + } + + @Override + public Object fromByteArray(Slice fragment) + { + ByteArrayInputStream bis = new ByteArrayInputStream(fragment.buffer, fragment.offset, fragment.length); + try { + return coder.decode(bis, Context.OUTER); + } catch (IOException e) { + throw Throwables.propagate(e); + } + } + + @Override + public Slice toByteArray(Object wv) + { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + try { + coder.encode(wv, bos, Context.OUTER); + } catch (IOException e) { + Throwables.propagate(e); + } + return new Slice(bos.toByteArray()); + } + + @Override + public int getPartition(Object o) + { + return o.hashCode(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpSideInputReader.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpSideInputReader.java new file mode 100644 index 0000000..ffe1a29 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpSideInputReader.java @@ 
-0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.apex.translators.utils; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.values.PCollectionView; + +import java.io.Serializable; + +import javax.annotation.Nullable; + +/** + * no-op side input reader. 
+ */ +public class NoOpSideInputReader implements SideInputReader, Serializable { + @Nullable + @Override + public T get(PCollectionView view, BoundedWindow window) { + return null; + } + + @Override + public boolean contains(PCollectionView view) { + return false; + } + + @Override + public boolean isEmpty() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java new file mode 100644 index 0000000..43d92f6 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.apex.translators.utils; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.ExecutionContext; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.values.TupleTag; + +import java.io.IOException; +import java.io.Serializable; + +/** + * Serializable {@link ExecutionContext.StepContext} that does nothing. + */ +public class NoOpStepContext implements ExecutionContext.StepContext, Serializable { + + private static final long serialVersionUID = 1L; + + @Override + public String getStepName() { + return null; + } + + @Override + public String getTransformName() { + return null; + } + + @Override + public void noteOutput(WindowedValue output) { + } + + @Override + public void noteSideOutput(TupleTag tag, WindowedValue output) { + } + + @Override + public void writePCollectionViewData(TupleTag tag, + Iterable> data, + Coder>> dataCoder, W window, Coder windowCoder) throws + IOException { + + } + + @Override + public StateInternals stateInternals() { + return null; + } + + @Override + public TimerInternals timerInternals() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java new file mode 100644 index 0000000..7f7b3ef --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java @@ -0,0 +1,61 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.apex.translators.utils; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; + +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.sdk.options.PipelineOptions; + +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * A wrapper to enable serialization of {@link PipelineOptions} + */ +public class SerializablePipelineOptions implements Externalizable { + + private transient ApexPipelineOptions pipelineOptions; + + public SerializablePipelineOptions(ApexPipelineOptions pipelineOptions) { + this.pipelineOptions = pipelineOptions; + } + + public SerializablePipelineOptions() { + } + + public ApexPipelineOptions get() { + return this.pipelineOptions; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException + { + out.writeUTF(new ObjectMapper().writeValueAsString(pipelineOptions)); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException + { + String s = in.readUTF(); + this.pipelineOptions = new ObjectMapper().readValue(s, PipelineOptions.class).as(ApexPipelineOptions.class); + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java new file mode 100644 index 0000000..582d839 --- /dev/null +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.apex.examples; + +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.runners.apex.ApexRunnerResult; +import org.apache.beam.runners.apex.ApexRunner; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Windowed word count example on Apex runner. + */ +public class StreamingWordCountTest { + + static class ExtractWordsFn extends DoFn { + private final Aggregator emptyLines = + createAggregator("emptyLines", new Sum.SumLongFn()); + + @ProcessElement + public void processElement(ProcessContext c) { + if (c.element().trim().isEmpty()) { + emptyLines.addValue(1L); + } + + // Split the line into words. + String[] words = c.element().split("[^a-zA-Z']+"); + + // Output each word encountered into the output PCollection. 
+ for (String word : words) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + } + + static class FormatAsStringFn extends DoFn, String> { + private static final Logger LOG = LoggerFactory.getLogger(FormatAsStringFn.class); + static final ConcurrentHashMap RESULTS = new ConcurrentHashMap<>(); + + @ProcessElement + public void processElement(ProcessContext c) { + String row = c.element().getKey() + " - " + c.element().getValue() + + " @ " + c.timestamp().toString(); + LOG.debug("output {}", row); + c.output(row); + RESULTS.put(c.element().getKey(), c.element().getValue()); + } + } + + @Test + public void testWindowedWordCount() throws Exception { + String[] args = new String[] { + "--runner=" + ApexRunner.class.getName() + }; + ApexPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() + .as(ApexPipelineOptions.class); + options.setApplicationName("StreamingWordCount"); + options.setParallelism(1); + Pipeline p = Pipeline.create(options); + + PCollection> wordCounts = + p.apply(Read.from(new UnboundedTextSource())) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10)))) + .apply(Count.perElement()); + + wordCounts.apply(ParDo.of(new FormatAsStringFn())); + + ApexRunnerResult result = (ApexRunnerResult)p.run(); + Assert.assertNotNull(result.getApexDAG().getOperatorMeta("Read(UnboundedTextSource)")); + long timeout = System.currentTimeMillis() + 30000; + while (System.currentTimeMillis() < timeout) { + if (FormatAsStringFn.RESULTS.containsKey("foo") && FormatAsStringFn.RESULTS.containsKey("bar")) { + break; + } + Thread.sleep(1000); + } + result.cancel(); + Assert.assertTrue(FormatAsStringFn.RESULTS.containsKey("foo") && FormatAsStringFn.RESULTS.containsKey("bar")); + FormatAsStringFn.RESULTS.clear(); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java 
---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java new file mode 100644 index 0000000..29351e9 --- /dev/null +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.apex.examples; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.joda.time.Instant; + +import com.google.common.base.Throwables; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import java.util.NoSuchElementException; + +import javax.annotation.Nullable; + +/** + * unbounded source that reads from text. 
+ */ +public class UnboundedTextSource extends UnboundedSource { + private static final long serialVersionUID = 1L; + + @Override + public List> generateInitialSplits( + int desiredNumSplits, PipelineOptions options) throws Exception { + return Collections.>singletonList(this); + } + + @Override + public UnboundedReader createReader(PipelineOptions options, + @Nullable CheckpointMark checkpointMark) { + return new UnboundedTextReader(this); + } + + @Nullable + @Override + public Coder getCheckpointMarkCoder() { + return null; + } + + @Override + public void validate() { + } + + @Override + public Coder getDefaultOutputCoder() { + return StringUtf8Coder.of(); + } + + /** + * reads from text. + */ + public static class UnboundedTextReader extends UnboundedReader implements Serializable { + + private static final long serialVersionUID = 7526472295622776147L; + + private final UnboundedTextSource source; + + private final String[] texts = new String[]{"foo foo foo bar bar", "foo foo bar bar bar"}; + private long index = 0; + + private String currentRecord; + + private Instant currentTimestamp; + + public UnboundedTextReader(UnboundedTextSource source) { + this.source = source; + } + + @Override + public boolean start() throws IOException { + currentRecord = texts[0]; + currentTimestamp = new Instant(0); + return true; + } + + @Override + public boolean advance() throws IOException { + index++; + currentRecord = texts[(int) index % (texts.length)]; + currentTimestamp = new Instant(index * 1000); + try { + Thread.sleep(index); // allow for downstream processing to complete + } catch (InterruptedException e) { + Throwables.propagate(e); + } + return true; + } + + @Override + public byte[] getCurrentRecordId() throws NoSuchElementException { + return new byte[0]; + } + + @Override + public String getCurrent() throws NoSuchElementException { + return this.currentRecord; + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + return 
currentTimestamp; + } + + @Override + public void close() throws IOException { + } + + @Override + public Instant getWatermark() { + return currentTimestamp; + } + + @Override + public CheckpointMark getCheckpointMark() { + return null; + } + + @Override + public UnboundedSource getCurrentSource() { + return this.source; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java new file mode 100644 index 0000000..d3b56bc --- /dev/null +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.apex.translators; + +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.runners.apex.ApexRunnerResult; +import org.apache.beam.runners.apex.ApexRunner; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.List; + +/** + * integration test for {@link FlattenPCollectionTranslator}. + */ +public class FlattenPCollectionTranslatorTest { + private static final Logger LOG = LoggerFactory.getLogger(FlattenPCollectionTranslatorTest.class); + + @Test + public void test() throws Exception { + ApexPipelineOptions options = + PipelineOptionsFactory.as(ApexPipelineOptions.class); + options.setApplicationName("FlattenPCollection"); + options.setRunner(ApexRunner.class); + Pipeline p = Pipeline.create(options); + + List collection1 = Lists.newArrayList("1", "2", "3"); + List collection2 = Lists.newArrayList("4", "5"); + List expected = Lists.newArrayList("1", "2", "3", "4", "5"); + PCollection pc1 = + p.apply(Create.of(collection1).withCoder(StringUtf8Coder.of())); + PCollection pc2 = + p.apply(Create.of(collection2).withCoder(StringUtf8Coder.of())); + PCollectionList pcs = PCollectionList.of(pc1).and(pc2); + PCollection actual = pcs.apply(Flatten.pCollections()); + actual.apply(ParDo.of(new EmbeddedCollector())); + + ApexRunnerResult result = (ApexRunnerResult)p.run(); + // TODO: verify translation + result.getApexDAG(); 
+ long timeout = System.currentTimeMillis() + 30000; + while (System.currentTimeMillis() < timeout) { + if (EmbeddedCollector.results.containsAll(expected)) { + break; + } + LOG.info("Waiting for expected results."); + Thread.sleep(1000); + } + org.junit.Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.results); + + } + + @SuppressWarnings("serial") + private static class EmbeddedCollector extends OldDoFn { + protected static final HashSet results = new HashSet<>(); + + public EmbeddedCollector() { + } + + @Override + public void processElement(ProcessContext c) throws Exception { + results.add(c.element()); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java new file mode 100644 index 0000000..e4d4606 --- /dev/null +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.apex.translators; + +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.runners.apex.ApexRunnerResult; +import org.apache.beam.runners.apex.ApexRunner; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; + +import com.datatorrent.api.DAG; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import javax.annotation.Nullable; + +/** + * Integration test for {@link GroupByKeyTranslator}: counts timestamped keys per one-second fixed window. 
+ */ +public class GroupByKeyTranslatorTest { + /** Runs the windowed count on {@link ApexRunner} and polls for up to 30 seconds until the expected results arrive. */ + @SuppressWarnings({"unchecked"}) + @Test + public void test() throws Exception { + ApexPipelineOptions options = + PipelineOptionsFactory.as(ApexPipelineOptions.class); + options.setApplicationName("GroupByKey"); + options.setRunner(ApexRunner.class); + Pipeline p = Pipeline.create(options); + + List> data = /* NOTE(review): generic type arguments appear stripped in this archived copy; confirm against the repository */ + Lists.newArrayList( + KV.of("foo", new Instant(1000)), KV.of("foo", new Instant(1000)), + KV.of("foo", new Instant(2000)), + KV.of("bar", new Instant(1000)), KV.of("bar", new Instant(2000)), + KV.of("bar", new Instant(2000)) + ); + + // expected results assume outputAtLatestInputTimestamp + List>> expected = + Lists.newArrayList( + KV.of(new Instant(1000), KV.of("foo", 2L)), + KV.of(new Instant(1000), KV.of("bar", 1L)), + KV.of(new Instant(2000), KV.of("foo", 1L)), + KV.of(new Instant(2000), KV.of("bar", 2L)) + ); + + p.apply(Read.from(new TestSource(data, new Instant(5000)))) + .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))) + .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp())) + .apply(Count.perElement()) + .apply(ParDo.of(new KeyedByTimestamp>())) + .apply(ParDo.of(new EmbeddedCollector())) + ; + + ApexRunnerResult result = (ApexRunnerResult)p.run(); + // TODO: verify translation + DAG dag = result.getApexDAG(); + + long timeout = System.currentTimeMillis() + 30000; + while (System.currentTimeMillis() < timeout) { + if (EmbeddedCollector.results.containsAll(expected)) { + break; + } + Thread.sleep(1000); + } + Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.results); + + } + /** Sink that adds every element it receives to a static set read by the test. */ + @SuppressWarnings("serial") + private static class EmbeddedCollector extends OldDoFn { + protected static final HashSet results = new HashSet<>(); + + public EmbeddedCollector() { + } + + @Override + public void processElement(ProcessContext c) throws Exception { + results.add(c.element()); + } + } + /** Emits each input element keyed by the timestamp reported for it. */ + private static class KeyedByTimestamp extends OldDoFn> { + + @Override + public void 
processElement(ProcessContext c) throws Exception { + c.output(KV.of(c.timestamp(), c.element())); + } + } + /** Test source that replays a fixed list of key/timestamp pairs and returns itself as its only split. */ + private static class TestSource extends UnboundedSource { + + private final List> data; + private final Instant watermark; + + public TestSource(List> data, Instant watermark) { + this.data = data; + this.watermark = watermark; + } + + @Override + public List> generateInitialSplits( + int desiredNumSplits, PipelineOptions options) throws Exception { + return Collections.>singletonList(this); + } + + @Override + public UnboundedReader createReader(PipelineOptions options, + @Nullable CheckpointMark checkpointMark) { + return new TestReader(data, watermark, this); + } + + @Nullable + @Override + public Coder getCheckpointMarkCoder() { + return null; + } + + @Override + public void validate() { + } + + @Override + public Coder getDefaultOutputCoder() { + return StringUtf8Coder.of(); + } + /** Reader that walks the list, exposing each pair's key as the record and its value as the timestamp. */ + private static class TestReader extends UnboundedReader implements Serializable { + + private static final long serialVersionUID = 7526472295622776147L; + + private final List> data; + private final TestSource source; + + private Iterator> iterator; + private String currentRecord; + private Instant currentTimestamp; + private Instant watermark; + private boolean collected; + + public TestReader(List> data, Instant watermark, TestSource source) { + this.data = data; + this.source = source; + this.watermark = watermark; + } + + @Override + public boolean start() throws IOException { + iterator = data.iterator(); + return advance(); + } + + @Override + public boolean advance() throws IOException { + if (iterator.hasNext()) { + KV kv = iterator.next(); + collected = false; + currentRecord = kv.getKey(); + currentTimestamp = kv.getValue(); + return true; + } else { + return false; + } + } + + @Override + public byte[] getCurrentRecordId() throws NoSuchElementException { + return new byte[0]; + } + + @Override + public String getCurrent() throws NoSuchElementException { + 
collected = true; /* marks the current record as handed out; getWatermark() checks this flag */ + return this.currentRecord; + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + return currentTimestamp; + } + + @Override + public void close() throws IOException { + } + /** Returns the configured watermark once the last record has been handed out; otherwise an Instant of 0. */ + @Override + public Instant getWatermark() { + if (!iterator.hasNext() && collected) { + return watermark; + } else { + return new Instant(0); + } + } + + @Override + public CheckpointMark getCheckpointMark() { + return null; + } + + @Override + public UnboundedSource getCurrentSource() { + return this.source; + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java new file mode 100644 index 0000000..06aaf55 --- /dev/null +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.apex.translators; + +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.runners.apex.ApexRunnerResult; +import org.apache.beam.runners.apex.ApexRunner; +import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator; +import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; + +import com.datatorrent.api.DAG; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.HashSet; +import java.util.List; +import java.util.regex.Pattern; + +/** + * Integration test for {@link ParDoBoundTranslator}: checks the translated operators and the ParDo output. 
+ */ +@RunWith(JUnit4.class) +public class ParDoBoundTranslatorTest { + private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslatorTest.class); + /** Adds 5 to the integers 1..5, checks the operator classes in the DAG and polls up to 30 seconds for 6..10. */ + @Test + public void test() throws Exception { + ApexPipelineOptions options = PipelineOptionsFactory.create() + .as(ApexPipelineOptions.class); + options.setApplicationName("ParDoBound"); + options.setRunner(ApexRunner.class); + + Pipeline p = Pipeline.create(options); + + List collection = Lists.newArrayList(1, 2, 3, 4, 5); + List expected = Lists.newArrayList(6, 7, 8, 9, 10); + p.apply(Create.of(collection).withCoder(SerializableCoder.of(Integer.class))) + .apply(ParDo.of(new Add(5))) + .apply(ParDo.of(new EmbeddedCollector())); + + ApexRunnerResult result = (ApexRunnerResult)p.run(); + DAG dag = result.getApexDAG(); + + DAG.OperatorMeta om = dag.getOperatorMeta("Create.Values"); + Assert.assertNotNull(om); + Assert.assertEquals(ApexReadUnboundedInputOperator.class, om.getOperator().getClass()); + + om = dag.getOperatorMeta("ParDo(Add)"); + Assert.assertNotNull(om); + Assert.assertEquals(ApexParDoOperator.class, om.getOperator().getClass()); + + long timeout = System.currentTimeMillis() + 30000; + while (System.currentTimeMillis() < timeout) { + if (EmbeddedCollector.results.containsAll(expected)) { + break; + } + LOG.info("Waiting for expected results."); + Thread.sleep(1000); + } + Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.results); + } + /** DoFn that outputs each input element plus a fixed number. */ + @SuppressWarnings("serial") + private static class Add extends OldDoFn { + private final Integer number; + + public Add(Integer number) { + this.number = number; + } + + @Override + public void processElement(ProcessContext c) throws Exception { + c.output(c.element() + number); + } + } + /** Sink that adds every element it receives to a static set read by the test. */ + @SuppressWarnings("serial") + private static class EmbeddedCollector extends OldDoFn { + protected static final HashSet results = new HashSet<>(); + + public EmbeddedCollector() { + } + + @Override + public void processElement(ProcessContext c) throws 
Exception { + results.add(c.element()); + } + } + + /** Checks that a failing PAssert surfaces as an AssertionError whose message lists the expected elements (currently ignored). */ + @Ignore + @Test + public void testAssertionFailure() throws Exception { + ApexPipelineOptions options = PipelineOptionsFactory.create() + .as(ApexPipelineOptions.class); + options.setRunner(ApexRunner.class); + Pipeline pipeline = Pipeline.create(options); + + PCollection pcollection = pipeline + .apply(Create.of(1, 2, 3, 4)); + PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3, 7); + + Throwable exc = runExpectingAssertionFailure(pipeline); + Pattern expectedPattern = Pattern.compile( + "Expected: iterable over \\[((<4>|<7>|<3>|<2>|<1>)(, )?){5}\\] in any order"); + // A loose pattern, but should get the job done. + assertTrue( + "Expected error message from PAssert with substring matching " + + expectedPattern + + " but the message was \"" + + exc.getMessage() + + "\"", + expectedPattern.matcher(exc.getMessage()).find()); + } + /** Runs the pipeline and returns the AssertionError it throws; fails the test if none is thrown. */ + private static Throwable runExpectingAssertionFailure(Pipeline pipeline) { + // We cannot use thrown.expect(AssertionError.class) because the AssertionError + // is first caught by JUnit and causes a test failure. 
+ try { + pipeline.run(); + } catch (AssertionError exc) { + return exc; + } + fail("assertion should have failed"); + throw new RuntimeException("unreachable"); /* fail() above always throws; this statement only satisfies the compiler */ + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java new file mode 100644 index 0000000..6260632 --- /dev/null +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.apex.translators; + +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.runners.apex.ApexRunnerResult; +import org.apache.beam.runners.apex.ApexRunner; +import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator; +import org.apache.beam.runners.apex.translators.utils.CollectionSource; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.CountingSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.ParDo; + +import com.datatorrent.api.DAG; +import com.google.common.collect.ContiguousSet; +import com.google.common.collect.DiscreteDomain; +import com.google.common.collect.Lists; +import com.google.common.collect.Range; +import com.google.common.collect.Sets; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Integration test for {@link ReadUnboundedTranslator}: reads from an unbounded and from a bounded source. 
+ */ +public class ReadUnboundTranslatorTest { + private static final Logger LOG = LoggerFactory.getLogger(ReadUnboundTranslatorTest.class); + /** Reads five strings from a {@link CollectionSource}, checks the read operator class and polls up to 30 seconds for them. */ + @Test + public void test() throws Exception { + ApexPipelineOptions options = PipelineOptionsFactory.create() + .as(ApexPipelineOptions.class); + EmbeddedCollector.results.clear(); + options.setApplicationName("ReadUnbound"); + options.setRunner(ApexRunner.class); + Pipeline p = Pipeline.create(options); + + List collection = Lists.newArrayList("1", "2", "3", "4", "5"); + CollectionSource source = new CollectionSource<>(collection, StringUtf8Coder.of()); + p.apply(Read.from(source)) + .apply(ParDo.of(new EmbeddedCollector())); + + ApexRunnerResult result = (ApexRunnerResult)p.run(); + DAG dag = result.getApexDAG(); + DAG.OperatorMeta om = dag.getOperatorMeta("Read(CollectionSource)"); + Assert.assertNotNull(om); + Assert.assertEquals(ApexReadUnboundedInputOperator.class, om.getOperator().getClass()); + + long timeout = System.currentTimeMillis() + 30000; + while (System.currentTimeMillis() < timeout) { + if (EmbeddedCollector.results.containsAll(collection)) { + break; + } + LOG.info("Waiting for expected results."); + Thread.sleep(1000); + } + Assert.assertEquals(Sets.newHashSet(collection), EmbeddedCollector.results); + } + /** Reads 0..9 from {@code CountingSource.upTo(10)}, checks the read operator class and polls up to 30 seconds for them. */ + @Test + public void testReadBounded() throws Exception { + ApexPipelineOptions options = PipelineOptionsFactory.create() + .as(ApexPipelineOptions.class); + EmbeddedCollector.results.clear(); + options.setApplicationName("ReadBounded"); + options.setRunner(ApexRunner.class); + Pipeline p = Pipeline.create(options); + + Set expected = ContiguousSet.create(Range.closedOpen(0L, 10L), DiscreteDomain.longs()); + p.apply(Read.from(CountingSource.upTo(10))) + .apply(ParDo.of(new EmbeddedCollector())); + + ApexRunnerResult result = (ApexRunnerResult)p.run(); + DAG dag = result.getApexDAG(); + DAG.OperatorMeta om = 
dag.getOperatorMeta("Read(BoundedCountingSource)/Read(BoundedCountingSource)/Read(BoundedToUnboundedSourceAdapter)"); + Assert.assertNotNull(om); + Assert.assertEquals(ApexReadUnboundedInputOperator.class, om.getOperator().getClass()); + + long timeout = System.currentTimeMillis() + 30000; + while (System.currentTimeMillis() < timeout) { + if (EmbeddedCollector.results.containsAll(expected)) { + break; + } + LOG.info("Waiting for expected results."); + Thread.sleep(1000); + } + Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.results); + } + /** Sink that adds every element it receives to a static set; each test clears the set before running. */ + @SuppressWarnings("serial") + private static class EmbeddedCollector extends OldDoFn { + protected static final HashSet results = new HashSet<>(); + + public EmbeddedCollector() { + } + + @Override + public void processElement(ProcessContext c) throws Exception { + results.add(c.element()); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java new file mode 100644 index 0000000..a1e8b3e --- /dev/null +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.apex.translators.utils; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; + +import org.joda.time.Instant; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import javax.annotation.Nullable; + +/** + * A {@link Collection} exposed as an {@link UnboundedSource}, used for tests. + */ +public class CollectionSource extends UnboundedSource { + + private final Collection collection; + private final Coder coder; + + public CollectionSource(Collection collection, Coder coder) { + this.collection = collection; + this.coder = coder; + } + /** Returns this source as the only split; {@code desiredNumSplits} is not used. */ + @Override + public List> generateInitialSplits( + int desiredNumSplits, PipelineOptions options) throws Exception { + return Collections.singletonList(this); + } + + @Override + public UnboundedReader createReader(PipelineOptions options, + @Nullable UnboundedSource.CheckpointMark checkpointMark) { + return new CollectionReader<>(collection, this); + } + /** Always returns {@code null}; this test source provides no checkpoint mark coder. */ + @Nullable + @Override + public Coder getCheckpointMarkCoder() { + return null; + } + + @Override + public void validate() { + } + + @Override + public Coder getDefaultOutputCoder() { + return coder; + } + /** Reader that iterates over the collection and reports the current time as both timestamp and watermark. */ + private static class CollectionReader extends UnboundedSource.UnboundedReader + implements Serializable { + + private T current; + private final CollectionSource source; + private final 
Collection collection; + private Iterator iterator; + + public CollectionReader(Collection collection, CollectionSource source) { + this.collection = collection; + this.source = source; + } + /** Creates the iterator if it does not exist yet, then advances to the first element. */ + @Override + public boolean start() throws IOException { + if (null == iterator) { + iterator = collection.iterator(); + } + return advance(); + } + /** Moves to the next element if there is one; assumes {@link #start()} has already created the iterator. */ + @Override + public boolean advance() throws IOException { + if (iterator.hasNext()) { + current = iterator.next(); + return true; + } else { + return false; + } + } + + @Override + public Instant getWatermark() { + return Instant.now(); + } + + @Override + public UnboundedSource.CheckpointMark getCheckpointMark() { + return null; + } + + @Override + public UnboundedSource getCurrentSource() { + return source; + } + + @Override + public T getCurrent() throws NoSuchElementException { + return current; + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + return Instant.now(); + } + + @Override + public void close() throws IOException { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java new file mode 100644 index 0000000..e2fa9d9 --- /dev/null +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.apex.translators.utils; + +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.datatorrent.common.util.FSStorageAgent; +import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; + +/** + * Tests the serialization of PipelineOptions. 
+ */ +public class PipelineOptionsTest { + + public interface MyOptions extends ApexPipelineOptions { + @Description("Bla bla bla") + @Default.String("Hello") + String getTestOption(); + void setTestOption(String value); + } + + private static class MyOptionsWrapper { + private MyOptionsWrapper() { + this(null); // required for Kryo + } + private MyOptionsWrapper(ApexPipelineOptions options) { + this.options = new SerializablePipelineOptions(options); + } + @Bind(JavaSerializer.class) + private final SerializablePipelineOptions options; + } + + private static MyOptions options; + + private final static String[] args = new String[]{"--testOption=nothing"}; + + @BeforeClass + public static void beforeTest() { + options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class); + } + + @Test + public void testSerialization() { + MyOptionsWrapper wrapper = new MyOptionsWrapper(PipelineOptionsTest.options); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + FSStorageAgent.store(bos, wrapper); + + ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); + MyOptionsWrapper wrapperCopy = (MyOptionsWrapper)FSStorageAgent.retrieve(bis); + assertNotNull(wrapperCopy.options); + assertEquals("nothing", wrapperCopy.options.get().as(MyOptions.class).getTestOption()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/apex/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/resources/log4j.properties b/runners/apex/src/test/resources/log4j.properties new file mode 100644 index 0000000..84a6f68 --- /dev/null +++ b/runners/apex/src/test/resources/log4j.properties @@ -0,0 +1,33 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Root logger level is currently DEBUG, which is useful for debugging; +# set it manually to OFF to not flood build logs +log4j.rootLogger=DEBUG, testlogger + +# testlogger is set to be a ConsoleAppender. +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +log4j.logger.org=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=info +log4j.logger.org.apache.apex=debug +log4j.logger.org.apache.beam.runners.apex=debug http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaf38ddf/runners/pom.xml ---------------------------------------------------------------------- diff --git a/runners/pom.xml b/runners/pom.xml index 605c3b2..ff800d1 100644 --- a/runners/pom.xml +++ b/runners/pom.xml @@ -37,6 +37,7 @@ direct-java flink spark + apex