Date: Tue, 19 Apr 2016 12:54:26 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: [jira] [Commented] (FLINK-3230) Kinesis streaming producer

    [ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15247692#comment-15247692 ]

ASF GitHub Bot commented on FLINK-3230:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1910#discussion_r60224497

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kinesis;
    +
    +
    +import com.amazonaws.auth.BasicAWSCredentials;
    +import com.amazonaws.internal.StaticCredentialsProvider;
    +import com.amazonaws.services.kinesis.producer.Attempt;
    +import com.amazonaws.services.kinesis.producer.KinesisProducer;
    +import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
    +import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
    +import com.amazonaws.services.kinesis.producer.UserRecordResult;
    +import com.google.common.util.concurrent.FutureCallback;
    +import com.google.common.util.concurrent.Futures;
    +import com.google.common.util.concurrent.ListenableFuture;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.nio.ByteBuffer;
    +import java.util.List;
    +import java.util.Objects;
    +
    +/**
    + * The FlinkKinesisProducer allows producing data from a Flink DataStream into Kinesis.
    + *
    + * @param <OUT> Data type to produce into Kinesis Streams
    + */
    +public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
    +
    +	/* AWS region of the stream */
    +	private final String region;
    +
    +	/* Access and secret key of the user */
    +	private final String accessKey;
    +	private final String secretKey;
    +
    +	/* Flag controlling the error behavior of the producer */
    +	private boolean failOnError = false;
    +
    +	/* Name of the default stream to produce to. Can be overwritten by the serialization schema */
    +	private String defaultStream;
    +
    +	/* Default partition id. Can be overwritten by the serialization schema */
    +	private String defaultPartition;
    +
    +	/* Schema for turning the OUT type into a byte array. */
    +	private final KinesisSerializationSchema<OUT> schema;
    +
    +	/* Optional custom partitioner */
    +	private KinesisPartitioner<OUT> customPartitioner = null;
    +
    +
    +	// --------------------------- Runtime fields ---------------------------
    +
    +
    +	/* Our Kinesis instance for each parallel Flink sink */
    +	private transient KinesisProducer producer;
    +
    +	/* Callback handling failures */
    +	private transient FutureCallback<UserRecordResult> callback;
    +
    +	/* Field for async exception */
    +	private transient Throwable thrownException;
    +
    +
    +	// --------------------------- Initialization and configuration ---------------------------
    +
    +
    +	/**
    +	 * Create a new FlinkKinesisProducer.
    +	 * This is a constructor supporting Flink's {@link SerializationSchema}.
    +	 *
    +	 * @param region AWS region of the stream
    +	 * @param accessKey Access key of a user with permission to access the stream (ideally also with access to CloudWatch)
    +	 * @param secretKey Secret key of the user
    +	 * @param schema Serialization schema for the data type
    +	 */
    +	public FlinkKinesisProducer(String region, String accessKey, String secretKey, final SerializationSchema<OUT> schema) {
    +		// create a simple wrapper for the serialization schema
    +		this(region, accessKey, secretKey, new KinesisSerializationSchema<OUT>() {
    +			@Override
    +			public ByteBuffer serialize(OUT element) {
    +				// wrap into ByteBuffer
    +				return ByteBuffer.wrap(schema.serialize(element));
    +			}
    +			// use default stream and hash key
    +			@Override
    +			public String getTargetStream(OUT element) {
    +				return null;
    +			}
    +		});
    +	}
    +
    +	public FlinkKinesisProducer(String region, String accessKey, String secretKey, KinesisSerializationSchema<OUT> schema) {
    +		this.region = Objects.requireNonNull(region);
    +		this.accessKey = Objects.requireNonNull(accessKey);
    +		this.secretKey = Objects.requireNonNull(secretKey);
    +		ClosureCleaner.ensureSerializable(Objects.requireNonNull(schema));
    +		this.schema = schema;
    +	}
    +
    +	/**
    +	 * If set to true, the producer will immediately fail with an exception on any error.
    +	 * Otherwise, the errors are logged and the producer goes on.
    +	 *
    +	 * @param failOnError Error behavior flag
    +	 */
    +	public void setFailOnError(boolean failOnError) {
    +		this.failOnError = failOnError;
    +	}
    +
    +	/**
    +	 * Set a default stream name.
    +	 * @param defaultStream Name of the default Kinesis stream
    +	 */
    +	public void setDefaultStream(String defaultStream) {
    +		this.defaultStream = defaultStream;
    +	}
    +
    +	/**
    +	 * Set a default partition id.
    +	 * @param defaultPartition Name of the default partition
    +	 */
    +	public void setDefaultPartition(String defaultPartition) {
    +		this.defaultPartition = defaultPartition;
    +	}
    +
    +	public void setCustomPartitioner(KinesisPartitioner<OUT> partitioner) {
    +		Objects.requireNonNull(partitioner);
    +		ClosureCleaner.ensureSerializable(partitioner);
    +		this.customPartitioner = partitioner;
    +	}
    +
    +
    +	// --------------------------- Lifecycle methods ---------------------------
    +
    +
    +	@Override
    +	public void open(Configuration parameters) throws Exception {
    +		super.open(parameters);
    +
    +		KinesisProducerConfiguration config = new KinesisProducerConfiguration();
    +		config.setRegion(this.region);
    +		config.setCredentialsProvider(new StaticCredentialsProvider(new BasicAWSCredentials(this.accessKey, this.secretKey)));
    +		producer = new KinesisProducer(config);
    +		callback = new FutureCallback<UserRecordResult>() {
    +			@Override
    +			public void onSuccess(UserRecordResult result) {
    +				if (!result.isSuccessful()) {
    +					if (failOnError) {
    +						thrownException = new RuntimeException("Record was not sent successfully");
    +					} else {
    +						LOG.warn("Record was not sent successfully");
    +					}
    +				}
    +			}
    +
    +			@Override
    +			public void onFailure(Throwable t) {
    +				if (failOnError) {
    +					thrownException = t;
    +				} else {
    +					LOG.warn("An exception occurred while processing a record", t);
    +				}
    +			}
    +		};
    +
    +		if (this.customPartitioner != null) {
    +			this.customPartitioner.initialize(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
    +		}
    +
    +		LOG.info("Started Kinesis producer instance for region '{}'", this.region);
    +	}
    +
    +	@Override
    +	public void invoke(OUT value) throws Exception {
    +		if (this.producer == null) {
    +			throw new RuntimeException("Kinesis producer has been closed");
    +		}
    +		if (thrownException != null) {
    +			String errorMessages = "";
    +			if (thrownException instanceof UserRecordFailedException) {
    +				List<Attempt> attempts = ((UserRecordFailedException) thrownException).getResult().getAttempts();
    +				for (Attempt attempt : attempts) {
    +					if (attempt.getErrorMessage() != null) {
    +						errorMessages += attempt.getErrorMessage() + "\n";
    +					}
    +				}
    +			}
    +			if (failOnError) {
    +				throw new RuntimeException("An exception was thrown while processing a record: " + errorMessages, thrownException);
    +			} else {
    +				LOG.warn("An exception was thrown while processing a record: {}", errorMessages, thrownException);
    +				thrownException = null; // reset
    +			}
    +		}
    +
    +		String stream = defaultStream;
    +		String partition = defaultPartition;
    +
    +		ByteBuffer serialized = schema.serialize(value);
    +
    +		// maybe set custom stream
    +		String customStream = schema.getTargetStream(value);
    +		if (customStream != null) {
    +			stream = customStream;
    +		}
    +
    +		String explicitHashkey = null;
    +		// maybe set custom partition
    +		if (customPartitioner != null) {
    +			partition = customPartitioner.getPartitionId(value);
    +			explicitHashkey = customPartitioner.getExplicitHashKey(value);
    +		}
    +
    +		if (stream == null) {
    +			if (failOnError) {
    +				throw new RuntimeException("No target stream set");
    +			} else {
    +				LOG.warn("No target stream set. Skipping record");
    --- End diff --

    Yeah, sure. I'm just saying that it is easy to shoot yourself in the foot if you forget to set a default stream and don't use a fancy schema. Then the user will just wonder what is happening and the log will be flooded. What do you think? For your example case, the warning is OK with the `failOnError` configuration.


> Kinesis streaming producer
> --------------------------
>
>                 Key: FLINK-3230
>                 URL: https://issues.apache.org/jira/browse/FLINK-3230
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Robert Metzger
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will be using the AWS SDK implementation for code consistency with the FlinkKinesisConsumer.
> The features of the FlinkKinesisProducer are rather straightforward:
> 1. Partition put records based on partition key.
> 2. Configurable put mode: bulk put for higher throughput vs. sequential single-record puts. The bulk size should also be configurable.
> 3. For bulk put, the user can also choose to enforce strict ordering of the results, at the cost of higher put latency. Ref: https://brandur.org/kinesis-order



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
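
For readers following the review thread: below is a minimal usage sketch of the producer API shown in the diff above, illustrating the point of the comment. The surrounding job, SimpleStringSchema, the region, the credentials, and the stream name are illustrative placeholders, not part of this patch.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KinesisProducerExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source; any DataStream<String> works here.
        DataStream<String> input = env.fromElements("a", "b", "c");

        // Region and credentials are illustrative placeholders.
        FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<String>(
                "eu-west-1", "ACCESS_KEY_ID", "SECRET_ACCESS_KEY", new SimpleStringSchema());

        // Setting a default stream is what avoids the "No target stream set" warning
        // discussed above when the serialization schema returns no target stream;
        // failOnError additionally turns asynchronous send failures into job failures.
        kinesis.setDefaultStream("flink-kinesis-example-stream");
        kinesis.setDefaultPartition("0");
        kinesis.setFailOnError(true);

        input.addSink(kinesis);
        env.execute("Kinesis producer example");
    }
}

With setDefaultStream() configured, records whose schema returns null as the target stream still end up somewhere; without it, setFailOnError(true) at least surfaces the misconfiguration as a hard failure instead of a flooded log.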