Date: Thu, 6 Aug 2015 19:44:53 +0530
Subject: Reading data from FTP Server in Hadoop/Cascading
From: Arshad Ali Sayed
To: user@hadoop.apache.org

I want to read data from an FTP server. I am providing the path of the file, which resides on the FTP server, in the format ftp://Username:Password@host/path.

When I use a plain MapReduce program to read data from this file, it works fine. I want to read the same file through the Cascading framework, using Cascading's Hfs tap.
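For reference, the plain MapReduce version that reads from the same ftp:// path is roughly shaped like this (a simplified sketch, imports omitted; the driver class name, host, credentials and output path are placeholders, not my real values):

    // Simplified sketch of the working map-only job reading an ftp:// input path.
    JobConf job = new JobConf(MyFtpReadJob.class);      // MyFtpReadJob is a placeholder driver class
    job.setJobName("ftp-read-demo");
    job.setMapperClass(IdentityMapper.class);           // just pass each line through
    job.setNumReduceTasks(0);                           // map-only: lines go straight to the output
    job.setInputFormat(TextInputFormat.class);
    job.setOutputFormat(TextOutputFormat.class);
    FileInputFormat.setInputPaths(job, new Path("ftp://user:pwd@xx.xx.xx.xx/input1"));
    FileOutputFormat.setOutputPath(job, new Path("OP/op_mr"));
    JobClient.runJob(job);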
With Cascading, it throws the following exception:

    java.io.IOException: Stream closed
        at org.apache.hadoop.fs.ftp.FTPInputStream.close(FTPInputStream.java:98)
        at java.io.FilterInputStream.close(Unknown Source)
        at org.apache.hadoop.util.LineReader.close(LineReader.java:83)
        at org.apache.hadoop.mapred.LineRecordReader.close(LineRecordReader.java:168)
        at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.close(MapTask.java:254)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:440)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)

Below is the Cascading code with which I am reading the files:

    public class FTPWithHadoopDemo {
        public static void main(String args[]) {
            Tap source = new Hfs(new TextLine(new Fields("line")), "ftp://user:pwd@xx.xx.xx.xx//input1");
            Tap sink = new Hfs(new TextLine(new Fields("line1")), "OP\\op", SinkMode.REPLACE);
            Pipe pipe = new Pipe("First");
            pipe = new Each(pipe, new RegexSplitGenerator("\\s+"));
            pipe = new GroupBy(pipe);
            Pipe tailpipe = new Every(pipe, new Count());
            FlowDef flowDef = FlowDef.flowDef().addSource(pipe, source).addTailSink(tailpipe, sink);
            new HadoopFlowConnector().connect(flowDef).complete();
        }
    }

I tried to look in the Hadoop source code for this exception. I found that the MapTask class has a method runOldMapper which deals with the stream, and that this method has a finally block where the stream gets closed. When I remove that in.close() line from the code, it works fine (my guess at why the second close blows up is in the P.S. below). Here is the code:

    private <INKEY, INVALUE, OUTKEY, OUTVALUE> void runOldMapper(final JobConf job, final TaskSplitIndex splitIndex,
            final TaskUmbilicalProtocol umbilical, TaskReporter reporter)
            throws IOException, InterruptedException, ClassNotFoundException {
        InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset());

        updateJobWithSplit(job, inputSplit);
        reporter.setInputSplit(inputSplit);

        RecordReader<INKEY, INVALUE> in = isSkipping()
                ? new SkippingRecordReader<INKEY, INVALUE>(inputSplit, umbilical, reporter)
                : new TrackedRecordReader<INKEY, INVALUE>(inputSplit, job, reporter);
        job.setBoolean("mapred.skip.on", isSkipping());

        int numReduceTasks = conf.getNumReduceTasks();
        LOG.info("numReduceTasks: " + numReduceTasks);
        MapOutputCollector collector = null;
        if (numReduceTasks > 0) {
            collector = new MapOutputBuffer(umbilical, job, reporter);
        } else {
            collector = new DirectMapOutputCollector(umbilical, job, reporter);
        }
        MapRunnable<INKEY, INVALUE, OUTKEY, OUTVALUE> runner = ReflectionUtils.newInstance(job.getMapRunnerClass(),
                job);

        try {
            runner.run(in, new OldOutputCollector(collector, conf), reporter);
            collector.flush();
        } finally {
            // close
            in.close(); // close input
            collector.close();
        }
    }

I asked the same question on the cascading-user group and they replied: "HFS supports whatever Hadoop supports, so if you supply a URI in the format ftp://, it should do the right thing." But I am still getting this exception.

Please assist me in solving this problem.

Thanks,
Arshadali
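P.S. My working theory, for what it is worth: the stream is being closed twice. As far as I can tell (paraphrasing from memory, not an exact copy of the Hadoop source), FTPInputStream.close() refuses a second close:

    // Paraphrased sketch of org.apache.hadoop.fs.ftp.FTPInputStream.close(), not verbatim.
    public synchronized void close() throws IOException {
        if (closed) {
            throw new IOException("Stream closed");   // the exception I am seeing
        }
        super.close();
        closed = true;
        // ... then completes the pending FTP command and disconnects the FTP client
    }

So if the record reader (or Cascading) has already closed the underlying FTP stream, the in.close() in MapTask's finally block reaches this method a second time and throws. That would also explain why removing in.close() makes the job run.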