flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Chris Miller" <chris...@gmail.com>
Subject Temporal tables not behaving as expected
Date Mon, 21 Jan 2019 14:45:37 GMT
Hi all,

I'm new to Flink so am probably missing something simple. I'm using 
Flink 1.7.1 and am trying to use temporal table functions but aren't 
getting the results I expect. With the example code below, I would 
expect 4 records to be output (one for each order), but instead I'm only 
seeing a (random) subset of these records (it varies on each run). To 
compound my confusion further, the CSV output often shows a different 
subset of results than those written to the console. I assume there's a 
race condition of some sort but I can't figure out where it is. Any 
ideas what I'm doing wrong?


import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import 
org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.types.Row;

public class Test {
   public static void main(String[] args) throws Exception {
     StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
     StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);

     List<Tuple2<String, Double>> rateData = Arrays.asList(
         new Tuple2<>("GBP", 1.29),
         new Tuple2<>("EUR", 1.14),
         new Tuple2<>("EUR", 1.15),
         new Tuple2<>("GBP", 1.30));
     DataStreamSource<Tuple2<String, Double>> rateStream = 
env.addSource(new DelayedSource<>(rateData, 1L));
     rateStream.returns(new TypeHint<Tuple2<String, Double>>() {});

     Table rateHistory = tableEnv.fromDataStream(rateStream, "Currency, 
Rate, FxRates_ProcTime.proctime");
     TemporalTableFunction rates = 
rateHistory.createTemporalTableFunction("FxRates_ProcTime", "Currency");
     tableEnv.registerFunction("FxRates", rates);

     List<Tuple3<Integer, String, Double>> orderData = Arrays.asList(
         new Tuple3<>(1, "GBP", 4.51),
         new Tuple3<>(2, "GBP", 23.68),
         new Tuple3<>(3, "EUR", 2.99),
         new Tuple3<>(4, "EUR", 14.76));

     DataStreamSource<Tuple3<Integer, String, Double>> orderStream = 
env.addSource(new DelayedSource<>(orderData, 100L));
     orderStream.returns(new TypeHint<Tuple3<Integer, String, Double>>() 
{});

     Table orders = tableEnv.fromDataStream(orderStream, "OrderId, 
o_Currency, Amount, Order_ProcTime.proctime");
     Table usdOrders = orders.join(new Table(tableEnv, 
"FxRates(Order_ProcTime)"), "o_Currency = Currency")
                             .select("OrderId, Amount, Currency, Rate, 
(Amount * Rate) as UsdAmount");

     String[] fields = usdOrders.getSchema().getFieldNames();
     TypeInformation<?>[] types = usdOrders.getSchema().getFieldTypes();
     DataStream<Row> usdStream = tableEnv.toAppendStream(usdOrders, 
usdOrders.getSchema().toRowType());
     CsvTableSink csvTableSink = new CsvTableSink("C:\\tmp\\test.csv", 
",", 1, FileSystem.WriteMode.OVERWRITE);
     tableEnv.registerTableSink("csvSink", fields, types, csvTableSink);
     usdOrders.insertInto("csvSink");
     usdStream.addSink(new PrintSink());
     env.execute();
     System.out.println("Test completed at " + time());
   }

   public static String time() {
     return LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME);
   }

   private static class DelayedSource<T> extends RichSourceFunction<T> {
     private final List<T> data;
     private final long initialDelay;
     private volatile boolean shutdown;

     private DelayedSource(List<T> data, long initialDelay) {
       this.data = data;
       this.initialDelay = initialDelay;
     }

     @Override
     public void run(SourceContext<T> ctx) throws Exception {
       Iterator<T> iterator = data.iterator();
       Thread.sleep(initialDelay);
       while (!shutdown && iterator.hasNext()) {
         T next = iterator.next();
         System.out.println(time() + " - producing " + next);
         ctx.collect(next);
       }
     }

     @Override
     public void cancel() {
       shutdown = true;
     }
   }

   private static class PrintSink extends RichSinkFunction<Row> {
     @Override
     public void invoke(Row value, Context context) {
       Integer orderId = (Integer) value.getField(0);
       Double amount = (Double) value.getField(1);
       String currency = (String) value.getField(2);
       Double rate = (Double) value.getField(3);
       Double usdAmount = (Double) value.getField(4);
       System.out.println(time() + " - order " + orderId + " was for " + 
usdAmount + " USD (" + amount + ' ' + currency + " @ " + rate + ')');
     }
   }
}
Mime
View raw message