From: Konstantine Karantasis
Date: Wed, 15 Jan 2020 12:05:17 -0800
Subject: Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect
To: dev@kafka.apache.org

Hi Randall, Tom and Almog.

I'm excited to read your comments. I'll reply in separate emails, in order. First, to Randall's comments. I'm replying below with a reference to each comment number:

1. Although I can imagine we'd want to add more metadata to the record value, I didn't see the need for a timestamp in this first draft. Now that you mention it, I'd interpret a timestamp in the connector's topic status record value as an approximation of when this connector started using this topic. I'm happy to add this if we think the information is useful. Of course, its accuracy depends on message retention in Kafka and on how long the workers have been running without a restart, so the approximation may become less useful if it gets recomputed from time to time. I'll reply to your point about the "Recording active topics" section later, because that's Tom's question too.

2. I'll explain with an example, which may be worth adding to the KIP, because what happens when a new topic is recorded might not be as obvious as I thought. Let's say we have two workers, W1 and W2, each running two worker tasks (T11 and T12 on W1, T21 and T22 on W2), all associated with a connector C1. All tasks run producers that produce records to the same topic, "test-topic". When the connector starts, both workers track this connector's set of active topics as empty.
Given the absence of synchronization (which is a good thing) in how this information is recorded and persisted in the status topic, all four tasks might race to record status messages. For example, T11, running on worker W1, will send Kafka records with:

key: topic-test-topic-connector-C1
value: "topic": { "connector": "C1", "task": "C1-T11", "name": "test-topic" }

and T22, running on worker W2, will send Kafka records with:

key: topic-test-topic-connector-C1
value: "topic": { "connector": "C1", "task": "C1-T22", "name": "test-topic" }

(Similarly, tasks T12 and T21 might send topic status records.) These four records (there might not even be four, but there will be at least one) may be written in any order. Because the topic is compacted and these records share the same key, eventually only one message will be retained, and its task ID will be the ID of the task that wrote last. I can see this being used mostly for troubleshooting.

3. Throughout the KIP, when I refer to the task entity, I mean the worker task, not the user code that runs as an implementation of the SourceTask or SinkTask abstract classes. I didn't want to add complexity by always saying "worker task". But I see your point, and I'll prefer the terms "worker" and "worker task" to highlight that it's the framework, not the user code, that is aware of this feature.

4. I assumed that the absence of changes to the public API would indicate that these interfaces and abstract classes remain unchanged. But it's definitely worth mentioning this explicitly.

5. That is correct. My intention is to make reset work well with the streaming programming model. Resetting means that you are wiping the slate clean for a connector that is currently running, and its currently active topics will soon be populated from scratch as new records are produced or consumed. But resetting is not required.
I see it more as a useful operation in case users want to clear the history of active topics without having to delete the connector, since deletion has further implications for the connector's progress tracking.

6. I fixed the typo, thanks! I'm very much in favor of preserving symmetry between the two connector types. This has clear long-term benefits and may help avoid confusion. However, the asymmetry here is inherited from the asymmetry that exists today between source and sink connectors: source connectors don't list topics in their configurations, but sink connectors do. So, if a user reconfigures a sink connector with a different set of topics and we don't reset the topics based on the new configuration (my idea was to match the new configuration against the set of active topics), the old topics, which are no longer listed in the connector's configuration, will keep showing up as active topics. The user would have to reset the active topics explicitly after reconfiguring to avoid this. If there's consensus that preserving this asymmetry is worse than having to reset the active topics, I'm happy to change this in the KIP.

7. What I'm trying to avoid here is the following situation: for some reason (for example, a sequence of failures to write tombstones to the status topic), stale topic status records remain in that topic even after a connector has been deleted. Requiring users to start a connector with the same name again just to apply a follow-up reset of its active topics doesn't seem necessary. I like the idea of decoupling connector existence from the maintenance of the status topic. Of course, the workers could also perform a similar cleanup, but to avoid complexity and potential race conditions, I'm leaving this out for the moment (it's also an implementation detail).

8. Indeed, a security section is warranted. I believe the main implication is that if you are able to query a connector's status, config, etc., you will also be able to see its active topics.
Furthermore, suppose you allow worker tasks to create topics, and to produce to or consume from topics, only via connector config overrides, leaving the worker configs without permissions to these topics (that is, you assign permissions per connector rather than across the board). This feature should respect that setup. The topic names are still stored in common data structures within the worker and are persisted in the status topic, but this information should not be leaked to anyone who isn't supposed to have access to the status topic or the Connect REST API endpoints. In this respect, I feel this feature inherits the assumptions and security guarantees of similar information that the Connect framework already stores. I'm happy to add this to a security section if we agree that the above covers the subject.

9. I assumed that partitioning was implied by default, because there's no requirement for a total ordering of topic status records. But I'll add this as a separate bullet. The status.storage.topic is already a partitioned topic.

I'll follow up on the rest of the comments shortly.

Thanks,
Konstantine

On Wed, Jan 15, 2020 at 9:19 AM Almog Gavra wrote:

> Hi Konstantine,
>
> Thanks for the KIP! This is going to make automatic integration with
> Connect much more powerful.
>
> My thoughts are mostly around freshness of the data and being able to
> expose that to users. Riffing on Randall's timestamp question - have we
> considered adding some interval at which point a connector will republish
> any topics that it encounters and update the timestamp? That way we have
> some refreshing mechanism that isn't as powerful as the complete reset
> (which may not be practical in many scenarios).
>
> I also agree with Randall's other point (Would it be better to not
> automatically reset connector's active topics when a sink connector is
> restarted?). I think keeping the behavior as symmetrical between sink and
> source connectors is a good idea.
>
> Lastly, with regards to the API, I can imagine it is also pretty useful to
> answer the inverse question: "which connectors write to topic X". Perhaps
> we can achieve this by letting the users compute it and just expose an API
> that returns the entire mapping at once (instead of needing to call the
> /connectors/{name}/topics endpoint for each connector).
>
> Otherwise, looks good to me! Hits the requirements that I had in mind on
> the nose.
> - Almog
>
> On Wed, Jan 15, 2020 at 1:14 AM Tom Bentley wrote:
>
> > Hi Konstantine,
> >
> > Thanks for the KIP, I can see how it could be useful.
> >
> > a) Did you consider using a metric for this? I don't think it would satisfy
> > all the use cases you have in mind, but you could mention it in the
> > rejected alternatives.
> >
> > b) If the topic name contains the string "-connector" then the key format
> > is ambiguous. This isn't necessarily fatal because the value will
> > disambiguate, but it could be misleading. Any reason not to just use a JSON
> > key, and simplify the value?
> >
> > c) I didn't understand this part: "As soon as a worker detects the addition
> > of a topic to a connector's set of active topics, the worker will cease to
> > post update messages to the status.storage.topic for that connector. ". I'm
> > sure I'm overlooking something, but why is this necessary? Is this where the
> > task id in the value is used?
> >
> > Thanks again,
> >
> > Tom
> >
> > On Wed, Jan 15, 2020 at 12:15 AM Randall Hauch wrote:
> >
> > > Oh, one more thing:
> > >
> > >    9. There's no mention of how the status topic is partitioned, or how
> > >    partitioning will be used by the new topic records. The KIP should
> > >    probably outline this for clarity and completeness.
> > >
> > > Best regards,
> > >
> > > Randall
> > >
> > > On Tue, Jan 14, 2020 at 5:25 PM Randall Hauch wrote:
> > >
> > > > Thanks, Konstantine. Overall, this KIP looks interesting and really
Overall, this KIP looks interesting and really > > > > useful, and for the most part is spot on. I do have a number of > > > > questions/comments about specifics: > > > > > > > > 1. The topic records have a value that includes the connector > name, > > > > task number that last reported the topic is used, and the topic > > name. > > > > There's no mention of record timestamps, but I wonder if it'd be > > > useful to > > > > record this. One challenge might be that a connector does not > write > > > to a > > > > topic for a while or the task remains running for long periods of > > > time and > > > > therefore the worker doesn't record that this topic has been newly > > > written > > > > to since it the task was restarted. IOW, the semantics of the > > > timestamp may > > > > be a bit murky. Have you thought about recording the timestamp, > and > > > if so > > > > what are the pros and cons? > > > > - The "Recording active topics" section says the following: > > > > "As soon as a worker detects the addition of a topic to a > > > > connector's set of active topics, all the connector's tasks > that > > > inspect > > > > source or sink records will cease to post update messages to > the > > > > status.storage.topic." > > > > This probably means the timestamp won't be very useful. > > > > 2. The KIP says "the Kafka record value stores the ID of the task > > that > > > > succeeded to store a topic status record last." However, this is a > > bit > > > > unclear: is it really storing the last task that successfully > wrote > > > to that > > > > topic (as this would require very frequent writes to this topic), > or > > > is it > > > > more that this is the task that was last *recorded* as having > > written > > > > to the topic? (Here, "recorded" could be a bit of a gray area, > since > > > this > > > > would depend on the how the worker periodically records this > > > information.) > > > > Any kind of clarity here might be helpful. > > > > 3. 
In the "Recording active topics" section (and the surrounding > > > > sections), the "task" is used ambiguously. For example, "when its > > > tasks > > > > start processing their first records ... these tasks will start > > > inspecting > > > > which is the Kafka topic of each of these records". IIUC, the > first > > > "task" > > > > mentioned is the connector's task, and the second is the worker's > > > task. Do > > > > we need to distinguish this more clearly? > > > > 4. Maybe I missed it, but does this KIP explicitly say that the > > > > Connector API is unchanged? It's probably worth pointing out to > help > > > > assuage any concerns that connector implementations have to change > > to > > > make > > > > use of this feature. > > > > 5. In the "Resetting a connector's set of active topics" section > the > > > > behavior is not exactly clear. Consider a user running connector > > "A", > > > the > > > > connector has been fully started and is processing records, and > the > > > worker > > > > has recorded topic usage records. Then the user resets the active > > > topics > > > > for connector A while the connector is still running? If the > > connector > > > > writes to no new topics, before the tasks are rebalanced then is > it > > > correct > > > > that Connect would report no active topics? And after the tasks > are > > > > rebalance, will the worker record any topics used by connector A? > > > > 6. In the "Restaring" (misspelled) section: "Reconfiguring a > source > > > > connector has also no altering effect for a source connector. > > > However, when > > > > reconfiguring a sink connector if the new configuration no longer > > > includes > > > > any of the previously tracked topics, these topics will be removed > > > from the > > > > set of active topics for this sink connector by appending > tombstone > > > > messages appropriately after the reconfiguration of the > connector." 
> > > Would > > > > it be better to not automatically reset connector's active topics > > > when a > > > > sink connector is restarted? Isn't that more consistent with the > > > > "Resetting" behavior and the goals at the top of the KIP: "it'd be > > > useful > > > > for users, operators and applications to know which are the topics > > > that a > > > > connector has used since it was first created"? > > > > 7. The `PUT /connectors/{name}/topics/reset` endpoint "this > request > > > > can be reapplied after the deletion of the connector". IOW, even > > > though > > > > connector with that name doesn't exist, we can still make this > > > request? How > > > > does this compare with other methods such as "status"? > > > > 8. What are the security implications of this proposal? > > > > > > > > As you can see, most of these can probably be addressed without much > > > work. > > > > > > > > Best regards, > > > > > > > > Randall > > > > > > > > On Mon, Jan 13, 2020 at 11:05 PM Konstantine Karantasis < > > > > konstantine@confluent.io> wrote: > > > > > > > >> Hi all. > > > >> > > > >> I just posted KIP-558: Track the set of actively used topics by > > > connectors > > > >> in Kafka Connect > > > >> > > > >> Wiki link here: > > > >> > > > >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-558%3A+Track+the+set+of+actively+used+topics+by+connectors+in+Kafka+Connect > > > >> > > > >> I think it's a nice extension to follow up on KIP-158 and a useful > > > feature > > > >> to the ever increasing number of applications that are built around > > > Kafka > > > >> Connect. > > > >> Would love to hear what you think. > > > >> > > > >> Best, > > > >> Konstantine > > > >> > > > > > > > > > > --000000000000dcd3ab059c333a74--
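To make reply 2 easier to reason about, here is a minimal in-memory sketch. It is not Connect code, and it also touches the tombstone handling in replies 6 and 7 and Tom's point (b). It assumes the draft key layout "topic-<topic>-connector-<connector>" from the example in reply 2. It models the compacted status.storage.topic as a map in which the last record written for a key wins and a tombstone (null value) removes the key. TopicStatusSketch, topicKey, compact and connectorsByTopic are hypothetical names, and the real status records are JSON values written by the workers.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class TopicStatusSketch {

    // Simplified (hypothetical) topic status value: connector, reporting task, topic.
    public static final class TopicStatus {
        public final String connector;
        public final String task;
        public final String topic;

        public TopicStatus(String connector, String task, String topic) {
            this.connector = connector;
            this.task = task;
            this.topic = topic;
        }
    }

    // Assumed key layout, taken from the draft example in reply 2:
    // "topic-" + <topic> + "-connector-" + <connector>
    public static String topicKey(String topic, String connector) {
        return "topic-" + topic + "-connector-" + connector;
    }

    // Models what a compacted topic eventually retains: the last value per key,
    // where a tombstone (null value) deletes the key altogether.
    public static Map<String, TopicStatus> compact(List<Map.Entry<String, TopicStatus>> log) {
        Map<String, TopicStatus> latest = new LinkedHashMap<>();
        for (Map.Entry<String, TopicStatus> record : log) {
            if (record.getValue() == null) {
                latest.remove(record.getKey());
            } else {
                latest.put(record.getKey(), record.getValue());
            }
        }
        return latest;
    }

    // Inverse view ("which connectors use topic X"), derived from the compacted records.
    public static Map<String, Set<String>> connectorsByTopic(Map<String, TopicStatus> compacted) {
        Map<String, Set<String>> result = new TreeMap<>();
        for (TopicStatus status : compacted.values()) {
            result.computeIfAbsent(status.topic, t -> new TreeSet<>()).add(status.connector);
        }
        return result;
    }

    public static void main(String[] args) {
        String c1Key = topicKey("test-topic", "C1");
        List<Map.Entry<String, TopicStatus>> log = new ArrayList<>();
        // Tasks T11 (on W1) and T22 (on W2) race to record the same topic for C1.
        log.add(new SimpleEntry<>(c1Key, new TopicStatus("C1", "C1-T11", "test-topic")));
        log.add(new SimpleEntry<>(c1Key, new TopicStatus("C1", "C1-T22", "test-topic")));
        // A second, hypothetical connector C2 also uses the same topic.
        log.add(new SimpleEntry<>(topicKey("test-topic", "C2"),
                new TopicStatus("C2", "C2-T1", "test-topic")));

        Map<String, TopicStatus> compacted = compact(log);
        System.out.println(compacted.get(c1Key).task);    // C1-T22: the last writer wins
        System.out.println(connectorsByTopic(compacted)); // {test-topic=[C1, C2]}

        // Removing C1's entry with a tombstone, as the thread describes for
        // sink reconfiguration and for cleaning up after deletion.
        log.add(new SimpleEntry<>(c1Key, null));
        System.out.println(connectorsByTopic(compact(log))); // {test-topic=[C2]}

        // Tom's point (b): with this string layout, two different (topic, connector)
        // pairs can even produce exactly the same key.
        System.out.println(topicKey("a-connector-b", "c").equals(topicKey("a", "b-connector-c"))); // true
    }
}
```

The connectorsByTopic view shows one way to derive the inverse mapping Almog asks about from the same compacted records. The key collision in the last line is a property of plain string concatenation. A structured (for example JSON) key, as Tom suggests, would keep the topic and connector names apart.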