Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 17C5E18A63 for ; Fri, 29 May 2015 13:03:56 +0000 (UTC) Received: (qmail 6443 invoked by uid 500); 29 May 2015 13:03:56 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 6403 invoked by uid 500); 29 May 2015 13:03:55 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 6394 invoked by uid 99); 29 May 2015 13:03:55 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 29 May 2015 13:03:55 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C2807E00D5; Fri, 29 May 2015 13:03:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mxm@apache.org To: commits@flink.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [hotfix][scala-shell] fix location of classes according to package name Date: Fri, 29 May 2015 13:03:55 +0000 (UTC) Repository: flink Updated Branches: refs/heads/master 5a7ceda61 -> 84fce54b6 [hotfix][scala-shell] fix location of classes according to package name Javadoc was throwing an error because it expected the class files to be organized in hierarchical directories. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84fce54b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84fce54b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84fce54b Branch: refs/heads/master Commit: 84fce54b652a3f8afca422323fa6e86c76bd55e0 Parents: 5a7ceda Author: Maximilian Michels Authored: Fri May 29 14:53:16 2015 +0200 Committer: Maximilian Michels Committed: Fri May 29 15:03:34 2015 +0200 ---------------------------------------------------------------------- .../org.apache.flink/api/java/JarHelper.java | 211 ------------------- .../api/java/ScalaShellRemoteEnvironment.java | 70 ------ .../org/apache/flink/api/java/JarHelper.java | 211 +++++++++++++++++++ .../api/java/ScalaShellRemoteEnvironment.java | 70 ++++++ 4 files changed, 281 insertions(+), 281 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/84fce54b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/JarHelper.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/JarHelper.java b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/JarHelper.java deleted file mode 100644 index 5def4b0..0000000 --- a/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/JarHelper.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java; - - -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.FileInputStream; -import java.io.InputStream; - -import java.util.jar.JarOutputStream; -import java.util.jar.JarEntry; -import java.util.jar.JarInputStream; - -/** - * Provides utility services for jarring and unjarring files and directories. - * Note that a given instance of JarHelper is not threadsafe with respect to - * multiple jar operations. - * - * Copied from http://grepcode.com/file_/repo1.maven.org/maven2/org.apache.xmlbeans/xmlbeans/2.4.0/org/apache/xmlbeans/impl/common/JarHelper.java/?v=source - * - * @author Patrick Calahan - */ -public class JarHelper -{ - // ======================================================================== - // Constants - - private static final int BUFFER_SIZE = 2156; - - // ======================================================================== - // Variables - - private byte[] mBuffer = new byte[BUFFER_SIZE]; - private int mByteCount = 0; - private boolean mVerbose = false; - private String mDestJarName = ""; - - // ======================================================================== - // Constructor - - /** - * Instantiates a new JarHelper. - */ - public JarHelper() {} - - // ======================================================================== - // Public methods - - /** - * Jars a given directory or single file into a JarOutputStream. - */ - public void jarDir(File dirOrFile2Jar, File destJar) - throws IOException { - - if (dirOrFile2Jar == null || destJar == null) - { - throw new IllegalArgumentException(); - } - - mDestJarName = destJar.getCanonicalPath(); - FileOutputStream fout = new FileOutputStream(destJar); - JarOutputStream jout = new JarOutputStream(fout); - //jout.setLevel(0); - try { - jarDir(dirOrFile2Jar, jout, null); - } catch(IOException ioe) { - throw ioe; - } finally { - jout.close(); - fout.close(); - } - } - - /** - * Unjars a given jar file into a given directory. - */ - public void unjarDir(File jarFile, File destDir) throws IOException { - BufferedOutputStream dest = null; - FileInputStream fis = new FileInputStream(jarFile); - unjar(fis, destDir); - } - - /** - * Given an InputStream on a jar file, unjars the contents into the given - * directory. - */ - public void unjar(InputStream in, File destDir) throws IOException { - BufferedOutputStream dest = null; - JarInputStream jis = new JarInputStream(in); - JarEntry entry; - while ((entry = jis.getNextJarEntry()) != null) { - if (entry.isDirectory()) { - File dir = new File(destDir,entry.getName()); - dir.mkdir(); - if (entry.getTime() != -1) {dir.setLastModified(entry.getTime());} - continue; - } - int count; - byte[] data = new byte[ BUFFER_SIZE ]; - File destFile = new File(destDir, entry.getName()); - if (mVerbose) { - System.out.println("unjarring " + destFile + - " from " + entry.getName()); - } - FileOutputStream fos = new FileOutputStream(destFile); - dest = new BufferedOutputStream(fos, BUFFER_SIZE); - while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) { - dest.write(data, 0, count); - } - dest.flush(); - dest.close(); - if (entry.getTime() != -1) {destFile.setLastModified(entry.getTime());} - } - jis.close(); - } - - public void setVerbose(boolean b) { - mVerbose = b; - } - - // ======================================================================== - // Private methods - - private static final char SEP = '/'; - /** - * Recursively jars up the given path under the given directory. - */ - private void jarDir(File dirOrFile2jar, JarOutputStream jos, String path) - throws IOException { - if (mVerbose) { System.out.println("checking " + dirOrFile2jar);} - if (dirOrFile2jar.isDirectory()) { - String[] dirList = dirOrFile2jar.list(); - String subPath = (path == null) ? "" : (path+dirOrFile2jar.getName()+SEP); - if (path != null) { - JarEntry je = new JarEntry(subPath); - je.setTime(dirOrFile2jar.lastModified()); - jos.putNextEntry(je); - jos.flush(); - jos.closeEntry(); - } - for (int i = 0; i < dirList.length; i++) { - File f = new File(dirOrFile2jar, dirList[i]); - jarDir(f,jos,subPath); - } - } else { - if (dirOrFile2jar.getCanonicalPath().equals(mDestJarName)) - { - if (mVerbose) {System.out.println("skipping " + dirOrFile2jar.getPath());} - return; - } - - if (mVerbose) { - System.out.println("adding " + dirOrFile2jar.getPath()); - } - FileInputStream fis = new FileInputStream(dirOrFile2jar); - try { - JarEntry entry = new JarEntry(path+dirOrFile2jar.getName()); - entry.setTime(dirOrFile2jar.lastModified()); - jos.putNextEntry(entry); - while ((mByteCount = fis.read(mBuffer)) != -1) { - jos.write(mBuffer, 0, mByteCount); - if (mVerbose) { System.out.println("wrote " + mByteCount + " bytes");} - } - jos.flush(); - jos.closeEntry(); - } catch (IOException ioe) { - throw ioe; - } finally { - fis.close(); - } - } - } - - // for debugging - public static void main(String[] args) - throws IOException - { - if (args.length < 2) - { - System.err.println("Usage: JarHelper jarname.jar directory"); - return; - } - - JarHelper jarHelper = new JarHelper(); - jarHelper.mVerbose = true; - - File destJar = new File(args[0]); - File dirOrFile2Jar = new File(args[1]); - - jarHelper.jarDir(dirOrFile2Jar, destJar); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/84fce54b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java deleted file mode 100644 index 79f9576..0000000 --- a/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java +++ /dev/null @@ -1,70 +0,0 @@ - -package org.apache.flink.api.java; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.PlanExecutor; - -import org.apache.flink.api.scala.FlinkILoop; - -/** - * Special version of {@link org.apache.flink.api.java.RemoteEnvironment} that has a reference - * to a {@link org.apache.flink.api.scala.FlinkILoop}. When execute is called this will - * use the reference of the ILoop to write the compiled classes of the current session to - * a Jar file and submit these with the program. - */ -public class ScalaShellRemoteEnvironment extends RemoteEnvironment { - - // reference to Scala Shell, for access to virtual directory - private FlinkILoop flinkILoop; - - /** - * Creates new ScalaShellRemoteEnvironment that has a reference to the FlinkILoop - * - * @param host The host name or address of the master (JobManager), where the program should be executed. - * @param port The port of the master (JobManager), where the program should be executed. - * @param flinkILoop The flink Iloop instance from which the ScalaShellRemoteEnvironment is called. - */ - public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, String... jarFiles) { - super(host, port, jarFiles); - this.flinkILoop = flinkILoop; - } - - /** - * compiles jars from files in the shell virtual directory on the fly, sends and executes it in the remote environment - * - * @param jobName name of the job as string - * @return Result of the computation - * @throws Exception - */ - @Override - public JobExecutionResult execute(String jobName) throws Exception { - Plan p = createProgramPlan(jobName); - - String jarFile = flinkILoop.writeFilesToDisk().getAbsolutePath(); - - // call "traditional" execution methods - PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFile); - - executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled()); - return executor.executePlan(p); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/84fce54b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java new file mode 100644 index 0000000..5def4b0 --- /dev/null +++ b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java; + + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.FileInputStream; +import java.io.InputStream; + +import java.util.jar.JarOutputStream; +import java.util.jar.JarEntry; +import java.util.jar.JarInputStream; + +/** + * Provides utility services for jarring and unjarring files and directories. + * Note that a given instance of JarHelper is not threadsafe with respect to + * multiple jar operations. + * + * Copied from http://grepcode.com/file_/repo1.maven.org/maven2/org.apache.xmlbeans/xmlbeans/2.4.0/org/apache/xmlbeans/impl/common/JarHelper.java/?v=source + * + * @author Patrick Calahan + */ +public class JarHelper +{ + // ======================================================================== + // Constants + + private static final int BUFFER_SIZE = 2156; + + // ======================================================================== + // Variables + + private byte[] mBuffer = new byte[BUFFER_SIZE]; + private int mByteCount = 0; + private boolean mVerbose = false; + private String mDestJarName = ""; + + // ======================================================================== + // Constructor + + /** + * Instantiates a new JarHelper. + */ + public JarHelper() {} + + // ======================================================================== + // Public methods + + /** + * Jars a given directory or single file into a JarOutputStream. + */ + public void jarDir(File dirOrFile2Jar, File destJar) + throws IOException { + + if (dirOrFile2Jar == null || destJar == null) + { + throw new IllegalArgumentException(); + } + + mDestJarName = destJar.getCanonicalPath(); + FileOutputStream fout = new FileOutputStream(destJar); + JarOutputStream jout = new JarOutputStream(fout); + //jout.setLevel(0); + try { + jarDir(dirOrFile2Jar, jout, null); + } catch(IOException ioe) { + throw ioe; + } finally { + jout.close(); + fout.close(); + } + } + + /** + * Unjars a given jar file into a given directory. + */ + public void unjarDir(File jarFile, File destDir) throws IOException { + BufferedOutputStream dest = null; + FileInputStream fis = new FileInputStream(jarFile); + unjar(fis, destDir); + } + + /** + * Given an InputStream on a jar file, unjars the contents into the given + * directory. + */ + public void unjar(InputStream in, File destDir) throws IOException { + BufferedOutputStream dest = null; + JarInputStream jis = new JarInputStream(in); + JarEntry entry; + while ((entry = jis.getNextJarEntry()) != null) { + if (entry.isDirectory()) { + File dir = new File(destDir,entry.getName()); + dir.mkdir(); + if (entry.getTime() != -1) {dir.setLastModified(entry.getTime());} + continue; + } + int count; + byte[] data = new byte[ BUFFER_SIZE ]; + File destFile = new File(destDir, entry.getName()); + if (mVerbose) { + System.out.println("unjarring " + destFile + + " from " + entry.getName()); + } + FileOutputStream fos = new FileOutputStream(destFile); + dest = new BufferedOutputStream(fos, BUFFER_SIZE); + while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) { + dest.write(data, 0, count); + } + dest.flush(); + dest.close(); + if (entry.getTime() != -1) {destFile.setLastModified(entry.getTime());} + } + jis.close(); + } + + public void setVerbose(boolean b) { + mVerbose = b; + } + + // ======================================================================== + // Private methods + + private static final char SEP = '/'; + /** + * Recursively jars up the given path under the given directory. + */ + private void jarDir(File dirOrFile2jar, JarOutputStream jos, String path) + throws IOException { + if (mVerbose) { System.out.println("checking " + dirOrFile2jar);} + if (dirOrFile2jar.isDirectory()) { + String[] dirList = dirOrFile2jar.list(); + String subPath = (path == null) ? "" : (path+dirOrFile2jar.getName()+SEP); + if (path != null) { + JarEntry je = new JarEntry(subPath); + je.setTime(dirOrFile2jar.lastModified()); + jos.putNextEntry(je); + jos.flush(); + jos.closeEntry(); + } + for (int i = 0; i < dirList.length; i++) { + File f = new File(dirOrFile2jar, dirList[i]); + jarDir(f,jos,subPath); + } + } else { + if (dirOrFile2jar.getCanonicalPath().equals(mDestJarName)) + { + if (mVerbose) {System.out.println("skipping " + dirOrFile2jar.getPath());} + return; + } + + if (mVerbose) { + System.out.println("adding " + dirOrFile2jar.getPath()); + } + FileInputStream fis = new FileInputStream(dirOrFile2jar); + try { + JarEntry entry = new JarEntry(path+dirOrFile2jar.getName()); + entry.setTime(dirOrFile2jar.lastModified()); + jos.putNextEntry(entry); + while ((mByteCount = fis.read(mBuffer)) != -1) { + jos.write(mBuffer, 0, mByteCount); + if (mVerbose) { System.out.println("wrote " + mByteCount + " bytes");} + } + jos.flush(); + jos.closeEntry(); + } catch (IOException ioe) { + throw ioe; + } finally { + fis.close(); + } + } + } + + // for debugging + public static void main(String[] args) + throws IOException + { + if (args.length < 2) + { + System.err.println("Usage: JarHelper jarname.jar directory"); + return; + } + + JarHelper jarHelper = new JarHelper(); + jarHelper.mVerbose = true; + + File destJar = new File(args[0]); + File dirOrFile2Jar = new File(args[1]); + + jarHelper.jarDir(dirOrFile2Jar, destJar); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/84fce54b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java new file mode 100644 index 0000000..79f9576 --- /dev/null +++ b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java @@ -0,0 +1,70 @@ + +package org.apache.flink.api.java; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.PlanExecutor; + +import org.apache.flink.api.scala.FlinkILoop; + +/** + * Special version of {@link org.apache.flink.api.java.RemoteEnvironment} that has a reference + * to a {@link org.apache.flink.api.scala.FlinkILoop}. When execute is called this will + * use the reference of the ILoop to write the compiled classes of the current session to + * a Jar file and submit these with the program. + */ +public class ScalaShellRemoteEnvironment extends RemoteEnvironment { + + // reference to Scala Shell, for access to virtual directory + private FlinkILoop flinkILoop; + + /** + * Creates new ScalaShellRemoteEnvironment that has a reference to the FlinkILoop + * + * @param host The host name or address of the master (JobManager), where the program should be executed. + * @param port The port of the master (JobManager), where the program should be executed. + * @param flinkILoop The flink Iloop instance from which the ScalaShellRemoteEnvironment is called. + */ + public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, String... jarFiles) { + super(host, port, jarFiles); + this.flinkILoop = flinkILoop; + } + + /** + * compiles jars from files in the shell virtual directory on the fly, sends and executes it in the remote environment + * + * @param jobName name of the job as string + * @return Result of the computation + * @throws Exception + */ + @Override + public JobExecutionResult execute(String jobName) throws Exception { + Plan p = createProgramPlan(jobName); + + String jarFile = flinkILoop.writeFilesToDisk().getAbsolutePath(); + + // call "traditional" execution methods + PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFile); + + executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled()); + return executor.executePlan(p); + } +}