From: Timothy Victor
Date: Sun, 10 Feb 2019 11:09:15 -0600
Subject: Re: fllink 1.7.1 and RollingFileSink
To: Vishal Santoshi
Cc: user@flink.apache.org

I think the only rolling policy that can be used is CheckpointRollingPolicy to ensure exactly-once semantics.
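A checkpoint-only policy can be sketched without Flink as follows. The PartRollDecider interface is a hypothetical stand-in for the three RollingPolicy callbacks (it is not Flink's API), and the replay driver is illustrative only. The point is that such a policy rolls a part file only at checkpoints, so every rolled file lines up with a checkpoint boundary. In Flink 1.7 the built-in policy with this behavior is, to my knowledge, OnCheckpointRollingPolicy.

```java
// Hypothetical, Flink-free stand-in for the three RollingPolicy callbacks.
// Sizes and timestamps are omitted because a checkpoint-only policy ignores them.
interface PartRollDecider {
    boolean shouldRollOnCheckpoint();
    boolean shouldRollOnEvent();
    boolean shouldRollOnProcessingTime();
}

// Rolls only when a checkpoint is taken; events and timers never roll.
final class CheckpointOnlyDecider implements PartRollDecider {
    @Override public boolean shouldRollOnCheckpoint() { return true; }
    @Override public boolean shouldRollOnEvent() { return false; }
    @Override public boolean shouldRollOnProcessingTime() { return false; }
}

final class RollSimulation {
    // Replays "event", "tick" and "checkpoint" signals against one bucket and
    // counts how many part files are rolled. In this model a roll can only
    // happen while a part file is open (has received at least one event).
    static int countRolls(PartRollDecider policy, String... signals) {
        int rolls = 0;
        boolean open = false;
        for (String signal : signals) {
            boolean roll;
            switch (signal) {
                case "event":
                    open = true;
                    roll = policy.shouldRollOnEvent();
                    break;
                case "tick":
                    roll = open && policy.shouldRollOnProcessingTime();
                    break;
                case "checkpoint":
                    roll = open && policy.shouldRollOnCheckpoint();
                    break;
                default:
                    throw new IllegalArgumentException("unknown signal: " + signal);
            }
            if (roll) {
                rolls++;
                open = false;
            }
        }
        return rolls;
    }
}
```

Replaying two checkpoints that each follow some events yields two rolled files, while a trailing checkpoint with no new events rolls nothing.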

Tim

On Sun, Feb 10, 2019, 9:13 AM Vishal Santoshi <vishal.santoshi@gmail.com> wrote:
Can StreamingFileSink be used instead of the BucketingSink (https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html)? It looks like it could.

Take this code, for example:

StreamingFileSink
        .forRowFormat(new Path(PATH), new SimpleStringEncoder<KafkaRecord>())
        .withBucketAssigner(new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
        .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {
            @Override
            public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) throws IOException {
                return false; // never roll just because a checkpoint is taken
            }

            @Override
            public boolean shouldRollOnEvent(PartFileInfo<String> partFileState,
                                             KafkaRecord element) throws IOException {
                // roll once the part file exceeds 1 GiB
                return partFileState.getSize() > 1024L * 1024 * 1024;
            }

            @Override
            public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState,
                                                      long currentTime) throws IOException {
                // roll if idle for more than 10 minutes or open for more than 2 hours
                return currentTime - partFileState.getLastUpdateTime() > 10 * 60 * 1000L
                        || currentTime - partFileState.getCreationTime() > 120 * 60 * 1000L;
            }
        })
        .build();
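The thresholds in the policy above can be checked in isolation. The sketch below is a hypothetical, Flink-free restatement (class and method names are made up for illustration) that keeps the same strict > comparisons, so a file of exactly 1 GiB, or one idle for exactly 10 minutes, does not roll.

```java
// Hypothetical, Flink-free restatement of the thresholds used in the policy above.
final class SizeAndTimeRollCheck {
    static final long MAX_PART_SIZE_BYTES = 1024L * 1024 * 1024; // 1 GiB
    static final long INACTIVITY_LIMIT_MS = 10L * 60 * 1000;     // 10 minutes
    static final long MAX_PART_AGE_MS = 120L * 60 * 1000;        // 2 hours

    // Mirrors shouldRollOnEvent: roll once the part file is larger than 1 GiB.
    static boolean rollOnEvent(long sizeBytes) {
        return sizeBytes > MAX_PART_SIZE_BYTES;
    }

    // Mirrors shouldRollOnProcessingTime: roll if the file has been idle for
    // more than 10 minutes or has been open for more than 2 hours.
    static boolean rollOnProcessingTime(long creationMs, long lastUpdateMs, long nowMs) {
        return nowMs - lastUpdateMs > INACTIVITY_LIMIT_MS
                || nowMs - creationMs > MAX_PART_AGE_MS;
    }
}
```

Writing the long suffix as an uppercase L on the first factor keeps the whole product in long arithmetic and avoids the easily misread lowercase l.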

A few things I see and am not sure I follow about the new RollingFileSink vis-à-vis BucketingSink:

1. I never see the in-progress file move to the pending state (that is, get renamed as pending), as was the case in BucketingSink. I would assume that it would become pending and then be finalized on checkpoint, for exactly-once semantics?

2. I see dangling in-progress files at the end of the day. Since withBucketCheckInterval is set to 1 minute by default, I would assume shouldRollOnProcessingTime should kick in?
 
3. The in-progress files are named like .part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14. What is that additional suffix?
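The first question assumes the BucketingSink lifecycle: a part file is renamed from in-progress to pending when it is rolled, and only becomes finished once a checkpoint taken after the roll completes. The sketch below models that assumed lifecycle in plain Java (all names are hypothetical) so it can be compared with what StreamingFileSink actually leaves on disk; it does not describe StreamingFileSink internals.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the lifecycle the first question expects:
// IN_PROGRESS -> PENDING (on roll) -> FINISHED (when a later checkpoint completes).
final class PartLifecycle {
    enum State { IN_PROGRESS, PENDING, FINISHED }

    static final class Part {
        final String name;
        State state = State.IN_PROGRESS;
        // id of the first checkpoint taken after the roll; -1 while in progress
        long coveredByCheckpoint = -1;

        Part(String name) {
            this.name = name;
        }
    }

    private final List<Part> parts = new ArrayList<>();
    private long nextCheckpointId = 1;

    Part open(String name) {
        Part part = new Part(name);
        parts.add(part);
        return part;
    }

    // Rolling stops writes to the file but does not publish it yet.
    void roll(Part part) {
        if (part.state == State.IN_PROGRESS) {
            part.state = State.PENDING;
            part.coveredByCheckpoint = nextCheckpointId;
        }
    }

    // Takes a checkpoint and returns its id.
    long triggerCheckpoint() {
        return nextCheckpointId++;
    }

    // Publishes every pending file that was rolled before checkpoint `id` was taken.
    void notifyCheckpointComplete(long id) {
        for (Part part : parts) {
            if (part.state == State.PENDING && part.coveredByCheckpoint <= id) {
                part.state = State.FINISHED;
            }
        }
    }
}
```

In this model a file rolled after a checkpoint was triggered stays pending until the next checkpoint completes, which is the behavior the question expected to observe as a rename.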

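The second question rests on the assumption that, with the default bucket check interval of one minute, the processing-time condition is evaluated once per interval for every open part file. Under that assumption (a simplified, hypothetical model, not Flink code), a file that stops receiving data rolls at the first check after the 10-minute inactivity limit is exceeded, so at most one interval late:

```java
// Hypothetical model of the assumption in the second question: a timer fires
// every checkIntervalMs (first firing at t = checkIntervalMs) and evaluates the
// processing-time condition of the policy above for one open part file that
// receives no further data.
final class ProcessingTimeRollSimulation {
    // Returns the first timer firing at which the file would roll,
    // or -1 if it does not roll within horizonMs.
    static long firstRollTick(long creationMs, long lastUpdateMs, long checkIntervalMs,
                              long inactivityLimitMs, long maxAgeMs, long horizonMs) {
        if (checkIntervalMs <= 0) {
            throw new IllegalArgumentException("checkIntervalMs must be positive");
        }
        for (long now = checkIntervalMs; now <= horizonMs; now += checkIntervalMs) {
            boolean idleTooLong = now - lastUpdateMs > inactivityLimitMs;
            boolean openTooLong = now - creationMs > maxAgeMs;
            if (idleTooLong || openTooLong) {
                return now;
            }
        }
        return -1;
    }
}
```

The 60-second interval and the 10-minute and 2-hour limits come from the question and the policy above; the horizon argument only bounds the loop.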

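The third question is about the file name format. A minimal sketch that splits such a name into its parts, assuming the layout .part-<subtask>-<counter>.inprogress.<suffix> seen in the example (the class and field names are hypothetical). It only shows that the suffix is a well-formed random (version 4) UUID; it does not establish where Flink generates it.

```java
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper that splits an in-progress file name such as
// ".part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14" into its pieces.
final class InProgressName {
    private static final Pattern NAME =
            Pattern.compile("^\\.part-(\\d+)-(\\d+)\\.inprogress\\.(.+)$");

    final int subtask;      // first number after "part-" (assumed: subtask index)
    final long partCounter; // second number (assumed: running part counter)
    final UUID suffix;      // trailing token, parsed as a UUID

    private InProgressName(int subtask, long partCounter, UUID suffix) {
        this.subtask = subtask;
        this.partCounter = partCounter;
        this.suffix = suffix;
    }

    // Throws IllegalArgumentException if the name does not match the layout
    // or the suffix is not a UUID.
    static InProgressName parse(String fileName) {
        Matcher m = NAME.matcher(fileName);
        if (!m.matches()) {
            throw new IllegalArgumentException("not an in-progress part file: " + fileName);
        }
        return new InProgressName(
                Integer.parseInt(m.group(1)),
                Long.parseLong(m.group(2)),
                UUID.fromString(m.group(3)));
    }
}
```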

I have the following setup on the env:
env.enableCheckpointing(10 * 60000); // checkpoint every 10 minutes
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// up to 4 restart attempts, 1 minute apart
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, org.apache.flink.api.common.time.Time.minutes(1)));
StateBackend stateBackEnd = new MemoryStateBackend();
env.setStateBackend(stateBackEnd);

Regards.

 

