Subject: Union limit
From: boci
To: user@flink.apache.org
Date: Mon, 28 Aug 2017 21:29:41 +0000

Hi guys!
I have one input (from Mongo), and I split the incoming data into multiple datasets (each created dynamically from configuration). Before I write the result back, I want to merge everything into one dataset (there is some common transformation). So the flow is:

  DataSet from Mongo =>
  Create mappers dynamically (currently 74), so I have 74 DataSets =>
  Custom filter and mapping on each dataset =>
  Union dynamically into one (every mapper result is the same type) =>
  Some other common transformation =>
  Count the result

But when I try to union more than 64 datasets, I get this exception:

  Exception in thread "main" org.apache.flink.optimizer.CompilerException: Cannot currently handle nodes with more than 64 outputs.
          at org.apache.flink.optimizer.dag.OptimizerNode.addOutgoingConnection(OptimizerNode.java:348)
          at org.apache.flink.optimizer.dag.SingleInputNode.setInput(SingleInputNode.java:202)
          at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(GraphCreatingVisitor.java:268)
          at org.apache.flink.optimizer.traversals.GraphCreatingVisitor.postVisit(GraphCreatingVisitor.java:82)

I tried to split the incoming list of 74 datasets into groups of 60 + 14, union each group, chain an identity mapper onto each group's result, and then union the group results, but with no success:

  val listOfDataSet: List[DataSet[...]] = ....

  listOfDataSet
    .sliding(60, 60)                           // iterator over groups of DataSets
    .map(dsg => dsg.reduce((ds1, ds2) => ds1.union(ds2)).map(new IdMapper()))
    .reduce((dsg1, dsg2) => dsg1.union(dsg2))  // here I get the exception
    .map(finalDataSet => ... some transformation ...)
    .count()

Is there any solution to this?

Thanks,
b0c1
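For completeness, here is a self-contained sketch of the grouping workaround I am attempting. Assumptions: Flink 1.x Scala DataSet API, and an IdMapper that simply forwards its input; the idea is that a real map operator between the per-group unions should stop the optimizer from collapsing all unions into a single node with more than 64 outgoing connections. I am not certain the optimizer actually counts outputs this way, so treat this as a sketch, not a confirmed fix:

  // Sketch only: assumes Flink 1.x DataSet API; IdMapper is a hypothetical
  // identity operator, not part of Flink itself.
  import scala.reflect.ClassTag
  import org.apache.flink.api.common.functions.MapFunction
  import org.apache.flink.api.common.typeinfo.TypeInformation
  import org.apache.flink.api.scala._

  // Forwards every element unchanged. Its only purpose is to sit between
  // the per-group unions so they are not merged into one optimizer node.
  class IdMapper[T] extends MapFunction[T, T] {
    override def map(value: T): T = value
  }

  def unionAll[T: TypeInformation: ClassTag](datasets: List[DataSet[T]],
                                             groupSize: Int = 60): DataSet[T] =
    datasets
      .grouped(groupSize)                      // List[DataSet[T]] in chunks of <= 60
      .map(_.reduce((a, b) => a.union(b))      // union within each group ...
             .map(new IdMapper[T]()))          // ... then break the union chain
      .reduce((a, b) => a.union(b))            // union the few group results

With 74 inputs and groupSize = 60 this produces two groups (60 + 14), each well under the 64-output limit, before the final union.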