From: Niklas Semmler
Reply-To: niklas@inet.tu-berlin.de
To: user@flink.apache.org
Subject: A custom FileInputFormat
Date: Fri, 28 Oct 2016 18:38:34 +0200

Hello Flink community,

I am running into an issue with a custom FileInputFormat class and would appreciate your help.

My goal is to read all files from a directory as paths:

    import org.apache.flink.api.scala._
    import org.apache.flink.core.fs.FileSystem.WriteMode

    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val source: DataSet[String] = env.readFile(new DirReader, "/tmp/mydir").setParallelism(1)
    source.writeAsText("/tmp/results", WriteMode.OVERWRITE)
    env.execute("Job")

It works when I execute the program from within my IDE or run it directly as a fat jar. When I run it through the Flink CLI, the file "/tmp/results" is created but never filled with entries.

There seems to be something wrong with my custom DirReader (see below). The output of its println statements is not visible when I run the code through the Flink CLI, and no exception appears in the logs (see below). I am at a loss as to what to try.

Even worse, when I copy the fat jar to a remote system, the problem also appears when I execute the fat jar directly.

Local system:
    Flink: 1.0.2
    Java: 1.8.0_102
    Scala: 2.11.8

Remote system:
    Flink: 1.1.3
    Java: 1.8.0_92
    Scala: 2.11.6

Help or ideas to try out are welcome!
Best,
Niklas

----------------------------------------
import java.io.File
import org.apache.flink.api.common.io.FileInputFormat

class DirReader extends FileInputFormat[String] {
  var running: Boolean = false
  var fileList: Array[String] = null

  // Collect the files that sit inside the subdirectories of filePath.
  override def openInputFormat(): Unit = {
    println("Path: " + this.filePath.toString)
    val directory = new File(this.filePath.toString)
    if (directory != null && directory.isDirectory) {
      fileList = directory.listFiles.filter(_.isDirectory).map(_.listFiles).flatten
        .map(_.toString)
      running = if (fileList.length > 1) true else false
    }
    println("fileList " + fileList.length + " running " + running)
  }

  // Emit one path per call; clear the running flag once the list is empty.
  override def nextRecord(reuse: String): String = {
    val head = fileList.head
    println("File: " + head)
    fileList = fileList.tail
    running = if (fileList.length == 0) false else true
    head
  }

  override def reachedEnd(): Boolean = !running
}
----------------------------------------

The output from the CLI:

    10/28/2016 18:27:56 Job execution switched to status RUNNING.
    10/28/2016 18:27:56 DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.readFile(ExecutionEnvironment.scala:385) (de.tuberlin.inet.plag.DirReader))(1/1) switched to SCHEDULED
    10/28/2016 18:27:56 DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.readFile(ExecutionEnvironment.scala:385) (de.tuberlin.inet.plag.DirReader))(1/1) switched to DEPLOYING
    10/28/2016 18:27:56 DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.readFile(ExecutionEnvironment.scala:385) (de.tuberlin.inet.plag.DirReader))(1/1) switched to RUNNING
    10/28/2016 18:27:56 DataSink (TextOutputFormat (/tmp/results) - UTF-8)(1/1) switched to SCHEDULED
    10/28/2016 18:27:56 DataSink (TextOutputFormat (/tmp/results) - UTF-8)(1/1) switched to DEPLOYING
    10/28/2016 18:27:56 DataSource (at org.apache.flink.api.scala.ExecutionEnvironment.readFile(ExecutionEnvironment.scala:385) (de.tuberlin.inet.plag.DirReader))(1/1) switched to FINISHED
    10/28/2016 18:27:56 DataSink (TextOutputFormat (/tmp/results) - UTF-8)(1/1) switched to RUNNING
    10/28/2016 18:27:56 DataSink (TextOutputFormat (/tmp/results) - UTF-8)(1/1) switched to FINISHED
    10/28/2016 18:27:56 Job execution switched to status FINISHED.

-- 
Niklas Semmler
PhD Student / Research Assistant
TU Berlin, INET, Room MAR 4.027
Marchstr 23, 10587 Berlin
Tel.: +49 (0)30 314 75739
http://inet.tu-berlin.de/~nsemmler/
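DirReader's directory traversal and end-of-input logic can be exercised without Flink. The sketch below copies that logic into a plain Scala class; the names `DirReaderLogic` and `DirReaderSketch.drain` are hypothetical, and the driver loop is a simplifying assumption: it calls `openInputFormat()` once and then alternates `reachedEnd()` and `nextRecord()`, without modelling the input splits that a real Flink data source opens.

```scala
import java.io.File

// Hypothetical, Flink-free copy of DirReader's logic (not a Flink class).
class DirReaderLogic(dirPath: String) {
  private var running: Boolean = false
  // Initialised to an empty array here instead of null, so a missing
  // directory simply yields no records.
  private var fileList: Array[String] = Array.empty

  // Same traversal as DirReader.openInputFormat: only files inside the
  // immediate subdirectories of dirPath are listed; files directly in
  // dirPath are filtered out by filter(_.isDirectory).
  def openInputFormat(): Unit = {
    val directory = new File(dirPath)
    if (directory.isDirectory) {
      fileList = directory.listFiles.filter(_.isDirectory).map(_.listFiles).flatten
        .map(_.toString)
      // Kept as in the original: with exactly one file, running stays false.
      running = fileList.length > 1
    }
  }

  def reachedEnd(): Boolean = !running

  def nextRecord(): String = {
    val head = fileList.head
    fileList = fileList.tail
    running = fileList.nonEmpty
    head
  }
}

object DirReaderSketch {
  // Simplified driver (an assumption): open once, then pull records until
  // reachedEnd() reports true.
  def drain(dirPath: String): List[String] = {
    val reader = new DirReaderLogic(dirPath)
    reader.openInputFormat()
    val out = List.newBuilder[String]
    while (!reader.reachedEnd()) out += reader.nextRecord()
    out.result()
  }
}
```

Running `drain` against a scratch directory shows two properties of the code as written: files that sit directly in the given directory are skipped, and because `running` is initialised with `fileList.length > 1`, a tree containing exactly one nested file yields no records at all.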