From: Jeff Henrikson
To: user@flink.apache.org
Date: Wed, 17 Jun 2020 10:46:54 -0700
Subject: Trouble with large state

Hello Flink users,

I have an application with around 10 enrichment joins. All events are read from Kafka and have event timestamps. The joins are built using .coGroup with a global window, a trigger that fires on every event, and a custom evictor that drops a record once a newer record for the same ID has been processed. Deletes are represented by empty events carrying only a timestamp and an ID (tombstones). That way, we can drop records when business logic dictates, rather than when a maximum retention period has been reached.

The application runs with the RocksDBStateBackend on Kubernetes on AWS, with local SSDs. Unit tests show that the joins produce the expected results. On an 8-node cluster, watermark progress at the output suggests I should be able to bootstrap my state of around 500 GB in about 1 day. I can take and restore savepoints during the first half hour of run time.

My current trouble is that after around 50 GB of state, I can no longer reliably take checkpoints or savepoints. Some time after that, I start getting a variety of failures where the first suspicious log event is a generic cluster connectivity error, such as:

1) java.io.IOException: Connecting the channel failed: Connecting to remote task manager + '/10.67.7.101:38955' has failed. This might indicate that the remote task manager has been lost.
2) org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'null'. This might indicate that the remote task manager was lost.
3) Association with remote system [akka.tcp://flink@10.67.6.66:34987] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@10.67.6.66:34987]] Caused by: [java.net.NoRouteToHostException: No route to host]

I don't see any obvious out-of-memory errors in the TaskManager UI. Adding nodes to the cluster does not seem to increase the maximum state size I can save. I could enable HA, but for the time being I have left it off so that it cannot mask deterministic faults.

Below are my configurations. Thanks in advance for any advice.

Regards,
Jeff Henrikson

Flink version: 1.10

Configuration set via code:

parallelism=8
maxParallelism=64
setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
setTolerableCheckpointFailureNumber(1000)
setMaxConcurrentCheckpoints(1)
enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
RocksDBStateBackend:
  setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
  setNumberOfTransferThreads(25)
  setDbStoragePath pointing to a local NVMe SSD

Configuration in flink-conf.yaml:

jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.heap.size: 28000m
taskmanager.memory.process.size: 28000m
taskmanager.memory.jvm-metaspace.size: 512m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
jobmanager.execution.failover-strategy: full
cluster.evenly-spread-out-slots: false
taskmanager.memory.network.fraction: 0.2 # default 0.1
taskmanager.memory.framework.off-heap.size: 2GB
taskmanager.memory.task.off-heap.size: 2GB
taskmanager.network.memory.buffers-per-channel: 32 # default 2
taskmanager.memory.managed.fraction: 0.4 # docs say default 0.1, but something seems to set 0.4
taskmanager.memory.task.off-heap.size: 2048MB # default 128M
state.backend.fs.memory-threshold: 1048576
state.backend.fs.write-buffer-size: 10240000
state.backend.local-recovery: true
state.backend.rocksdb.writebuffer.size: 64MB
state.backend.rocksdb.writebuffer.count: 8
state.backend.rocksdb.writebuffer.number-to-merge: 4
state.backend.rocksdb.timer-service.factory: heap
state.backend.rocksdb.block.cache-size: 64000000 # default 8MB
state.backend.rocksdb.write-batch-size: 16000000 # default 2MB
web.checkpoints.history: 250
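The join design described at the top of this message can be modelled in plain Java to check the eviction rule in isolation. That design uses a global window that fires on every event, an evictor that keeps only the newest record per ID, and tombstones as deletes. This is a minimal sketch under stated assumptions. It is not Flink's Evictor interface and not the evictor actually used in this job. The Event type, the class and method names, and the tie-breaking rule (on equal timestamps the later arrival wins) are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of "keep only the newest record per ID"; it does NOT use Flink's Evictor API.
final class LatestPerIdEviction {

    // Hypothetical event type: a null payload marks a tombstone (delete).
    static final class Event {
        final String id;
        final long timestamp;
        final String payload;

        Event(String id, long timestamp, String payload) {
            this.id = id;
            this.timestamp = timestamp;
            this.payload = payload;
        }

        boolean isTombstone() {
            return payload == null;
        }
    }

    // Keeps only the newest event per ID, judged by event timestamp.
    // Assumption: on a timestamp tie the later arrival wins.
    // Tombstones survive so that they keep shadowing older data for their ID.
    static List<Event> evict(List<Event> windowContents) {
        Map<String, Integer> newestIndex = new HashMap<>();
        for (int i = 0; i < windowContents.size(); i++) {
            Event e = windowContents.get(i);
            Integer prev = newestIndex.get(e.id);
            if (prev == null || e.timestamp >= windowContents.get(prev).timestamp) {
                newestIndex.put(e.id, i);
            }
        }
        List<Event> kept = new ArrayList<>();
        for (int i = 0; i < windowContents.size(); i++) {
            // Preserve arrival order among the survivors.
            if (newestIndex.get(windowContents.get(i).id) == i) {
                kept.add(windowContents.get(i));
            }
        }
        return kept;
    }

    // What the join would see: the surviving events minus tombstones.
    static List<Event> live(List<Event> windowContents) {
        List<Event> result = new ArrayList<>();
        for (Event e : evict(windowContents)) {
            if (!e.isTombstone()) {
                result.add(e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> window = new ArrayList<>();
        window.add(new Event("a", 1L, "v1"));
        window.add(new Event("b", 2L, "x1"));
        window.add(new Event("a", 3L, "v2")); // supersedes a@1
        window.add(new Event("b", 4L, null)); // tombstone: deletes b
        for (Event e : live(window)) {
            System.out.println(e.id + "@" + e.timestamp + " " + e.payload); // prints "a@3 v2"
        }
    }
}
```

In an actual Flink job this rule would sit in a custom Evictor on the coGroup window. There, the window contents arrive as TimestampedValue elements and stale ones are removed through the iterator. The sketch above only captures which elements survive.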
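The TaskManager memory settings above are easier to reason about as a budget. The sketch below shows how Flink 1.10's memory model would split taskmanager.memory.process.size: 28000m. It assumes the 1.10 defaults for every option not set in this config:
- JVM overhead is a 0.1 fraction of process memory, clamped to 192 MiB..1 GiB.
- Framework heap is 128 MiB.
- Network memory is clamped to 64 MiB..1 GiB.

It ignores rounding and is an approximation, not Flink's own calculation code. The class and method names are hypothetical.

```java
// Rough model of Flink 1.10's TaskManager memory derivation when
// taskmanager.memory.process.size is given (all values in MiB).
final class TaskManagerMemoryBudget {

    // Assumed Flink 1.10 defaults for options not set in the config above.
    static final double JVM_OVERHEAD_FRACTION = 0.1;
    static final double JVM_OVERHEAD_MIN = 192;
    static final double JVM_OVERHEAD_MAX = 1024;
    static final double FRAMEWORK_HEAP = 128;
    static final double NETWORK_MIN = 64;
    static final double NETWORK_MAX = 1024;

    static double clamp(double value, double min, double max) {
        return Math.max(min, Math.min(max, value));
    }

    /** Returns {jvmOverhead, totalFlink, network, managed, taskHeap} in MiB. */
    static double[] budget(double process, double metaspace, double frameworkOffHeap,
                           double taskOffHeap, double networkFraction, double managedFraction) {
        double jvmOverhead = clamp(JVM_OVERHEAD_FRACTION * process, JVM_OVERHEAD_MIN, JVM_OVERHEAD_MAX);
        double totalFlink = process - metaspace - jvmOverhead;
        // Network and managed memory are fractions of total Flink memory.
        double network = clamp(networkFraction * totalFlink, NETWORK_MIN, NETWORK_MAX);
        double managed = managedFraction * totalFlink;
        // Task heap receives whatever is left of total Flink memory.
        double taskHeap = totalFlink - FRAMEWORK_HEAP - frameworkOffHeap - taskOffHeap - network - managed;
        return new double[] {jvmOverhead, totalFlink, network, managed, taskHeap};
    }

    public static void main(String[] args) {
        // process.size 28000m, metaspace 512m, framework off-heap 2GB,
        // task off-heap 2048MB, network.fraction 0.2, managed.fraction 0.4
        double[] b = budget(28000, 512, 2048, 2048, 0.2, 0.4);
        String[] names = {"JVM overhead", "total Flink memory", "network", "managed", "task heap"};
        for (int i = 0; i < b.length; i++) {
            System.out.printf("%-20s %10.1f MiB%n", names[i], b[i]);
        }
    }
}
```

Under these assumptions it prints:
- JVM overhead: 1024 MiB.
- Total Flink memory: 26464 MiB.
- Network: 1024 MiB. The 0.2 fraction would ask for about 5293 MiB but hits the default 1 GiB cap unless taskmanager.memory.network.max is raised.
- Managed memory: about 10586 MiB.
- Task heap: about 10630 MiB.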
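With parallelism=8 and maxParallelism=64, keyed state is divided into 64 key groups, each owned by exactly one subtask. As far as we understand it, Flink gives each subtask a contiguous range of key groups. The sketch below uses the range arithmetic we believe Flink's KeyGroupRangeAssignment applies. Treat it as an illustration under that assumption, not as Flink's code. The class and method names are hypothetical.

```java
// Sketch: split maxParallelism key groups into contiguous ranges, one per subtask.
final class KeyGroupRanges {

    // First key group (inclusive) owned by subtask operatorIndex.
    static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    // Last key group (inclusive) owned by subtask operatorIndex.
    static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    // Subtask that owns a given key group.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 64;
        for (int parallelism : new int[] {8, 16}) {
            if (parallelism > maxParallelism) {
                throw new IllegalArgumentException("parallelism must not exceed maxParallelism");
            }
            System.out.println("parallelism=" + parallelism);
            for (int i = 0; i < parallelism; i++) {
                int start = rangeStart(maxParallelism, parallelism, i);
                int end = rangeEnd(maxParallelism, parallelism, i);
                System.out.printf("  subtask %d: key groups %d-%d (%d groups)%n",
                        i, start, end, end - start + 1);
            }
        }
    }
}
```

With 8 subtasks, each owns 8 of the 64 key groups, which is roughly one eighth of the state if keys hash evenly. At a parallelism of 16, each subtask owns 4. The split depends only on the job's parallelism, which can be raised up to the maxParallelism of 64.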