asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ima...@apache.org
Subject [14/50] [abbrv] incubator-asterixdb git commit: Merge branch 'master' into hyracks-merge2
Date Thu, 07 Apr 2016 14:59:49 GMT
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
index 4eec348,0000000..ea5cc8f
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
@@@ -1,276 -1,0 +1,283 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.util;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.nio.file.ClosedWatchServiceException;
 +import java.nio.file.FileSystems;
 +import java.nio.file.Files;
 +import java.nio.file.LinkOption;
 +import java.nio.file.Path;
 +import java.nio.file.StandardWatchEventKinds;
 +import java.nio.file.WatchEvent;
 +import java.nio.file.WatchEvent.Kind;
 +import java.nio.file.WatchKey;
 +import java.nio.file.WatchService;
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.LinkedList;
++import java.util.List;
++import java.util.concurrent.locks.ReentrantLock;
 +
- import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
 +import org.apache.log4j.Level;
 +import org.apache.log4j.Logger;
 +
 +public class FileSystemWatcher {
 +
 +    private static final Logger LOGGER = Logger.getLogger(FileSystemWatcher.class.getName());
 +    private WatchService watcher;
 +    private final HashMap<WatchKey, Path> keys;
 +    private final LinkedList<File> files = new LinkedList<File>();
 +    private Iterator<File> it;
 +    private final String expression;
 +    private FeedLogManager logManager;
-     private final Path path;
++    private final List<Path> paths;
 +    private final boolean isFeed;
 +    private boolean done;
-     private File current;
-     private AbstractFeedDataFlowController controller;
 +    private final LinkedList<Path> dirs;
++    private final ReentrantLock lock = new ReentrantLock();
 +
-     public FileSystemWatcher(Path inputResource, String expression, boolean isFeed) {
++    public FileSystemWatcher(List<Path> inputResources, String expression, boolean
isFeed) throws HyracksDataException {
++        this.isFeed = isFeed;
 +        this.keys = isFeed ? new HashMap<WatchKey, Path>() : null;
 +        this.expression = expression;
-         this.path = inputResource;
-         this.isFeed = isFeed;
++        this.paths = inputResources;
 +        this.dirs = new LinkedList<Path>();
++        if (!isFeed) {
++            init();
++        }
 +    }
 +
-     public void setFeedLogManager(FeedLogManager feedLogManager) {
-         this.logManager = feedLogManager;
++    public synchronized void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException
{
++        if (logManager == null) {
++            this.logManager = feedLogManager;
++            init();
++        }
 +    }
 +
-     public void init() throws HyracksDataException {
++    public synchronized void init() throws HyracksDataException {
 +        try {
 +            dirs.clear();
-             LocalFileSystemUtils.traverse(files, path.toFile(), expression, dirs);
-             it = files.iterator();
-             if (isFeed) {
-                 keys.clear();
-                 if (watcher != null) {
-                     try {
-                         watcher.close();
-                     } catch (IOException e) {
-                         LOGGER.warn("Failed to close watcher service", e);
++            for (Path path : paths) {
++                LocalFileSystemUtils.traverse(files, path.toFile(), expression, dirs);
++                it = files.iterator();
++                if (isFeed) {
++                    keys.clear();
++                    if (watcher != null) {
++                        try {
++                            watcher.close();
++                        } catch (IOException e) {
++                            LOGGER.warn("Failed to close watcher service", e);
++                        }
++                    }
++                    watcher = FileSystems.getDefault().newWatchService();
++                    for (Path dirPath : dirs) {
++                        register(dirPath);
++                    }
++                    resume();
++                } else {
++                    if (files.isEmpty()) {
++                        throw new HyracksDataException(path + ": no files found");
 +                    }
 +                }
-                 watcher = FileSystems.getDefault().newWatchService();
-                 for (Path path : dirs) {
-                     register(path);
-                 }
-                 resume();
 +            }
 +        } catch (IOException e) {
 +            throw new HyracksDataException(e);
 +        }
 +    }
 +
 +    /**
 +     * Register the given directory, and all its sub-directories, with the
 +     * WatchService.
 +     */
 +    private void register(Path dir) throws IOException {
 +        WatchKey key = dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE,
 +                StandardWatchEventKinds.ENTRY_MODIFY);
 +        keys.put(key, dir);
 +    }
 +
-     private void resume() throws IOException {
++    private synchronized void resume() throws IOException {
 +        if (logManager == null) {
 +            return;
 +        }
 +        /*
 +         * Done processing the progress log file. We now have:
 +         * the files that were completed.
 +         */
 +
 +        if (it == null) {
 +            return;
 +        }
 +        while (it.hasNext()) {
 +            File file = it.next();
 +            if (logManager.isSplitRead(file.getAbsolutePath())) {
 +                // File was read completely, remove it from the files list
 +                it.remove();
 +            }
 +        }
 +        // reset the iterator
 +        it = files.iterator();
 +    }
 +
 +    @SuppressWarnings("unchecked")
 +    static <T> WatchEvent<T> cast(WatchEvent<?> event) {
 +        return (WatchEvent<T>) event;
 +    }
 +
 +    private void handleEvents(WatchKey key) throws IOException {
 +        // get dir associated with the key
 +        Path dir = keys.get(key);
 +        if (dir == null) {
 +            // This should never happen
 +            if (LOGGER.isEnabledFor(Level.WARN)) {
 +                LOGGER.warn("WatchKey not recognized!!");
 +            }
 +            return;
 +        }
 +        for (WatchEvent<?> event : key.pollEvents()) {
 +            Kind<?> kind = event.kind();
-             // TODO: Do something about overflow events
 +            // An overflow event means that some events were dropped
 +            if (kind == StandardWatchEventKinds.OVERFLOW) {
 +                if (LOGGER.isEnabledFor(Level.WARN)) {
 +                    LOGGER.warn("Overflow event. Some events might have been missed");
 +                }
 +                // need to read and validate all files.
-                 //TODO: use btrees for all logs
 +                init();
 +                return;
 +            }
 +
 +            // Context for directory entry event is the file name of entry
 +            WatchEvent<Path> ev = cast(event);
 +            Path name = ev.context();
 +            Path child = dir.resolve(name);
 +            // if directory is created then register it and its sub-directories
 +            if ((kind == StandardWatchEventKinds.ENTRY_CREATE)) {
 +                try {
 +                    if (Files.isDirectory(child, LinkOption.NOFOLLOW_LINKS)) {
 +                        register(child);
 +                    } else {
 +                        // it is a file, add it to the files list.
 +                        LocalFileSystemUtils.validateAndAdd(child, expression, files);
 +                    }
 +                } catch (IOException e) {
 +                    if (LOGGER.isEnabledFor(Level.ERROR)) {
 +                        LOGGER.error(e);
 +                    }
 +                }
 +            }
 +        }
++        it = files.iterator();
 +    }
 +
-     public void close() throws IOException {
++    public synchronized void close() throws IOException {
 +        if (!done) {
 +            if (watcher != null) {
 +                watcher.close();
 +                watcher = null;
 +            }
-             if (logManager != null) {
-                 if (current != null) {
-                     logManager.startPartition(current.getAbsolutePath());
-                     logManager.endPartition();
-                 }
-                 logManager.close();
-                 current = null;
-             }
 +            done = true;
 +        }
 +    }
 +
-     public File next() throws IOException {
-         if ((current != null) && (logManager != null)) {
-             logManager.startPartition(current.getAbsolutePath());
-             logManager.endPartition();
-         }
-         current = it.next();
-         return current;
-     }
- 
-     private boolean endOfEvents(WatchKey key) {
-         // reset key and remove from set if directory no longer accessible
-         if (!key.reset()) {
-             keys.remove(key);
-             if (keys.isEmpty()) {
-                 return true;
-             }
-         }
-         return false;
-     }
- 
-     public boolean hasNext() throws IOException {
++    // poll is not blocking
++    public synchronized File poll() throws IOException {
 +        if (it.hasNext()) {
-             return true;
++            return it.next();
 +        }
 +        if (done || !isFeed) {
-             return false;
++            return null;
 +        }
 +        files.clear();
++        it = files.iterator();
 +        if (keys.isEmpty()) {
-             return false;
++            close();
++            return null;
 +        }
 +        // Read new Events (Polling first to add all available files)
 +        WatchKey key;
 +        key = watcher.poll();
 +        while (key != null) {
 +            handleEvents(key);
 +            if (endOfEvents(key)) {
 +                close();
-                 return false;
++                return null;
 +            }
 +            key = watcher.poll();
 +        }
-         // No file was found, wait for the filesystem to push events
-         if (controller != null) {
-             controller.flush();
++        return null;
++    }
++
++    // take is blocking
++    public synchronized File take() throws IOException {
++        File next = poll();
++        if (next != null) {
++            return next;
 +        }
-         while (files.isEmpty()) {
-             try {
-                 key = watcher.take();
-             } catch (InterruptedException x) {
-                 if (LOGGER.isEnabledFor(Level.WARN)) {
-                     LOGGER.warn("Feed Closed");
-                 }
-                 if (watcher == null) {
-                     return false;
-                 }
-                 continue;
-             } catch (ClosedWatchServiceException e) {
-                 if (LOGGER.isEnabledFor(Level.WARN)) {
-                     LOGGER.warn("The watcher has exited");
++        if (done || !isFeed) {
++            return null;
++        }
++        // No file was found, wait for the filesystem to push events
++        WatchKey key = null;
++        lock.lock();
++        try {
++            while (!it.hasNext()) {
++                try {
++                    key = watcher.take();
++                } catch (InterruptedException x) {
++                    if (LOGGER.isEnabledFor(Level.WARN)) {
++                        LOGGER.warn("Feed Closed");
++                    }
++                    if (watcher == null) {
++                        return null;
++                    }
++                    continue;
++                } catch (ClosedWatchServiceException e) {
++                    if (LOGGER.isEnabledFor(Level.WARN)) {
++                        LOGGER.warn("The watcher has exited");
++                    }
++                    if (watcher == null) {
++                        return null;
++                    }
++                    continue;
 +                }
-                 if (watcher == null) {
-                     return false;
++                handleEvents(key);
++                if (endOfEvents(key)) {
++                    return null;
 +                }
-                 continue;
-             }
-             handleEvents(key);
-             if (endOfEvents(key)) {
-                 return false;
 +            }
++        } finally {
++            lock.unlock();
 +        }
 +        // files were found, re-create the iterator and move it one step
-         it = files.iterator();
-         return it.hasNext();
++        return it.next();
 +    }
 +
-     public void setController(AbstractFeedDataFlowController controller) {
-         this.controller = controller;
++    private boolean endOfEvents(WatchKey key) {
++        // reset key and remove from set if directory no longer accessible
++        if (!key.reset()) {
++            keys.remove(key);
++            if (keys.isEmpty()) {
++                return true;
++            }
++        }
++        return false;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/LocalFileSystemUtils.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/LocalFileSystemUtils.java
index d6e9463,0000000..16dd1e9
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/LocalFileSystemUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/LocalFileSystemUtils.java
@@@ -1,75 -1,0 +1,76 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.util;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.nio.file.FileVisitResult;
 +import java.nio.file.Files;
 +import java.nio.file.LinkOption;
 +import java.nio.file.Path;
 +import java.nio.file.SimpleFileVisitor;
 +import java.nio.file.attribute.BasicFileAttributes;
 +import java.util.LinkedList;
 +import java.util.regex.Pattern;
 +
++import org.apache.hyracks.api.exceptions.HyracksDataException;
++
 +public class LocalFileSystemUtils {
 +
-     //TODO: replace this method by FileUtils.iterateFilesAndDirs(.)
 +    public static void traverse(final LinkedList<File> files, File root, final String
expression,
 +            final LinkedList<Path> dirs) throws IOException {
-         if (!Files.exists(root.toPath())) {
-             return;
++        final Path path = root.toPath();
++        if (!Files.exists(path)) {
++            throw new HyracksDataException(path + ": path not found");
 +        }
-         if (!Files.isDirectory(root.toPath())) {
-             validateAndAdd(root.toPath(), expression, files);
++        if (!Files.isDirectory(path)) {
++            validateAndAdd(path, expression, files);
 +        }
-         //FileUtils.iterateFilesAndDirs(directory, fileFilter, dirFilter)
-         Files.walkFileTree(root.toPath(), new SimpleFileVisitor<Path>() {
++        Files.walkFileTree(path, new SimpleFileVisitor<Path>() {
 +            @Override
 +            public FileVisitResult preVisitDirectory(Path path, BasicFileAttributes attrs)
throws IOException {
 +                if (!Files.exists(path, LinkOption.NOFOLLOW_LINKS)) {
 +                    return FileVisitResult.TERMINATE;
 +                }
 +                if (Files.isDirectory(path, LinkOption.NOFOLLOW_LINKS)) {
 +                    if (dirs != null) {
 +                        dirs.add(path);
 +                    }
 +                    //get immediate children files
 +                    File[] content = path.toFile().listFiles();
 +                    for (File file : content) {
 +                        if (!file.isDirectory()) {
 +                            validateAndAdd(file.toPath(), expression, files);
 +                        }
 +                    }
 +                } else {
 +                    // Path is a file, add to list of files if it matches the expression
 +                    validateAndAdd(path, expression, files);
 +                }
 +                return FileVisitResult.CONTINUE;
 +            }
 +        });
 +    }
 +
 +    public static void validateAndAdd(Path path, String expression, LinkedList<File>
files) {
 +        if (expression == null || Pattern.matches(expression, path.toString())) {
 +            files.add(new File(path.toString()));
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8516517e/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
----------------------------------------------------------------------
diff --cc asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
index d822310,0000000..493bd3b
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
@@@ -1,110 -1,0 +1,189 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.classad.test;
 +
++import java.io.File;
++import java.io.PrintStream;
++import java.nio.file.Files;
++import java.nio.file.Path;
 +import java.nio.file.Paths;
++import java.util.ArrayList;
++import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +
 +import org.apache.asterix.external.api.IRawRecord;
 +import org.apache.asterix.external.classad.CaseInsensitiveString;
 +import org.apache.asterix.external.classad.CharArrayLexerSource;
 +import org.apache.asterix.external.classad.ClassAd;
 +import org.apache.asterix.external.classad.ExprTree;
 +import org.apache.asterix.external.classad.Value;
 +import org.apache.asterix.external.classad.object.pool.ClassAdObjectPool;
 +import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReader;
 +import org.apache.asterix.external.input.stream.LocalFSInputStream;
 +import org.apache.asterix.external.library.ClassAdParser;
- import org.apache.hyracks.api.io.FileReference;
- import org.apache.hyracks.dataflow.std.file.FileSplit;
++import org.apache.asterix.external.util.FileSystemWatcher;
++import org.apache.asterix.formats.nontagged.AqlADMPrinterFactoryProvider;
++import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
++import org.apache.asterix.om.types.ARecordType;
++import org.apache.asterix.om.types.BuiltinType;
++import org.apache.asterix.om.types.IAType;
++import org.apache.commons.io.FileUtils;
++import org.apache.hyracks.algebricks.data.IPrinter;
++import org.apache.hyracks.algebricks.data.IPrinterFactory;
++import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
++import org.apache.hyracks.api.exceptions.HyracksDataException;
++import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
++import org.junit.Assert;
 +
 +import junit.framework.Test;
 +import junit.framework.TestCase;
 +import junit.framework.TestSuite;
 +
 +public class ClassAdToADMTest extends TestCase {
 +    /**
 +     * Create the test case
 +     *
 +     * @param testName
 +     *            name of the test case
 +     */
 +    public ClassAdToADMTest(String testName) {
 +        super(testName);
 +    }
 +
 +    /**
 +     * @return the suite of tests being tested
 +     */
 +    public static Test suite() {
 +        return new TestSuite(ClassAdToADMTest.class);
 +    }
 +
++    private void printTuple(ArrayTupleBuilder tb, IPrinter[] printers, PrintStream printStream)
++            throws HyracksDataException {
++        int[] offsets = tb.getFieldEndOffsets();
++        for (int i = 0; i < printers.length; i++) {
++            int offset = i == 0 ? 0 : offsets[i - 1];
++            int length = i == 0 ? offsets[0] : offsets[i] - offsets[i - 1];
++            printers[i].print(tb.getByteArray(), offset, length, printStream);
++            printStream.println();
++        }
++    }
++
++    @SuppressWarnings("rawtypes")
++    public void testSchemaful() {
++        try {
++            File file = new File("target/classad-wtih-temporals.adm");
++            File expected = new File(getClass().getResource("/results/classad-with-temporals.adm").toURI().getPath());
++            FileUtils.deleteQuietly(file);
++            PrintStream printStream = new PrintStream(Files.newOutputStream(Paths.get(file.toURI())));
++            String[] recordFieldNames = { "GlobalJobId", "Owner", "ClusterId", "ProcId",
"RemoteWallClockTime",
++                    "CompletionDate", "QDate", "JobCurrentStartDate", "JobStartDate", "JobCurrentStartExecutingDate"
};
++            IAType[] recordFieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32,
++                    BuiltinType.AINT32, BuiltinType.ADURATION, BuiltinType.ADATETIME, BuiltinType.ADATETIME,
++                    BuiltinType.ADATETIME, BuiltinType.ADATETIME, BuiltinType.ADATETIME
};
++            ARecordType recordType = new ARecordType("value", recordFieldNames, recordFieldTypes,
true);
++            int numOfTupleFields = 1;
++            ISerializerDeserializer[] serdes = new ISerializerDeserializer[1];
++            serdes[0] = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(recordType);
++            IPrinterFactory[] printerFactories = new IPrinterFactory[1];
++            printerFactories[0] = AqlADMPrinterFactoryProvider.INSTANCE.getPrinterFactory(recordType);
++            // create output descriptor
++            IPrinter[] printers = new IPrinter[printerFactories.length];
++            for (int i = 0; i < printerFactories.length; i++) {
++                printers[i] = printerFactories[i].createPrinter();
++            }
++            ClassAdObjectPool objectPool = new ClassAdObjectPool();
++            String[] files = new String[] { "/classad-with-temporals.classads" };
++            ClassAdParser parser = new ClassAdParser(recordType, false, false, false, null,
null, null, objectPool);
++            ArrayTupleBuilder tb = new ArrayTupleBuilder(numOfTupleFields);
++            for (String path : files) {
++                List<Path> paths = new ArrayList<>();
++                paths.add(Paths.get(getClass().getResource(path).toURI()));
++                FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false);
++                LocalFSInputStream in = new LocalFSInputStream(watcher);
++                SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(in,
"[", "]");
++                while (recordReader.hasNext()) {
++                    tb.reset();
++                    IRawRecord<char[]> record = recordReader.next();
++                    parser.parse(record, tb.getDataOutput());
++                    tb.addFieldEndOffset();
++                    printTuple(tb, printers, printStream);
++                }
++                recordReader.close();
++                printStream.close();
++                Assert.assertTrue(FileUtils.contentEquals(file, expected));
++            }
++        } catch (Throwable th) {
++            System.err.println("TEST FAILED");
++            th.printStackTrace();
++            Assert.assertTrue(false);
++        }
++        System.err.println("TEST PASSED");
++    }
++
 +    /**
 +     *
 +     */
-     public void test() {
++    public void testSchemaless() {
 +        try {
-             // test here
 +            ClassAdObjectPool objectPool = new ClassAdObjectPool();
 +            ClassAd pAd = new ClassAd(objectPool);
 +            String[] files = new String[] { "/jobads.txt" };
 +            ClassAdParser parser = new ClassAdParser(objectPool);
 +            CharArrayLexerSource lexerSource = new CharArrayLexerSource();
 +            for (String path : files) {
-                 LocalFSInputStream in = new LocalFSInputStream(
-                         new FileSplit[] { new FileSplit("",
-                                 new FileReference(Paths.get(getClass().getResource(path).toURI()).toFile()))
},
-                         null, null, 0, null, false);
-                 SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(in,
null, "[", "]");
++                List<Path> paths = new ArrayList<>();
++                paths.add(Paths.get(getClass().getResource(path).toURI()));
++                FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false);
++                LocalFSInputStream in = new LocalFSInputStream(watcher);
++                SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(in,
"[", "]");
 +                Value val = new Value(objectPool);
 +                while (recordReader.hasNext()) {
 +                    val.reset();
 +                    IRawRecord<char[]> record = recordReader.next();
 +                    lexerSource.setNewSource(record.get());
 +                    parser.setLexerSource(lexerSource);
 +                    parser.parseNext(pAd);
 +                    Map<CaseInsensitiveString, ExprTree> attrs = pAd.getAttrList();
 +                    for (Entry<CaseInsensitiveString, ExprTree> entry : attrs.entrySet())
{
 +                        ExprTree tree = entry.getValue();
 +                        switch (tree.getKind()) {
 +                            case ATTRREF_NODE:
 +                            case CLASSAD_NODE:
 +                            case EXPR_ENVELOPE:
 +                            case EXPR_LIST_NODE:
 +                            case FN_CALL_NODE:
 +                            case OP_NODE:
 +                                break;
 +                            case LITERAL_NODE:
 +                                break;
 +                            default:
 +                                System.out.println("Something is wrong");
 +                                break;
 +                        }
 +                    }
 +                }
 +                recordReader.close();
 +            }
 +        } catch (Exception e) {
 +            e.printStackTrace();
 +            assertTrue(false);
 +        }
 +    }
 +}


Mime
View raw message