Return-Path: X-Original-To: apmail-apex-dev-archive@minotaur.apache.org Delivered-To: apmail-apex-dev-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C532A186E6 for ; Wed, 20 Apr 2016 06:00:43 +0000 (UTC) Received: (qmail 16083 invoked by uid 500); 20 Apr 2016 06:00:38 -0000 Delivered-To: apmail-apex-dev-archive@apex.apache.org Received: (qmail 16017 invoked by uid 500); 20 Apr 2016 06:00:38 -0000 Mailing-List: contact dev-help@apex.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@apex.incubator.apache.org Delivered-To: mailing list dev@apex.incubator.apache.org Received: (qmail 16006 invoked by uid 99); 20 Apr 2016 06:00:38 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 Apr 2016 06:00:38 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 2B446180501 for ; Wed, 20 Apr 2016 06:00:38 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.021 X-Spam-Level: X-Spam-Status: No, score=-4.021 tagged_above=-999 required=6.31 tests=[KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id mDns5PYEHBAY for ; Wed, 20 Apr 2016 06:00:36 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id A65865F474 for ; Wed, 20 Apr 2016 06:00:34 +0000 (UTC) Received: (qmail 14116 invoked by uid 99); 20 Apr 2016 06:00:33 -0000 Received: from arcas.apache.org 
(HELO arcas) (140.211.11.28) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 Apr 2016 06:00:33 +0000 Received: from arcas.apache.org (localhost [127.0.0.1]) by arcas (Postfix) with ESMTP id 2FFBD2C1F62 for ; Wed, 20 Apr 2016 06:00:33 +0000 (UTC) Date: Wed, 20 Apr 2016 06:00:33 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: dev@apex.incubator.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (APEXMALHAR-1965) Create a WAL in Malhar MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 [ https://issues.apache.org/jira/browse/APEXMALHAR-1965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249325#comment-15249325 ] ASF GitHub Bot commented on APEXMALHAR-1965: -------------------------------------------- Github user chandnisingh commented on a diff in the pull request: https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60353719 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java --- @@ -0,0 +1,594 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.wal; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentSkipListMap; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.utils.FileContextUtils; +import org.apache.apex.malhar.lib.utils.IOUtils; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.Syncable; +import org.apache.hadoop.fs.local.LocalFs; +import org.apache.hadoop.fs.local.RawLocalFs; + +import com.google.common.base.Preconditions; + +import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.netlet.util.Slice; + +public class FileSystemWAL implements WAL +{ + + @NotNull + private String filePath; + + //max length of the file + @Min(0) + private long maxLength; + + @NotNull + private FileSystemWAL.FileSystemWALReader fileSystemWALReader = new FileSystemWALReader(this); + + @NotNull + private FileSystemWAL.FileSystemWALWriter fileSystemWALWriter = new FileSystemWALWriter(this); + + //part => tmp file path; + private final ConcurrentSkipListMap tempPartFiles = new ConcurrentSkipListMap<>(); + + private long lastCheckpointedWindow = Stateless.WINDOW_ID; + + @Override + public void setup() + { + try { + FileContext fileContext = FileContextUtils.getFileContext(filePath); + if (maxLength == 0) { + maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize(); + } + fileSystemWALWriter.open(fileContext); + fileSystemWALReader.open(fileContext); + } 
catch (IOException e) { + throw new RuntimeException("during setup", e); + } + } + + @Override + public void beforeCheckpoint(long window) + { + try { + lastCheckpointedWindow = window; + fileSystemWALWriter.flush(); + } catch (IOException e) { + throw new RuntimeException("during before cp", e); + } + } + + @Override + public void committed(long window) + { + try { + fileSystemWALWriter.finalizeFiles(window); + } catch (IOException e) { + throw new RuntimeException("during committed", e); + } + } + + @Override + public void teardown() + { + try { + fileSystemWALReader.close(); + fileSystemWALWriter.close(); + } catch (IOException e) { + throw new RuntimeException("during teardown", e); + } + } + + protected long getLastCheckpointedWindow() + { + return lastCheckpointedWindow; + } + + protected String getPartFilePath(int partNumber) + { + return filePath + "_" + partNumber; + } + + @Override + public FileSystemWALReader getReader() + { + return fileSystemWALReader; + } + + /** + * Sets the File System WAL Reader. This can be used to override the default wal reader. + * + * @param fileSystemWALReader wal reader. + */ + public void setFileSystemWALReader(@NotNull FileSystemWALReader fileSystemWALReader) + { + this.fileSystemWALReader = Preconditions.checkNotNull(fileSystemWALReader, "filesystem wal reader"); + } + + @Override + public FileSystemWALWriter getWriter() + { + return fileSystemWALWriter; + } + + /** + * Sets the File System WAL Writer. This can be used to override the default wal writer. + * + * @param fileSystemWALWriter wal writer. + */ + public void setFileSystemWALWriter(@NotNull FileSystemWALWriter fileSystemWALWriter) + { + this.fileSystemWALWriter = Preconditions.checkNotNull(fileSystemWALWriter, "filesystem wal writer"); + } + + /** + * @return WAL file path + */ + public String getFilePath() + { + return filePath; + } + + /** + * Sets the WAL file path. 
+ * + * @param filePath WAL file path + */ + public void setFilePath(@NotNull String filePath) + { + this.filePath = Preconditions.checkNotNull(filePath, "filePath"); + } + + /** + * @return max length of a WAL part file. + */ + public long getMaxLength() + { + return maxLength; + } + + /** + * Sets the maximum length of a WAL part file. + * + * @param maxLength max length of the WAL part file + */ + public void setMaxLength(long maxLength) + { + this.maxLength = maxLength; + } + + public static class FileSystemWALPointer implements Comparable + { + private final int partNum; + private long offset; + + private FileSystemWALPointer() + { + //for kryo + partNum = -1; + } + + public FileSystemWALPointer(long offset) + { + this(0, offset); + } + + public FileSystemWALPointer(int partNum, long offset) + { + this.partNum = partNum; + this.offset = offset; + } + + @Override + public int compareTo(@NotNull FileSystemWALPointer o) + { + if (this.partNum < o.partNum) { + return -1; + } + if (this.partNum > o.partNum) { + return 1; + } + if (this.offset < o.offset) { + return -1; + } + if (this.offset > o.offset) { + return 1; + } + return 0; + } + + public int getPartNum() + { + return partNum; + } + + public long getOffset() + { + return offset; + } + + @Override + public String toString() + { + return "FileSystemWalPointer{" + "partNum=" + partNum + ", offset=" + offset + '}'; + } + } + + /** + * A FileSystem Wal Reader + */ + public static class FileSystemWALReader implements WAL.WALReader + { + private FileSystemWALPointer currentPointer = new FileSystemWALPointer(0, 0); + + private transient DataInputStream inputStream; + private transient Path currentOpenPath; + + private final FileSystemWAL fileSystemWAL; + private transient FileContext fileContext; + + private FileSystemWALReader() + { + //for kryo + fileSystemWAL = null; + } + + public FileSystemWALReader(@NotNull FileSystemWAL fileSystemWal) + { + this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, 
"wal"); + } + + protected void open(@NotNull FileContext fileContext) throws IOException + { + this.fileContext = Preconditions.checkNotNull(fileContext, "fileContext"); + } + + protected void close() throws IOException + { + if (inputStream != null) { + inputStream.close(); + inputStream = null; + } + } + + @Override + public void seek(FileSystemWALPointer pointer) throws IOException + { + if (inputStream != null) { + close(); + } + inputStream = getInputStream(pointer); + Preconditions.checkNotNull(inputStream, "invalid pointer " + pointer); + currentPointer = pointer; + } + + /** + * Move to the next WAL segment. + * + * @return true if the next part file exists and is opened; false otherwise. + * @throws IOException + */ + private boolean nextSegment() throws IOException + { + if (inputStream != null) { + close(); + } + + currentPointer = new FileSystemWALPointer(currentPointer.partNum + 1, 0); + inputStream = getInputStream(currentPointer); + + return inputStream != null; + } + + private DataInputStream getInputStream(FileSystemWALPointer walPointer) throws IOException + { + Preconditions.checkArgument(inputStream == null, "input stream not null"); + Path pathToReadFrom; + String tmpPath = fileSystemWAL.tempPartFiles.get(walPointer.getPartNum()); + if (tmpPath != null) { + pathToReadFrom = new Path(tmpPath); + } else { + pathToReadFrom = new Path(fileSystemWAL.getPartFilePath(walPointer.partNum)); + } + + LOG.debug("path to read {} and pointer {}", pathToReadFrom, walPointer); + if (fileContext.util().exists(pathToReadFrom)) { + DataInputStream stream = fileContext.open(pathToReadFrom); + if (walPointer.offset > 0) { + stream.skip(walPointer.offset); + } + currentOpenPath = pathToReadFrom; + return stream; + } + return null; + } + + @Override + public Slice next() throws IOException + { + do { + if (inputStream == null) { + inputStream = getInputStream(currentPointer); + } + + if (inputStream != null && !fileContext.util().exists(currentOpenPath)) { --- End 
diff -- OK, will do that. I was trying to minimize the overlap between reader and writer. > Create a WAL in Malhar > ---------------------- > > Key: APEXMALHAR-1965 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-1965 > Project: Apache Apex Malhar > Issue Type: Task > Reporter: Chandni Singh > Assignee: Tushar Gosavi > > In Malhar we have an IdempotentStorageManager which we use like a Write Ahead Logger. There have been some other places where we have created a different flavor of Write Ahead Logger. > We need to find overlap between all these flavors and create a common Write Ahead Logger for use in Apex Core and Apex Malhar. -- This message was sent by Atlassian JIRA (v6.3.4#6332)