incubator-connectors-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1203700 - in /incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog: interfaces/FilterAccessor.java interfaces/FilterRelationship.java tablestore/Table.java tablestore/TableStore.java
Date Fri, 18 Nov 2011 15:30:22 GMT
Author: kwright
Date: Fri Nov 18 15:30:21 2011
New Revision: 1203700

URL: http://svn.apache.org/viewvc?rev=1203700&view=rev
Log:
Complete the implementation of tables.

Added:
    incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/FilterAccessor.java   (with props)
Modified:
    incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/FilterRelationship.java
    incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/tablestore/Table.java
    incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/tablestore/TableStore.java

Added: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/FilterAccessor.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/FilterAccessor.java?rev=1203700&view=auto
==============================================================================
--- incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/FilterAccessor.java (added)
+++ incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/FilterAccessor.java Fri Nov 18 15:30:21 2011
@@ -0,0 +1,78 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.warthog.interfaces;
+
+/** Accessor class that performs generic filtering.
+*/
+public class FilterAccessor implements WHAccessor
+{
+  protected WHRelationship relationshipToFilter;
+  protected WHFilter filter;
+  protected WHAccessor baseAccessor;
+  protected long nextRow = -1L;
+    
+  public FilterAccessor(WHAccessor baseAccessor, WHRelationship relationshipToFilter, WHFilter filter)
+    throws WHException
+  {
+    this.baseAccessor = baseAccessor;
+    this.relationshipToFilter = relationshipToFilter;
+    this.filter = filter;
+  }
+    
+  /** Are there any more rows?
+  */
+  public boolean hasNext()
+    throws WHException
+  {
+    if (nextRow == -1L)
+    {
+      // Locate the next row, or bump into the end.
+      nextRow = findNextRow();
+      if (nextRow == -1L)
+        return false;
+    }
+    return true;
+  }
+    
+  /** Read the next matching relationship row ID,
+  */
+  public long getNext()
+    throws WHException
+  {
+    if (nextRow == -1L)
+      nextRow = findNextRow();
+    long rval = nextRow;
+    nextRow = -1L;
+    return rval;
+  }
+
+  /** Find the next (filtered) row */
+  protected long findNextRow()
+    throws WHException
+  {
+    while (baseAccessor.hasNext())
+    {
+      long theRow = baseAccessor.getNext();
+      if (filter == null || filter.include(relationshipToFilter,theRow))
+        return theRow;
+    }
+    return -1L;
+  }
+}

Propchange: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/FilterAccessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/FilterAccessor.java
------------------------------------------------------------------------------
    svn:keywords = Id

Modified: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/FilterRelationship.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/FilterRelationship.java?rev=1203700&r1=1203699&r2=1203700&view=diff
==============================================================================
--- incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/FilterRelationship.java (original)
+++ incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/interfaces/FilterRelationship.java Fri Nov 18 15:30:21 2011
@@ -54,69 +54,7 @@ public class FilterRelationship implemen
   public WHAccessor getAccessor()
     throws WHException
   {
-    return new FilterAccessor();
-  }
-  
-  /** Override this method to control filtering.
-  */
-  protected boolean include(long rowID)
-    throws WHException
-  {
-    return filterer.include(this,rowID);
-  }
-  
-  /** Accessor class that performs generic filtering.
-  */
-  protected class FilterAccessor implements WHAccessor
-  {
-    protected WHAccessor baseAccessor;
-    protected long nextRow = -1L;
-    
-    public FilterAccessor()
-      throws WHException
-    {
-      baseAccessor = relationshipToFilter.getAccessor();
-    }
-    
-    /** Are there any more rows?
-    */
-    public boolean hasNext()
-      throws WHException
-    {
-      if (nextRow == -1L)
-      {
-	// Locate the next row, or bump into the end.
-	nextRow = findNextRow();
-	if (nextRow == -1L)
-	  return false;
-      }
-      return true;
-    }
-    
-    /** Read the next matching relationship row ID,
-    */
-    public long getNext()
-      throws WHException
-    {
-      if (nextRow == -1L)
-	nextRow = findNextRow();
-      long rval = nextRow;
-      nextRow = -1L;
-      return rval;
-    }
-
-    /** Find the next (filtered) row */
-    protected long findNextRow()
-      throws WHException
-    {
-      while (baseAccessor.hasNext())
-      {
-	long theRow = baseAccessor.getNext();
-	if (include(theRow))
-	  return theRow;
-      }
-      return -1L;
-    }
+    return new FilterAccessor(relationshipToFilter.getAccessor(),this,filterer);
   }
   
 }

Modified: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/tablestore/Table.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/tablestore/Table.java?rev=1203700&r1=1203699&r2=1203700&view=diff
==============================================================================
--- incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/tablestore/Table.java (original)
+++ incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/tablestore/Table.java Fri Nov 18 15:30:21 2011
@@ -20,6 +20,7 @@
 package org.apache.warthog.tablestore;
 
 import org.apache.warthog.interfaces.*;
+import java.util.*;
 
 /** This is the standard implementation of WHTable.
 */
@@ -45,19 +46,21 @@ public class Table implements WHTable
     return columns;
   }
   
-  /** Compact the table.
-  */
-  public void compact()
-    throws WHException
-  {
-    // MHL
-  }
-  
   /** Insert a row */
   public void insertRow(String[] columns, WHValue[] values)
     throws WHException
   {
-    // MHL
+    // Allocate the row
+    long rowID = ts.allocateNewTableRow(tableName);
+    // Link in the row
+    ts.addTableRow(tableName,rowID);
+    // Set the values
+    for (int i = 0 ; i < columns.length ; i++)
+    {
+      ts.setTableColumnValue(tableName,rowID,columns[i],values[i]);
+    }
+    // Add to all indices
+    ts.addIndexRow(tableName,rowID,null);
   }
   
   /** Update row(s) */
@@ -65,30 +68,69 @@ public class Table implements WHTable
     WHAccessor accessor, WHFilter filter)
     throws WHException
   {
-    // MHL
+    // Convert the columns to a set, for convenience later
+    Set<String> columnSet = new HashSet<String>();
+    for (int i = 0 ; i < columns.length ; i++)
+    {
+      columnSet.add(columns[i]);
+    }
+    WHAccessor fa = new FilterAccessor(accessor,new TableRelationship(this),filter);
+    // Update the rows one at a time
+    while (fa.hasNext())
+    {
+      long rowID = fa.getNext();
+      
+      // Note: The problem with this loop is that I cannot guarantee that the accessor will continue
+      // to work right after I delete and re-add the row to/from the indexes.  If the accessor is an
+      // index accessor it may be the case that the changed row appears in the update list at a new
+      // place after the modification, and thus can technically be visited more than once.  We
+      // currently do nothing to detect such conditions or attempt to stop them.
+      
+      // Delete the row from all indexes which can potentially intersect the changes
+      ts.deleteIndexRow(tableName,rowID,columnSet);
+      // Change the values as specified
+      for (int i = 0 ; i < columns.length ; i++)
+      {
+        ts.setTableColumnValue(tableName,rowID,columns[i],values[i]);
+      }
+      // Add the row back to all indexes which can potentially intersect the changes
+      ts.addIndexRow(tableName,rowID,columnSet);
+    }
   }
     
   /** Delete row(s) */
   public void deleteRows(WHAccessor accessor, WHFilter filter)
     throws WHException
   {
-    // MHL
+    WHAccessor fa = new FilterAccessor(accessor,new TableRelationship(this),filter);
+    // Read the rows one at a time
+    while (fa.hasNext())
+    {
+      long rowID = fa.getNext();
+      // Delete the row from all indexes
+      ts.deleteIndexRow(tableName,rowID,null);
+      // Delete all values
+      for (int i = 0 ; i < columns.length ; i++)
+      {
+        ts.setTableColumnValue(tableName,rowID,columns[i],null);
+      }
+      // Unlink the row in the linked list
+      ts.removeTableRow(tableName,rowID);
+    }
   }
   
   /** Get the data for a given row and column. */
   public WHValue getValue(long rowID, String columnName)
     throws WHException
   {
-    // MHL
-    return null;
+    return ts.getTableColumnValue(tableName,rowID,columnName);
   }
 
   /** Build an accessor that simply scans all the rows in the table */
   public WHAccessor buildAccessor()
     throws WHException
   {
-    // MHL
-    return null;
+    return ts.buildTableAccessor(tableName);
   }
 
   /** Get name */
@@ -97,22 +139,4 @@ public class Table implements WHTable
     return tableName;
   }
   
-  // Protected methods
-  
-  /** Delete a specified table row from all indexes attached to the table.
-  */
-  protected void deleteRowFromIndexes(long rowID)
-    throws WHException
-  {
-    ts.deleteIndexRow(tableName,rowID);
-  }
-  
-  /** Add a specified table row to all indexes attached to the table.
-  */
-  protected void addRowToIndexes(long rowID)
-    throws WHException
-  {
-    ts.addIndexRow(tableName,rowID);
-  }
-  
 }
\ No newline at end of file

Modified: incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/tablestore/TableStore.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/tablestore/TableStore.java?rev=1203700&r1=1203699&r2=1203700&view=diff
==============================================================================
--- incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/tablestore/TableStore.java (original)
+++ incubator/lcf/branches/CONNECTORS-286/warthog/src/main/java/org/apache/warthog/tablestore/TableStore.java Fri Nov 18 15:30:21 2011
@@ -195,28 +195,140 @@ public class TableStore implements WHTab
   
   /** Delete a table row from all the indexes that are based on a table.
   */
-  public void deleteIndexRow(String tableName, long rowID)
+  public void deleteIndexRow(String tableName, long rowID, Set<String> columns)
     throws WHException
   {
     Index[] indexes = findIndexes(tableName);
     for (int i = 0 ; i < indexes.length ; i++)
     {
-      indexes[i].deleteRow(rowID);
+      if (indexAffectedBy(indexes[i],columns))
+        indexes[i].deleteRow(rowID);
     }
   }
   
   /** Add a table row to all the indexes that are based on a table.
   */
-  public void addIndexRow(String tableName, long rowID)
+  public void addIndexRow(String tableName, long rowID, Set<String> columns)
     throws WHException
   {
     Index[] indexes = findIndexes(tableName);
     for (int i = 0 ; i < indexes.length ; i++)
     {
-      indexes[i].addNewRow(rowID);
+      if (indexAffectedBy(indexes[i],columns))
+        indexes[i].addNewRow(rowID);
     }
   }
   
+  protected static boolean indexAffectedBy(Index index, Set<String> columns)
+    throws WHException
+  {
+    if (columns == null)
+      return true;
+    String[] indexColumns = index.getColumnNames();
+    for (int j = 0 ; j < indexColumns.length ; j++)
+    {
+      if (columns.contains(indexColumns[j]))
+        return true;
+    }
+    return false;
+  }
+
+  /** Add a table row to the linked list for that table.
+  * We add it at the end, although technically we could just as well add at the
+  * beginning.
+  */
+  public void addTableRow(String tableName, long rowID)
+    throws WHException
+  {
+    TableTailKey tailKey = new TableTailKey(tableName);
+    LongValue tailValue = (LongValue)currentTransaction.get(tailKey);
+    if (tailValue == null)
+    {
+      // First row.  Set the head.
+      TableHeadKey headKey = new TableHeadKey(tableName);
+      currentTransaction.put(headKey,new LongValue(rowID));
+    }
+    else
+    {
+      // Set the next pointer for last row.
+      TableRowNextKey nextKey = new TableRowNextKey(tableName,tailValue.getValue());
+      currentTransaction.put(nextKey,new LongValue(rowID));
+    }
+    // Set the tail pointer to the new record
+    currentTransaction.put(tailKey,new LongValue(rowID));
+  }
+  
+  /** Remove a table row from the linked list for that table.
+  */
+  public void removeTableRow(String tableName, long rowID)
+    throws WHException
+  {
+    TableRowPrevKey prevKey = new TableRowPrevKey(tableName,rowID);
+    TableRowNextKey nextKey = new TableRowNextKey(tableName,rowID);
+    LongValue prevValue = (LongValue)currentTransaction.get(prevKey);
+    LongValue nextValue = (LongValue)currentTransaction.get(nextKey);
+    // Fix up the previous pointer reference
+    if (prevValue == null)
+    {
+      TableHeadKey headKey = new TableHeadKey(tableName);
+      currentTransaction.put(headKey,nextValue);
+    }
+    else
+    {
+      TableRowNextKey prevNextKey = new TableRowNextKey(tableName,prevValue.getValue());
+      currentTransaction.put(prevNextKey,nextValue);
+    }
+    // Fix up the next pointer reference
+    if (nextValue == null)
+    {
+      TableTailKey tailKey = new TableTailKey(tableName);
+      currentTransaction.put(tailKey,prevValue);
+    }
+    else
+    {
+      TableRowPrevKey nextPrevKey = new TableRowPrevKey(tableName,nextValue.getValue());
+      currentTransaction.put(nextPrevKey,prevValue);
+    }
+  }
+  
+  /** Set a table column value */
+  public void setTableColumnValue(String tableName, long rowID, String columnName, WHValue value)
+    throws WHException
+  {
+    TableColumnKey key = new TableColumnKey(tableName,rowID,columnName);
+    currentTransaction.put(key,value);
+  }
+  
+  /** Get a table column value */
+  public WHValue getTableColumnValue(String tableName, long rowID, String columnName)
+    throws WHException
+  {
+    TableColumnKey key = new TableColumnKey(tableName,rowID,columnName);
+    return currentTransaction.get(key);
+  }
+  
+  /** Build a table accessor, which accesses all the table rows. */
+  public WHAccessor buildTableAccessor(String tableName)
+    throws WHException
+  {
+    // Read the head pointer
+    TableHeadKey headKey = new TableHeadKey(tableName);
+    LongValue headValue = (LongValue)currentTransaction.get(headKey);
+    return new TableAccessor(this,tableName,(headValue==null)?-1L:headValue.getValue());
+  }
+  
+  protected long readNextTableRowID(String tableName, long currentRowID)
+    throws WHException
+  {
+    // Read the row's next pointer
+    TableRowNextKey nextKey = new TableRowNextKey(tableName,currentRowID);
+    LongValue nextValue = (LongValue)currentTransaction.get(nextKey);
+    if (nextValue == null)
+      return -1L;
+    return nextValue.getValue();
+  }
+  
+  
   // Row and node number allocators.
   // Eventually we should be able to make it so that we allocate multiple row numbers or node numbers
   // at a time, which persist cross transaction.  That would reduce the number of collisions that happen
@@ -681,5 +793,94 @@ public class TableStore implements WHTab
 
   // Table row next pointer value is just LongValue
   
+  // Table column key
+  
+  protected static class TableColumnKey implements WHKey
+  {
+    protected String tableName;
+    protected long rowID;
+    protected String columnName;
+    
+    /** Constructor */
+    public TableColumnKey(String tableName, long rowID, String columnName)
+    {
+      this.tableName = tableName;
+      this.rowID = rowID;
+      this.columnName = columnName;
+    }
+    
+    public TableColumnKey(byte[] data)
+    {
+      BufferPointer bp = new BufferPointer(data);
+      this.tableName = StringKey.readObject(bp);
+      this.rowID = LongKey.readObject(bp);
+      this.columnName = StringKey.readObject(bp);
+    }
 
+    public byte[] serializeObject()
+    {
+      byte[] rval = new byte[StringKey.sizeObject(tableName)+
+        LongKey.sizeObject() + StringKey.sizeObject(columnName)];
+      BufferPointer bp = new BufferPointer(rval);
+      StringKey.writeObject(bp,tableName);
+      LongKey.writeObject(bp,rowID);
+      StringKey.writeObject(bp,columnName);
+      return rval;
+    }
+    
+    public long getHashCode()
+    {
+      return StringKey.calculateHashCode(tableName) + LongKey.calculateHashCode(rowID)
+        + StringKey.calculateHashCode(columnName);
+    }
+    
+    public boolean isEquals(WHKey o)
+    {
+      TableColumnKey key = (TableColumnKey)o;
+      return key.tableName.equals(tableName) &&
+        key.rowID == rowID &&
+        key.columnName.equals(columnName);
+    }
+  }
+
+  /** Table accessor. */
+  protected static class TableAccessor implements WHAccessor
+  {
+    protected TableStore ts;
+    protected String tableName;
+    /** This is the next row to be returned. */
+    protected long currentRowID;
+    
+    /** Constructor */
+    public TableAccessor(TableStore ts, String tableName, long firstRowID)
+      throws WHException
+    {
+      this.ts = ts;
+      this.tableName = tableName;
+      this.currentRowID = firstRowID;
+    }
+    
+    /** Are there any more rows?
+    */
+    public boolean hasNext()
+      throws WHException
+    {
+      return currentRowID != -1L;
+    }
+    
+    /** Read the next matching relationship row ID,
+    */
+    public long getNext()
+      throws WHException
+    {
+      long rval = currentRowID;
+      if (currentRowID != -1L)
+      {
+        currentRowID = ts.readNextTableRowID(tableName,currentRowID);
+      }
+      return rval;
+    }
+  }
+  
+  
 }



Mime
View raw message