beam-commits mailing list archives

From da...@apache.org
Subject [19/50] [abbrv] incubator-beam git commit: [flink] adjust directories according to package name
Date Fri, 04 Mar 2016 18:11:14 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java
new file mode 100644
index 0000000..e6c5ae2
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java
@@ -0,0 +1,303 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.StateCheckpointReader;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.StateCheckpointUtils;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.StateCheckpointWriter;
+import com.google.cloud.dataflow.sdk.coders.*;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.transforms.CombineWithContext;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns;
+import com.google.cloud.dataflow.sdk.util.TimeDomain;
+import com.google.cloud.dataflow.sdk.util.TimerInternals;
+import com.google.cloud.dataflow.sdk.util.state.*;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+
+public class StateSerializationTest {
+
+  private static final StateNamespace NAMESPACE_1 = StateNamespaces.global();
+  private static final String KEY_PREFIX = "TEST_";
+
+  // TODO: This can be replaced with the standard Sum.SumIntegerFn once the state no longer needs
+  // to create a StateTag at the point of restoring state. Currently StateTags are compared strictly
+  // by type, and combiners always use KeyedCombineFnWithContext rather than KeyedCombineFn or CombineFn.
+  private static final CombineWithContext.KeyedCombineFnWithContext<Object, Integer, int[], Integer> SUM_COMBINER =
+    new CombineWithContext.KeyedCombineFnWithContext<Object, Integer, int[], Integer>() {
+      @Override
+      public int[] createAccumulator(Object key, CombineWithContext.Context c) {
+        return new int[1];
+      }
+
+      @Override
+      public int[] addInput(Object key, int[] accumulator, Integer value, CombineWithContext.Context c) {
+        accumulator[0] += value;
+        return accumulator;
+      }
+
+      @Override
+      public int[] mergeAccumulators(Object key, Iterable<int[]> accumulators, CombineWithContext.Context c) {
+        int[] r = new int[1];
+        for (int[] a : accumulators) {
+          r[0] += a[0];
+        }
+        return r;
+      }
+
+      @Override
+      public Integer extractOutput(Object key, int[] accumulator, CombineWithContext.Context c) {
+        return accumulator[0];
+      }
+    };
+
+  private static final Coder<int[]> INT_ACCUM_CODER = DelegateCoder.of(
+    VarIntCoder.of(),
+    new DelegateCoder.CodingFunction<int[], Integer>() {
+      @Override
+      public Integer apply(int[] accumulator) {
+        return accumulator[0];
+      }
+    },
+    new DelegateCoder.CodingFunction<Integer, int[]>() {
+      @Override
+      public int[] apply(Integer value) {
+        int[] a = new int[1];
+        a[0] = value;
+        return a;
+      }
+    });
+
+  private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+    StateTags.value("stringValue", StringUtf8Coder.of());
+  private static final StateTag<Object, ValueState<Integer>> INT_VALUE_ADDR =
+    StateTags.value("stringValue", VarIntCoder.of());
+  private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>> SUM_INTEGER_ADDR =
+    StateTags.keyedCombiningValueWithContext("sumInteger", INT_ACCUM_CODER, SUM_COMBINER);
+  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+    StateTags.bag("stringBag", StringUtf8Coder.of());
+  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_BAG_ADDR =
+    StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
+
+  private Map<String, FlinkStateInternals<String>> statePerKey = new HashMap<>();
+
+  private Map<String, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
+
+  private void initializeStateAndTimers() throws CannotProvideCoderException {
+    for (int i = 0; i < 10; i++) {
+      String key = KEY_PREFIX + i;
+
+      FlinkStateInternals<String> state = initializeStateForKey(key);
+      Set<TimerInternals.TimerData> timers = new HashSet<>();
+      for (int j = 0; j < 5; j++) {
+        TimerInternals.TimerData timer = TimerInternals
+          .TimerData.of(NAMESPACE_1,
+            new Instant(1000 + i + j), TimeDomain.values()[j % 3]);
+        timers.add(timer);
+      }
+
+      statePerKey.put(key, state);
+      activeTimers.put(key, timers);
+    }
+  }
+
+  private FlinkStateInternals<String> initializeStateForKey(String key) throws CannotProvideCoderException {
+    FlinkStateInternals<String> state = createState(key);
+
+    ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    value.write("test");
+
+    ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR);
+    value2.write(4);
+    value2.write(5);
+
+    AccumulatorCombiningState<Integer, int[], Integer> combiningValue =
+      state.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    combiningValue.add(1);
+    combiningValue.add(2);
+
+    WatermarkHoldState<BoundedWindow> watermark = state.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
+    watermark.add(new Instant(1000));
+
+    BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR);
+    bag.add("v1");
+    bag.add("v2");
+    bag.add("v3");
+    bag.add("v4");
+    return state;
+  }
+
+  private boolean restoreAndTestState(DataInputView in) throws Exception {
+    StateCheckpointReader reader = new StateCheckpointReader(in);
+    final ClassLoader userClassloader = this.getClass().getClassLoader();
+    Coder<? extends BoundedWindow> windowCoder = IntervalWindow.getCoder();
+    Coder<String> keyCoder = StringUtf8Coder.of();
+
+    boolean comparisonRes = true;
+
+    for (String key : statePerKey.keySet()) {
+      comparisonRes &= checkStateForKey(key);
+    }
+
+    // restore the timers
+    Map<String, Set<TimerInternals.TimerData>> restoredTimersPerKey =
+      StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder);
+    if (activeTimers.size() != restoredTimersPerKey.size()) {
+      return false;
+    }
+
+    for (String key : statePerKey.keySet()) {
+      Set<TimerInternals.TimerData> originalTimers = activeTimers.get(key);
+      Set<TimerInternals.TimerData> restoredTimers = restoredTimersPerKey.get(key);
+      comparisonRes &= checkTimersForKey(originalTimers, restoredTimers);
+    }
+
+    // restore the state
+    Map<String, FlinkStateInternals<String>> restoredPerKeyState =
+      StateCheckpointUtils.decodeState(reader, OutputTimeFns.outputAtEarliestInputTimestamp(),
+        keyCoder, windowCoder, userClassloader);
+    if (restoredPerKeyState.size() != statePerKey.size()) {
+      return false;
+    }
+
+    for (String key : statePerKey.keySet()) {
+      FlinkStateInternals<String> originalState = statePerKey.get(key);
+      FlinkStateInternals<String> restoredState = restoredPerKeyState.get(key);
+      comparisonRes &= checkStateForKey(originalState, restoredState);
+    }
+    return comparisonRes;
+  }
+
+  private boolean checkStateForKey(String key) throws CannotProvideCoderException {
+    FlinkStateInternals<String> state = statePerKey.get(key);
+
+    ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    boolean comp = value.read().equals("test");
+
+    ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR);
+    comp &= value2.read().equals(5);
+
+    AccumulatorCombiningState<Integer, int[], Integer> combiningValue =
+      state.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    comp &= combiningValue.read().equals(3);
+
+    WatermarkHoldState<BoundedWindow> watermark = state.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
+    comp &= watermark.read().equals(new Instant(1000));
+
+    BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR);
+    Iterator<String> it = bag.read().iterator();
+    int i = 0;
+    while (it.hasNext()) {
+      comp &= it.next().equals("v" + (++i));
+    }
+    return comp;
+  }
+
+  private void storeState(AbstractStateBackend.CheckpointStateOutputView out) throws Exception {
+    StateCheckpointWriter checkpointBuilder = StateCheckpointWriter.create(out);
+    Coder<String> keyCoder = StringUtf8Coder.of();
+
+    // checkpoint the timers
+    StateCheckpointUtils.encodeTimers(activeTimers, checkpointBuilder, keyCoder);
+
+    // checkpoint the state
+    StateCheckpointUtils.encodeState(statePerKey, checkpointBuilder, keyCoder);
+  }
+
+  private boolean checkTimersForKey(Set<TimerInternals.TimerData> originalTimers,
+                                    Set<TimerInternals.TimerData> restoredTimers) {
+    boolean comp = true;
+    if (restoredTimers == null) {
+      return false;
+    }
+
+    if (originalTimers.size() != restoredTimers.size()) {
+      return false;
+    }
+
+    for (TimerInternals.TimerData timer : originalTimers) {
+      comp &= restoredTimers.contains(timer);
+    }
+    return comp;
+  }
+
+  private boolean checkStateForKey(FlinkStateInternals<String> originalState,
+                                   FlinkStateInternals<String> restoredState)
+      throws CannotProvideCoderException {
+    if (restoredState == null) {
+      return false;
+    }
+
+    ValueState<String> orValue = originalState.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    ValueState<String> resValue = restoredState.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    boolean comp = orValue.read().equals(resValue.read());
+
+    ValueState<Integer> orIntValue = originalState.state(NAMESPACE_1, INT_VALUE_ADDR);
+    ValueState<Integer> resIntValue = restoredState.state(NAMESPACE_1, INT_VALUE_ADDR);
+    comp &= orIntValue.read().equals(resIntValue.read());
+
+    AccumulatorCombiningState<Integer, int[], Integer> combOrValue =
+      originalState.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    AccumulatorCombiningState<Integer, int[], Integer> combResValue =
+      restoredState.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    comp &= combOrValue.read().equals(combResValue.read());
+
+    WatermarkHoldState<BoundedWindow> orWatermark = originalState.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
+    WatermarkHoldState<BoundedWindow> resWatermark = restoredState.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
+    comp &= orWatermark.read().equals(resWatermark.read());
+
+    BagState<String> orBag = originalState.state(NAMESPACE_1, STRING_BAG_ADDR);
+    BagState<String> resBag = restoredState.state(NAMESPACE_1, STRING_BAG_ADDR);
+
+    Iterator<String> orIt = orBag.read().iterator();
+    Iterator<String> resIt = resBag.read().iterator();
+
+    while (orIt.hasNext() && resIt.hasNext()) {
+      comp &= orIt.next().equals(resIt.next());
+    }
+
+    return (orIt.hasNext() == resIt.hasNext()) && comp;
+  }
+
+  private FlinkStateInternals<String> createState(String key) throws CannotProvideCoderException {
+    return new FlinkStateInternals<>(
+      key,
+      StringUtf8Coder.of(),
+      IntervalWindow.getCoder(),
+      OutputTimeFns.outputAtEarliestInputTimestamp());
+  }
+
+  @Test
+  public void test() throws Exception {
+    StateSerializationTest test = new StateSerializationTest();
+    test.initializeStateAndTimers();
+
+    MemoryStateBackend.MemoryCheckpointOutputStream memBackend = new MemoryStateBackend.MemoryCheckpointOutputStream(32048);
+    AbstractStateBackend.CheckpointStateOutputView out = new AbstractStateBackend.CheckpointStateOutputView(memBackend);
+
+    test.storeState(out);
+
+    byte[] contents = memBackend.closeAndGetBytes();
+    DataInputView in = new DataInputDeserializer(contents, 0, contents.length);
+
+    assertEquals(true, test.restoreAndTestState(in));
+  }
+
+}
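
For orientation, the store/restore round trip this test exercises reduces to the
sketch below. It is a minimal sketch, not part of the commit: it reuses only the
constructors and calls visible in the diff above (FlinkStateInternals,
StateCheckpointWriter/Reader, StateCheckpointUtils, and Flink's memory state
backend), with signatures assumed from the test code rather than verified against
the full sources.

    // Build some per-key state (one key, one value cell).
    Map<String, FlinkStateInternals<String>> statePerKey = new HashMap<>();
    FlinkStateInternals<String> state = new FlinkStateInternals<>(
        "key-0",
        StringUtf8Coder.of(),
        IntervalWindow.getCoder(),
        OutputTimeFns.outputAtEarliestInputTimestamp());
    state.state(StateNamespaces.global(),
        StateTags.value("v", StringUtf8Coder.of())).write("hello");
    statePerKey.put("key-0", state);

    // Encode everything into a memory-backed Flink checkpoint stream.
    MemoryStateBackend.MemoryCheckpointOutputStream mem =
        new MemoryStateBackend.MemoryCheckpointOutputStream(32048);
    AbstractStateBackend.CheckpointStateOutputView out =
        new AbstractStateBackend.CheckpointStateOutputView(mem);
    StateCheckpointUtils.encodeState(
        statePerKey, StateCheckpointWriter.create(out), StringUtf8Coder.of());

    // Decode from the raw checkpoint bytes; 'restored' should mirror statePerKey.
    byte[] bytes = mem.closeAndGetBytes();
    StateCheckpointReader reader =
        new StateCheckpointReader(new DataInputDeserializer(bytes, 0, bytes.length));
    Map<String, FlinkStateInternals<String>> restored = StateCheckpointUtils.decodeState(
        reader,
        OutputTimeFns.outputAtEarliestInputTimestamp(),
        StringUtf8Coder.of(),
        IntervalWindow.getCoder(),
        StateSerializationTest.class.getClassLoader());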

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
new file mode 100644
index 0000000..f0b93a0
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import org.apache.beam.runners.flink.FlinkTestPipeline;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Joiner;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+
+/**
+ * Session window test
+ */
+public class TopWikipediaSessionsITCase extends StreamingProgramTestBase implements Serializable {
+  protected String resultPath;
+
+  public TopWikipediaSessionsITCase() {
+  }
+
+  static final String[] EXPECTED_RESULT = new String[] {
+      "user: user1 value:3",
+      "user: user1 value:1",
+      "user: user2 value:4",
+      "user: user2 value:6",
+      "user: user3 value:7",
+      "user: user3 value:2"
+  };
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+
+    Pipeline p = FlinkTestPipeline.createForStreaming();
+
+    Long now = (System.currentTimeMillis() + 10000) / 1000;
+
+    PCollection<KV<String, Long>> output =
+      p.apply(Create.of(Arrays.asList(
+          new TableRow().set("timestamp", now).set("contributor_username", "user1"),
+          new TableRow().set("timestamp", now + 10).set("contributor_username", "user3"),
+          new TableRow().set("timestamp", now).set("contributor_username", "user2"),
+          new TableRow().set("timestamp", now).set("contributor_username", "user1"),
+          new TableRow().set("timestamp", now + 2).set("contributor_username", "user1"),
+          new TableRow().set("timestamp", now).set("contributor_username", "user2"),
+          new TableRow().set("timestamp", now + 1).set("contributor_username", "user2"),
+          new TableRow().set("timestamp", now + 5).set("contributor_username", "user2"),
+          new TableRow().set("timestamp", now + 7).set("contributor_username", "user2"),
+          new TableRow().set("timestamp", now + 8).set("contributor_username", "user2"),
+          new TableRow().set("timestamp", now + 200).set("contributor_username", "user2"),
+          new TableRow().set("timestamp", now + 230).set("contributor_username", "user1"),
+          new TableRow().set("timestamp", now + 230).set("contributor_username", "user2"),
+          new TableRow().set("timestamp", now + 240).set("contributor_username", "user2"),
+          new TableRow().set("timestamp", now + 245).set("contributor_username", "user3"),
+          new TableRow().set("timestamp", now + 235).set("contributor_username", "user3"),
+          new TableRow().set("timestamp", now + 236).set("contributor_username", "user3"),
+          new TableRow().set("timestamp", now + 237).set("contributor_username", "user3"),
+          new TableRow().set("timestamp", now + 238).set("contributor_username", "user3"),
+          new TableRow().set("timestamp", now + 239).set("contributor_username", "user3"),
+          new TableRow().set("timestamp", now + 240).set("contributor_username", "user3"),
+          new TableRow().set("timestamp", now + 241).set("contributor_username", "user2"),
+          new TableRow().set("timestamp", now).set("contributor_username", "user3"))))
+
+      .apply(ParDo.of(new DoFn<TableRow, String>() {
+        @Override
+        public void processElement(ProcessContext c) throws Exception {
+          TableRow row = c.element();
+          long timestamp = (Integer) row.get("timestamp");
+          String userName = (String) row.get("contributor_username");
+          if (userName != null) {
+            // Sets the timestamp field to be used in windowing.
+            c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
+          }
+        }
+      }))
+
+      .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1))))
+
+      .apply(Count.<String>perElement());
+
+    PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, Long>, String>() {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+        KV<String, Long> el = c.element();
+        String out = "user: " + el.getKey() + " value:" + el.getValue();
+        c.output(out);
+      }
+    }));
+
+    format.apply(TextIO.Write.to(resultPath));
+
+    p.run();
+  }
+}
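
The session semantics asserted above can be reproduced in isolation. A minimal
sketch, assuming the same Dataflow SDK as in the diff (Create.timestamped and
TimestampedValue are standard SDK classes; the element values are illustrative,
not taken from the test):

    // Three timestamped elements with a one-minute session gap: the first two fall
    // into one session, the third (two minutes later) opens a new one.
    Pipeline p = FlinkTestPipeline.createForStreaming();
    PCollection<KV<String, Long>> counts = p
        .apply(Create.timestamped(
            TimestampedValue.of("user1", new Instant(0L)),
            TimestampedValue.of("user1", new Instant(30 * 1000L)),
            TimestampedValue.of("user1", new Instant(150 * 1000L))))
        .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1))))
        .apply(Count.<String>perElement());
    // Expected per-window results: ("user1", 2) for the first session and
    // ("user1", 1) for the second.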

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java
new file mode 100644
index 0000000..620dace
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.util;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+
+/**
+ * Copied from {@link com.google.cloud.dataflow.examples.JoinExamples} because the code
+ * is private there.
+ */
+public class JoinExamples {
+
+  // A 1000-row sample of the GDELT data here: gdelt-bq:full.events.
+  private static final String GDELT_EVENTS_TABLE =
+      "clouddataflow-readonly:samples.gdelt_sample";
+  // A table that maps country codes to country names.
+  private static final String COUNTRY_CODES =
+      "gdelt-bq:full.crosswalk_geocountrycodetohuman";
+
+  /**
+   * Join two collections, using country code as the key.
+   */
+  public static PCollection<String> joinEvents(PCollection<TableRow> eventsTable,
+      PCollection<TableRow> countryCodes) throws Exception {
+
+    final TupleTag<String> eventInfoTag = new TupleTag<>();
+    final TupleTag<String> countryInfoTag = new TupleTag<>();
+
+    // Transform both input collections to tuple collections, where the keys are
+    // country codes in both cases.
+    PCollection<KV<String, String>> eventInfo = eventsTable.apply(
+        ParDo.of(new ExtractEventDataFn()));
+    PCollection<KV<String, String>> countryInfo = countryCodes.apply(
+        ParDo.of(new ExtractCountryInfoFn()));
+
+    // country code 'key' -> CoGbkResult(<event info>, <country name>)
+    PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
+        .of(eventInfoTag, eventInfo)
+        .and(countryInfoTag, countryInfo)
+        .apply(CoGroupByKey.<String>create());
+
+    // Process the CoGbkResult elements generated by the CoGroupByKey transform.
+    // country code 'key' -> string of <event info>, <country name>
+    PCollection<KV<String, String>> finalResultCollection =
+        kvpCollection.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            KV<String, CoGbkResult> e = c.element();
+            CoGbkResult val = e.getValue();
+            String countryCode = e.getKey();
+            String countryName = e.getValue().getOnly(countryInfoTag, "Kostas");
+            for (String eventInfo : c.element().getValue().getAll(eventInfoTag)) {
+              // Generate a string that combines information from both collection values
+              c.output(KV.of(countryCode, "Country name: " + countryName
+                  + ", Event info: " + eventInfo));
+            }
+          }
+        }));
+
+    // Format the joined results as strings; the caller decides where to write them.
+    return finalResultCollection
+        .apply(ParDo.of(new DoFn<KV<String, String>, String>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            String outputString = "Country code: " + c.element().getKey()
+                + ", " + c.element().getValue();
+            c.output(outputString);
+          }
+        }));
+  }
+
+  /**
+   * Examines each row (event) in the input table and outputs a KV whose key is the
+   * event's country code and whose value is a string encoding the event information.
+   */
+  static class ExtractEventDataFn extends DoFn<TableRow, KV<String, String>> {
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
+      String countryCode = (String) row.get("ActionGeo_CountryCode");
+      String sqlDate = (String) row.get("SQLDATE");
+      String actor1Name = (String) row.get("Actor1Name");
+      String sourceUrl = (String) row.get("SOURCEURL");
+      String eventInfo = "Date: " + sqlDate + ", Actor1: " + actor1Name + ", url: " + sourceUrl;
+      c.output(KV.of(countryCode, eventInfo));
+    }
+  }
+
+
+  /**
+   * Examines each row (country info) in the input table and outputs a KV whose key is
+   * the country code and whose value is the country name.
+   */
+  static class ExtractCountryInfoFn extends DoFn<TableRow, KV<String, String>> {
+    @Override
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
+      String countryCode = (String) row.get("FIPSCC");
+      String countryName = (String) row.get("HumanName");
+      c.output(KV.of(countryCode, countryName));
+    }
+  }
+
+
+  /**
+   * Options supported by {@link JoinExamples}.
+   * <p>
+   * Inherits standard configuration options.
+   */
+  private interface Options extends PipelineOptions {
+    @Description("Path of the file to write to")
+    @Validation.Required
+    String getOutput();
+    void setOutput(String value);
+  }
+
+  public static void main(String[] args) throws Exception {
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    Pipeline p = Pipeline.create(options);
+    // The following two 'apply' calls create multiple inputs for our pipeline,
+    // one for each of our two input sources.
+    PCollection<TableRow> eventsTable = p.apply(BigQueryIO.Read.from(GDELT_EVENTS_TABLE));
+    PCollection<TableRow> countryCodes = p.apply(BigQueryIO.Read.from(COUNTRY_CODES));
+    PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes);
+    formattedResults.apply(TextIO.Write.to(options.getOutput()));
+    p.run();
+  }
+
+}
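
For orientation: CoGroupByKey, used in joinEvents above, groups several keyed
collections by key into one CoGbkResult per key. A minimal sketch under the same
SDK (element values are illustrative; it uses Create from the SDK in addition to
the imports in the file above):

    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    final TupleTag<String> eventsTag = new TupleTag<>();
    final TupleTag<String> namesTag = new TupleTag<>();

    PCollection<KV<String, String>> events =
        p.apply(Create.of(KV.of("US", "Date: 20150101, Actor1: SOMEONE")));
    PCollection<KV<String, String>> names =
        p.apply(Create.of(KV.of("US", "United States")));
    // (If coder inference fails for KV, add
    // .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())).)

    // One CoGbkResult per key, holding both sides of the join.
    PCollection<KV<String, CoGbkResult>> joined = KeyedPCollectionTuple
        .of(eventsTag, events)
        .and(namesTag, names)
        .apply(CoGroupByKey.<String>create());
    // For key "US": result.getAll(eventsTag) yields the event strings, and
    // result.getOnly(namesTag, "none") yields "United States" ("none" is the
    // default returned when no country name is present for the key).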

