flink-issues mailing list archives

From uce <...@git.apache.org>
Subject [GitHub] flink pull request #2069: [FLINK-3872] [table, connector-kafka] Add KafkaJso...
Date Fri, 10 Jun 2016 12:40:39 GMT
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2069#discussion_r66606216
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -735,6 +738,123 @@ public void flatMap(Tuple3<Integer, Integer, String> value, Collector<Integer> o
     		}
     	}
     
    +	/**
    +	 * Runs a table source test with JSON data.
    +	 *
    +	 * The table source needs to parse the following JSON fields:
    +	 * - "long" -> number
    +	 * - "string" -> "string"
    +	 * - "boolean" -> true|false
    +	 * - "double" -> fraction
    +	 */
    +	public void runJsonTableSource(String topic, KafkaTableSource kafkaTableSource) throws Exception {
    +		final ObjectMapper mapper = new ObjectMapper();
    +
    +		final int numElements = 1024;
    +		final long[] longs = new long[numElements];
    +		final String[] strings = new String[numElements];
    +		final boolean[] booleans = new boolean[numElements];
    +		final double[] doubles = new double[numElements];
    +
    +		final byte[][] serializedJson = new byte[numElements][];
    +
    +		ThreadLocalRandom random = ThreadLocalRandom.current();
    +
    +		for (int i = 0; i < numElements; i++) {
    +			longs[i] = random.nextLong();
    +			strings[i] = Integer.toHexString(random.nextInt());
    +			booleans[i] = random.nextBoolean();
    +			doubles[i] = random.nextDouble();
    +
    +			ObjectNode entry = mapper.createObjectNode();
    +			entry.put("long", longs[i]);
    +			entry.put("string", strings[i]);
    +			entry.put("boolean", booleans[i]);
    +			entry.put("double", doubles[i]);
    +
    +			serializedJson[i] = mapper.writeValueAsBytes(entry);
    +		}
    +
    +		// Produce serialized JSON data
    +		createTestTopic(topic, 1, 1);
    +
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment
    +				.createRemoteEnvironment("localhost", flinkPort);
    +		env.getConfig().disableSysoutLogging();
    +
    +		env.addSource(new SourceFunction<byte[]>() {
    +			@Override
    +			public void run(SourceContext<byte[]> ctx) throws Exception {
    +				for (int i = 0; i < numElements; i++) {
    +					ctx.collect(serializedJson[i]);
    +				}
    +			}
    +
    +			@Override
    +			public void cancel() {
    +			}
    +		}).addSink(kafkaServer.getProducer(
    +				topic,
    +				new ByteArraySerializationSchema(),
    +				standardProps,
    +				null));
    +
    +		// Execute blocks
    +		env.execute();
    +
    +		// Register as table source
    +		StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
    +		tableEnvironment.registerTableSource("kafka", kafkaTableSource);
    +
    +		Table result = tableEnvironment.ingest("kafka");
    +
    +		tableEnvironment.toDataStream(result, Row.class).addSink(new SinkFunction<Row>() {
    +
    +			int i = 0;
    +
    +			@Override
    +			public void invoke(Row value) throws Exception {
    +				if (i > numElements) {
    +					throw new IllegalStateException("Received too many rows.");
    +				}
    +
    +				assertEquals(longs[i], value.productElement(0));
    +				assertEquals(strings[i], value.productElement(1));
    +				assertEquals(booleans[i], value.productElement(2));
    +				assertEquals(doubles[i], value.productElement(3));
    +
    +				if (i == numElements-1) {
    +					throw new SuccessException();
    --- End diff --
    
    Yes, the first check (`i > numElements`) can be removed. I think it's fine to keep only the exact check (`i == numElements-1`).
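
    To illustrate the sink with only the exact check, here is a minimal stand-alone sketch. It does not use Flink; `ExactCheckSketch`, `VerifyingSink`, `feed`, and the local `SuccessException` are hypothetical stand-ins for the anonymous `SinkFunction<Row>` and the test's exception, and the `i++` is assumed because the quoted diff is cut off before that point. The idea is that once the sink throws on the last expected row, it is not invoked again, so a separate "too many rows" guard should not be reachable.

```java
// Stand-alone sketch; no Flink dependency. All names here are hypothetical.
class ExactCheckSketch {

    /** Stand-in for the SuccessException the test throws to stop the job. */
    static class SuccessException extends RuntimeException {
    }

    /** Stand-in for the anonymous SinkFunction<Row> in the diff. */
    static class VerifyingSink {
        private final long[] expected;
        private int i = 0;

        VerifyingSink(long[] expected) {
            this.expected = expected;
        }

        void invoke(long value) {
            if (expected[i] != value) {
                throw new AssertionError("Unexpected value at row " + i);
            }
            // Exact check only: signal success on the last expected row.
            if (i == expected.length - 1) {
                throw new SuccessException();
            }
            i++; // assumed; the quoted diff ends before this point
        }
    }

    /** Feeds the values to the sink; returns true if it signalled success. */
    static boolean feed(long[] expected, long[] received) {
        VerifyingSink sink = new VerifyingSink(expected);
        try {
            for (long value : received) {
                sink.invoke(value);
            }
        } catch (SuccessException e) {
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        long[] data = {7L, 11L, 13L};
        System.out.println(feed(data, data));                 // prints "true"
        System.out.println(feed(data, new long[] {7L, 11L})); // prints "false"
    }
}
```

    In this sketch a missing row shows up as success never being signalled, which in the actual test would presumably surface as the job not terminating with `SuccessException`.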


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
