zookeeper-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "assaf.epstein" <assa...@gmail.com>
Subject Re: Bulk load or ingest
Date Sun, 14 Sep 2014 11:25:36 GMT
/**
There you go.
You'll need these in your build.gradle/pom.xml
    compile 'ch.qos.logback:logback-classic:1.1.2'
    compile 'org.apache.zookeeper:zookeeper:pom:3.4.6'
*/


package om.services.cl;

import ch.qos.logback.classic.Level;
import om.services.Utils;
import org.apache.zookeeper.ZooKeeperMain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

/**
 * CL tool for executing bulk of operations read from a file into zookeeper
 */
public class ZkBulkLoad {

    private ZooKeeperMain zkMain;
    private int currentLineNumber;
    private List<String> failureMessages = new ArrayList<>();

    public static void main(String[] args) throws IOException,
InterruptedException {

        final String PROP_KEY_BULK_FILE = "zkBulkLoad.bulkFile";
        final String PROP_KEY_LOG_LEVEL = "zkBulkLoad.logLevel";
        final Level DEFAULT_LOG_LEVEL = Level.WARN;

        setRootLogLevel(PROP_KEY_LOG_LEVEL, DEFAULT_LOG_LEVEL);

        ZooKeeperMain zkMain = new ZooKeeperMain(args);
        final String bulkFilePath =
Utils.getMandatoryProperty(PROP_KEY_BULK_FILE);
        final ZkBulkLoad zkBulkLoad = new ZkBulkLoad(zkMain);
        try {
            final Result result = zkBulkLoad.load(new File(bulkFilePath));
            if (!result.getFailures().isEmpty()) {
                System.err.println(result.getFailures());
            }
        } finally {
            zkBulkLoad.close();
        }
    }

    public ZkBulkLoad(final ZooKeeperMain zkMain) {
        this.zkMain = zkMain;
    }

    public Result load(final File bulkFile) throws IOException {
        resetState();
        final Stream<String> lines =
Files.lines(bulkFile.toPath()).filter(line -> !line.isEmpty());
        try {
            lines.forEach(this::executeLine);
            return new Result(failureMessages);
        } finally {
            if (lines!=null) {
                lines.close();
            }
            if (!failureMessages.isEmpty()) {
                failureMessages.add(0, String.format("\nThere were %s
failures:\n", failureMessages.size()));
            }
        }
    }

    public void close() {}

    // ----------------------------------------------- private
---------------------------------------------------------


    private static void setRootLogLevel(final String propKeyLogLevel,
                                        final Level defaultLogLevel) {
        final ch.qos.logback.classic.Logger rootLogger =
                (ch.qos.logback.classic.Logger)
LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
        final String providedLogLevel = System.getProperty(propKeyLogLevel);
        if (providedLogLevel == null) {
            System.out.printf("property %s was not provided, setting root
logger level by default to WARN\n", propKeyLogLevel);
            rootLogger.setLevel(defaultLogLevel);
        } else {
            Level logLevel = Level.toLevel(providedLogLevel,
defaultLogLevel);
            System.out.printf("property %s was provided [%s]. setting root
logger level to [%s]\n", propKeyLogLevel, providedLogLevel, logLevel);
            rootLogger.setLevel(logLevel);
        }
    }

    private void resetState() {
        currentLineNumber = 1;
        failureMessages.clear();
    }

    private void executeLine(String line) {
        try {
            System.out.println("executing: " + line);
            zkMain.executeLine(line);
        } catch (Exception e) {
            failureMessages.add("line " + currentLineNumber + ": " +
e.getMessage());
        }
        currentLineNumber++;
    }

    class Result {
        List<String> failures = new ArrayList<>();

        public Result(List<String> failures) {
            this.failures = failures;
        }

        public List<String> getFailures() {
            return failures;
        }

        public void setFailures(List<String> failures) {
            this.failures = failures;
        }
    }
}




--
View this message in context: http://zookeeper-user.578899.n2.nabble.com/Bulk-load-or-ingest-tp7579884p7580267.html
Sent from the zookeeper-user mailing list archive at Nabble.com.

Mime
View raw message