flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kant kodali <kanth...@gmail.com>
Subject java.util.concurrent.ExecutionException
Date Tue, 03 Mar 2020 12:08:11 GMT
Hi All,

I am just trying to read edges which has the following format in Kafka

1,2
1,3
1,5

using the Table API and then converting to DataStream of Edge Objects and
printing them. However I am getting java.util.concurrent.ExecutionException but
not sure why?

Here is the sample code

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.graph.Edge;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.NullValue;
import org.apache.flink.types.Row;

import java.util.UUID;

public class Test {

    public static void main(String... args) throws Exception {

        EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend((StateBackend) new
RocksDBStateBackend("file:///tmp/rocksdb"));

        StreamTableEnvironment bsTableEnv =
StreamTableEnvironment.create(env, bsSettings);

        bsTableEnv.connect(
            new Kafka()
                .property("bootstrap.servers", "localhost:9092")
                .property("zookeeper.connect", "localhost:2181")
                .property("group.id", UUID.randomUUID().toString())
                .startFromEarliest()
                .version("universal")
                .topic("edges")
        )
        .withFormat(new Csv().fieldDelimiter(','))
        .withSchema(
            new Schema()
                .field("source", DataTypes.BIGINT())
                .field("target", DataTypes.BIGINT())
        )
        .createTemporaryTable("kafka_source");

        Table kafkaSourceTable = bsTableEnv.sqlQuery("select * from
kafka_source");

        TypeInformation<Edge<Long, NullValue>> edgeTypeInformation =
TypeInformation.of(new TypeHint<Edge<Long, NullValue>>() {
            @Override
            public TypeInformation<Edge<Long, NullValue>> getTypeInfo() {
                return super.getTypeInfo();
            }
        });

        DataStream<Edge<Long, NullValue>> edges =
bsTableEnv.toAppendStream(kafkaSourceTable, Row.class)
                .map(row -> new Edge<>((Long) row.getField(0), (Long)
row.getField(1), NullValue.getInstance()))
                .returns(edgeTypeInformation);

        edges.print();

        bsTableEnv.execute("sample job");
    }
}

Mime
View raw message