Return-Path: X-Original-To: apmail-flink-issues-archive@minotaur.apache.org Delivered-To: apmail-flink-issues-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B2F4619B4F for ; Fri, 22 Apr 2016 20:36:41 +0000 (UTC) Received: (qmail 46257 invoked by uid 500); 22 Apr 2016 20:36:41 -0000 Delivered-To: apmail-flink-issues-archive@flink.apache.org Received: (qmail 46215 invoked by uid 500); 22 Apr 2016 20:36:41 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 46206 invoked by uid 99); 22 Apr 2016 20:36:41 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Apr 2016 20:36:41 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 2235DC0DE6 for ; Fri, 22 Apr 2016 20:36:41 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.021 X-Spam-Level: X-Spam-Status: No, score=-4.021 tagged_above=-999 required=6.31 tests=[KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id WvHLeniEUtHE for ; Fri, 22 Apr 2016 20:36:39 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 4B3C95F56D for ; Fri, 22 Apr 2016 20:36:38 +0000 (UTC) Received: (qmail 45774 invoked by uid 99); 22 Apr 2016 20:36:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Apr 2016 20:36:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5B9EBDFCDF; Fri, 22 Apr 2016 20:36:37 +0000 (UTC) From: rmetzger To: issues@flink.incubator.apache.org Reply-To: issues@flink.incubator.apache.org References: In-Reply-To: Subject: [GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ... Content-Type: text/plain Message-Id: <20160422203637.5B9EBDFCDF@git1-us-west.apache.org> Date: Fri, 22 Apr 2016 20:36:37 +0000 (UTC) Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60799474 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.internals; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards. + * The fetcher spawns a single thread for connection to each shard. + */ +public class KinesisDataFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class); + + /** Config properties for the Flink Kinesis Consumer */ + private final Properties configProps; + + /** The AWS credentials provider as specified in config properties */ + private final AWSCredentialsProvider credentials; + + /** The name of the consumer task that this fetcher was instantiated */ + private final String taskName; + + /** Information of the shards that this fetcher handles, along with the sequence numbers that they should start from */ + private HashMap assignedShardsWithStartingSequenceNum; + + /** Reference to the thread that executed run() */ + private volatile Thread mainThread; + + /** Reference to the first error thrown by any of the spawned shard connection threads */ + private final AtomicReference error; + + private volatile boolean running = true; + + /** + * Creates a new Kinesis Data Fetcher for the specified set of shards + * + * @param assignedShards the shards that this fetcher will pull data from + * @param configProps the configuration properties of this Flink Kinesis Consumer + * @param taskName the task name of this consumer task + */ + public KinesisDataFetcher(List assignedShards, Properties configProps, String taskName) { + this.configProps = checkNotNull(configProps); + this.credentials = AWSUtil.getCredentialsProvider(configProps); + this.assignedShardsWithStartingSequenceNum = new HashMap<>(); + for (KinesisStreamShard shard : assignedShards) { + assignedShardsWithStartingSequenceNum.put(shard, SentinelSequenceNumber.SENTINEL_SEQUENCE_NUMBER_NOT_SET.toString()); + } + this.taskName = taskName; + this.error = new AtomicReference<>(); + } + + /** + * Advance a shard's starting sequence number to a specified value + * + * @param streamShard the shard to perform the advance on + * @param sequenceNum the sequence number to advance to + */ + public void advanceSequenceNumberTo(KinesisStreamShard streamShard, String sequenceNum) { + if (!assignedShardsWithStartingSequenceNum.containsKey(streamShard)) { + throw new IllegalArgumentException("Can't advance sequence number on a shard we are not going to read."); + } + assignedShardsWithStartingSequenceNum.put(streamShard, sequenceNum); + } + + public void run(SourceFunction.SourceContext sourceContext, + KinesisDeserializationSchema deserializationSchema, + HashMap lastSequenceNums) throws Exception { + + if (assignedShardsWithStartingSequenceNum == null || assignedShardsWithStartingSequenceNum.size() == 0) { + throw new IllegalArgumentException("No shards set to read for this fetcher"); + } + + this.mainThread = Thread.currentThread(); + + LOG.info("Reading from shards " + assignedShardsWithStartingSequenceNum); + + // create a thread for each individual shard + ArrayList> consumerThreads = new ArrayList<>(assignedShardsWithStartingSequenceNum.size()); + for (Map.Entry assignedShard : assignedShardsWithStartingSequenceNum.entrySet()) { + ShardConsumerThread thread = new ShardConsumerThread<>(this, configProps, assignedShard.getKey(), + assignedShard.getValue(), sourceContext, deserializationSchema, lastSequenceNums); + thread.setName(String.format("ShardConsumer - %s - %s/%s", + taskName, assignedShard.getKey().getStreamName() ,assignedShard.getKey().getShardId())); + thread.setDaemon(true); + consumerThreads.add(thread); + } + + // check that we are viable for running for the last time before starting threads + if (!running) { + return; + } + + for (ShardConsumerThread shardConsumer : consumerThreads) { + LOG.info("Starting thread {}", shardConsumer.getName()); + shardConsumer.start(); + } + + // wait until all consumer threads are done, or until the fetcher is aborted, or until + // an error occurred in one of the consumer threads + try { + boolean consumersStillRunning = true; + while (running && error.get() == null && consumersStillRunning) { + try { + // wait for the consumer threads. if an error occurs, we are interrupted + for (ShardConsumerThread consumerThread : consumerThreads) { + consumerThread.join(); + } + + // check if there are consumer threads still running + consumersStillRunning = false; + for (ShardConsumerThread consumerThread : consumerThreads) { + consumersStillRunning = consumersStillRunning | consumerThread.isAlive(); + } + } catch (InterruptedException e) { + // ignore + } + } + + // make sure any asynchronous error is noticed + Throwable error = this.error.get(); + if (error != null) { + throw new Exception(error.getMessage(), error); + } + } finally { + for (ShardConsumerThread consumerThread : consumerThreads) { + if (consumerThread.isAlive()) { + consumerThread.cancel(); + } + } + } + } + + public void close() throws IOException { + this.running = false; + } + + public void stopWithError(Throwable throwable) { + if (this.error.compareAndSet(null, throwable)) { + if (mainThread != null) { + mainThread.interrupt(); + } + } + } + + /** + * + * + * @param + */ + private static class ShardConsumerThread extends Thread { + + private final SourceFunction.SourceContext sourceContext; + private final KinesisDeserializationSchema deserializer; + private final HashMap seqNoState; + + private final KinesisProxy kinesisProxy; + + private final KinesisDataFetcher ownerRef; + + private final KinesisStreamShard assignedShard; + + private String lastSequenceNum; + private String nextShardItr; + + private volatile boolean running = true; + + public ShardConsumerThread(KinesisDataFetcher ownerRef, + Properties props, + KinesisStreamShard assignedShard, + String lastSequenceNum, + SourceFunction.SourceContext sourceContext, + KinesisDeserializationSchema deserializer, + HashMap seqNumState) { + this.ownerRef = checkNotNull(ownerRef); + this.assignedShard = checkNotNull(assignedShard); + this.lastSequenceNum = checkNotNull(lastSequenceNum); + this.sourceContext = checkNotNull(sourceContext); + this.deserializer = checkNotNull(deserializer); + this.seqNoState = checkNotNull(seqNumState); + this.kinesisProxy = new KinesisProxy(props); + } + + @Override + public void run() { + try { + if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.toString())) { + // if the shard is already closed, there will be no latest next record to get for this shard + if (assignedShard.isClosed()) { + nextShardItr = null; + } else { + nextShardItr = kinesisProxy.getShardIterator(assignedShard, ShardIteratorType.LATEST.toString(), null); + } + } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.toString())) { + nextShardItr = kinesisProxy.getShardIterator(assignedShard, ShardIteratorType.TRIM_HORIZON.toString(), null); + } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.toString())) { + nextShardItr = null; + } else { + nextShardItr = kinesisProxy.getShardIterator(assignedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum); + } + + long lastNextShardItrUpdateMillis = System.currentTimeMillis(); + boolean noRecordsOnLastFetch = false; + while(running) { + if (nextShardItr == null) { + lastSequenceNum = SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.toString(); + + synchronized (sourceContext.getCheckpointLock()) { + seqNoState.put(assignedShard, lastSequenceNum); + } + + break; + } else { + if (noRecordsOnLastFetch) { + if (System.currentTimeMillis() - lastNextShardItrUpdateMillis >= 290000) { --- End diff -- Okay, sounds good. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---