hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r685258 [1/3] - in /hadoop/hbase/trunk: ./ src/examples/ src/examples/uploaders/ src/examples/uploaders/hbrep/ src/examples/uploaders/hbrep/Hbase/ src/java/org/apache/hadoop/hbase/io/
Date Tue, 12 Aug 2008 18:43:39 GMT
Author: stack
Date: Tue Aug 12 11:43:38 2008
New Revision: 685258

URL: http://svn.apache.org/viewvc?rev=685258&view=rev
Log:
HBASE-787 Postgresql to HBase table replication

Added:
    hadoop/hbase/trunk/src/examples/REAME.txt
    hadoop/hbase/trunk/src/examples/uploaders/
    hadoop/hbase/trunk/src/examples/uploaders/hbrep/
    hadoop/hbase/trunk/src/examples/uploaders/hbrep/HBaseConnection.py
    hadoop/hbase/trunk/src/examples/uploaders/hbrep/HBaseConsumer.py
    hadoop/hbase/trunk/src/examples/uploaders/hbrep/Hbase/
    hadoop/hbase/trunk/src/examples/uploaders/hbrep/Hbase/Hbase-remote   (with props)
    hadoop/hbase/trunk/src/examples/uploaders/hbrep/Hbase/Hbase.py
    hadoop/hbase/trunk/src/examples/uploaders/hbrep/Hbase/__init__.py
    hadoop/hbase/trunk/src/examples/uploaders/hbrep/Hbase/constants.py
    hadoop/hbase/trunk/src/examples/uploaders/hbrep/Hbase/ttypes.py
    hadoop/hbase/trunk/src/examples/uploaders/hbrep/README
    hadoop/hbase/trunk/src/examples/uploaders/hbrep/__init__.py
    hadoop/hbase/trunk/src/examples/uploaders/hbrep/bootstrap.py
    hadoop/hbase/trunk/src/examples/uploaders/hbrep/hbrep.ini
    hadoop/hbase/trunk/src/examples/uploaders/hbrep/hbrep.py   (with props)
    hadoop/hbase/trunk/src/examples/uploaders/hbrep/pgq.ini
    hadoop/hbase/trunk/src/examples/uploaders/hbrep/tablemapping.py
Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseMapWritable.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/RowResult.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=685258&r1=685257&r2=685258&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Tue Aug 12 11:43:38 2008
@@ -25,6 +25,8 @@
    HBASE-812  Compaction needs little better skip algo (Daniel Leffel via Stack)
 
   NEW FEATURES
+   HBASE-787  Postgresql to HBase table replication example
+
   OPTIMIZATIONS
 
 Release 0.2.0 - August 8, 2008.

Added: hadoop/hbase/trunk/src/examples/REAME.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/examples/REAME.txt?rev=685258&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/examples/REAME.txt (added)
+++ hadoop/hbase/trunk/src/examples/REAME.txt Tue Aug 12 11:43:38 2008
@@ -0,0 +1,2 @@
+Example code.  Includes Thrift clients and uploader examples, among them a
+script by Tim Sell that replicates a PostgreSQL database into HBase.

Added: hadoop/hbase/trunk/src/examples/uploaders/hbrep/HBaseConnection.py
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/examples/uploaders/hbrep/HBaseConnection.py?rev=685258&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/examples/uploaders/hbrep/HBaseConnection.py (added)
+++ hadoop/hbase/trunk/src/examples/uploaders/hbrep/HBaseConnection.py Tue Aug 12 11:43:38 2008
@@ -0,0 +1,39 @@
+import sys, os
+
+from Hbase.ttypes import *
+from Hbase import Hbase
+
+from thrift import Thrift
+from thrift.transport import TSocket, TTransport
+from thrift.protocol import TBinaryProtocol
+
+class HBaseConnection:
+  def __init__(self, hostname, port):
+    # Make socket
+    self.transport = TSocket.TSocket(hostname, port)
+    # Buffering is critical. Raw sockets are very slow
+    self.transport = TTransport.TBufferedTransport(self.transport)
+    # Wrap in a protocol
+    self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
+    # Create a client to use the protocol encoder
+    self.client = Hbase.Client(self.protocol)
+ 
+  def connect(self):  
+    self.transport.open()
+   
+  def disconnect(self):
+    self.transport.close()
+    
+  def validate_column_descriptors(self, table_name, column_descriptors):
+    hbase_families = self.client.getColumnDescriptors(table_name)
+    for col_desc in column_descriptors:
+      family, column = col_desc.split(":")
+      if not family in hbase_families:
+        raise Exception("Invalid column descriptor \"%s\" for hbase table \"%s\"" % (col_desc,table_name))
+      
+  def validate_table_name(self, table_name):
+    if not table_name in self.client.getTableNames():
+      raise Exception("hbase table '%s' not found." % (table_name))
+    
+  
+  
\ No newline at end of file

Added: hadoop/hbase/trunk/src/examples/uploaders/hbrep/HBaseConsumer.py
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/examples/uploaders/hbrep/HBaseConsumer.py?rev=685258&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/examples/uploaders/hbrep/HBaseConsumer.py (added)
+++ hadoop/hbase/trunk/src/examples/uploaders/hbrep/HBaseConsumer.py Tue Aug 12 11:43:38 2008
@@ -0,0 +1,90 @@
+import sys, os, pgq, skytools, ConfigParser
+
+from thrift import Thrift
+from thrift.transport import TSocket, TTransport
+from thrift.protocol import TBinaryProtocol
+
+from HBaseConnection import *
+import tablemapping
+
+INSERT = 'I'
+UPDATE = 'U'
+DELETE = 'D'
+
+class HBaseConsumer(pgq.Consumer):
+  """HBaseConsumer is a pgq.Consumer that sends processed events to hbase as mutations."""
+  
+  def __init__(self, service_name, args):
+    pgq.Consumer.__init__(self, service_name, "postgresql_db", args)
+    
+    config_file = self.args[0]
+    if len(self.args) < 2:
+      print "need table names"
+      sys.exit(1)
+    else:
+      self.table_names = self.args[1:]
+      
+    #just to check this option exists
+    self.cf.get("postgresql_db")
+    
+    self.max_batch_size = int(self.cf.get("max_batch_size", "10000"))
+    self.hbase_hostname = self.cf.get("hbase_hostname", "localhost")
+    self.hbase_port = int(self.cf.get("hbase_port", "9090"))
+    self.row_limit = int(self.cf.get("bootstrap_row_limit", 0))
+    self.table_mappings = tablemapping.load_table_mappings(config_file, self.table_names)
+  
+  def process_batch(self, source_db, batch_id, event_list):
+    try:
+      self.log.debug("processing batch %s" % (batch_id))
+      hbase = HBaseConnection(self.hbase_hostname, self.hbase_port)
+      try:
+        self.log.debug("Connecting to HBase")
+        hbase.connect()
+              
+        i = 0L
+        for event in event_list:
+          i = i+1
+          self.process_event(event, hbase)
+        print "%i events processed" % (i)
+        
+      except Exception, e:
+        #self.log.info(e)
+        sys.exit(e)
+      
+    finally:
+      hbase.disconnect()
+  
+  def process_event(self, event, hbase):
+    if event.ev_extra1 in self.table_mappings:
+      table_mapping = self.table_mappings[event.ev_extra1]
+    else:
+      self.log.info("table name not found in config, skipping event")
+      return
+    #hbase.validate_table_name(table_mapping.hbase_table_name)
+    #hbase.validate_column_descriptors(table_mapping.hbase_table_name, table_mapping.hbase_column_descriptors)
+    event_data = skytools.db_urldecode(event.data)
+    event_type = event.type.split(':')[0]
+    
+    batch = BatchMutation()
+    batch.row = table_mapping.hbase_row_prefix + str(event_data[table_mapping.psql_key_column])
+        
+    batch.mutations = []
+    for psql_column, hbase_column in zip(table_mapping.psql_columns, table_mapping.hbase_column_descriptors):
+      if event_type == INSERT or event_type == UPDATE:
+        m = Mutation()
+        m.column = hbase_column
+        m.value = str(event_data[psql_column])
+      elif event_type == DELETE:
+        # delete this column entry
+        m = Mutation()
+        m.isDelete = True
+        m.column = hbase_column
+      else:
+        raise Exception("Invalid event type: %s, event data was: %s" % (event_type, str(event_data)))
+      batch.mutations.append(m)
+    hbase.client.mutateRow(table_mapping.hbase_table_name, batch.row, batch.mutations)
+    event.tag_done()
+    
+if __name__ == '__main__':
+  script = HBaseConsumer("HBaseReplic",sys.argv[1:])
+  script.start()

Added: hadoop/hbase/trunk/src/examples/uploaders/hbrep/Hbase/Hbase-remote
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/examples/uploaders/hbrep/Hbase/Hbase-remote?rev=685258&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/examples/uploaders/hbrep/Hbase/Hbase-remote (added)
+++ hadoop/hbase/trunk/src/examples/uploaders/hbrep/Hbase/Hbase-remote Tue Aug 12 11:43:38 2008
@@ -0,0 +1,247 @@
+#!/usr/bin/env python
+#
+# Autogenerated by Thrift
+#
+# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+#
+
+import sys
+import pprint
+from urlparse import urlparse
+from thrift.transport import TTransport
+from thrift.transport import TSocket
+from thrift.transport import THttpClient
+from thrift.protocol import TBinaryProtocol
+
+import Hbase
+from ttypes import *
+
+if len(sys.argv) <= 1 or sys.argv[1] == '--help':
+  print ''
+  print 'Usage: ' + sys.argv[0] + ' [-h host:port] [-u url] [-f[ramed]] function [arg1 [arg2...]]'
+  print ''
+  print 'Functions:'
+  print '   getTableNames()'
+  print '   getColumnDescriptors(Text tableName)'
+  print '   getTableRegions(Text tableName)'
+  print '  void createTable(Text tableName,  columnFamilies)'
+  print '  void deleteTable(Text tableName)'
+  print '  Bytes get(Text tableName, Text row, Text column)'
+  print '   getVer(Text tableName, Text row, Text column, i32 numVersions)'
+  print '   getVerTs(Text tableName, Text row, Text column, i64 timestamp, i32 numVersions)'
+  print '   getRow(Text tableName, Text row)'
+  print '   getRowTs(Text tableName, Text row, i64 timestamp)'
+  print '  void put(Text tableName, Text row, Text column, Bytes value)'
+  print '  void mutateRow(Text tableName, Text row,  mutations)'
+  print '  void mutateRowTs(Text tableName, Text row,  mutations, i64 timestamp)'
+  print '  void mutateRows(Text tableName,  rowBatches)'
+  print '  void mutateRowsTs(Text tableName,  rowBatches, i64 timestamp)'
+  print '  void deleteAll(Text tableName, Text row, Text column)'
+  print '  void deleteAllTs(Text tableName, Text row, Text column, i64 timestamp)'
+  print '  void deleteAllRow(Text tableName, Text row)'
+  print '  void deleteAllRowTs(Text tableName, Text row, i64 timestamp)'
+  print '  ScannerID scannerOpen(Text tableName, Text startRow,  columns)'
+  print '  ScannerID scannerOpenWithStop(Text tableName, Text startRow, Text stopRow,  columns)'
+  print '  ScannerID scannerOpenTs(Text tableName, Text startRow,  columns, i64 timestamp)'
+  print '  ScannerID scannerOpenWithStopTs(Text tableName, Text startRow, Text stopRow, 
columns, i64 timestamp)'
+  print '  ScanEntry scannerGet(ScannerID id)'
+  print '  void scannerClose(ScannerID id)'
+  print ''
+  sys.exit(0)
+
+pp = pprint.PrettyPrinter(indent = 2)
+host = 'localhost'
+port = 9090
+uri = ''
+framed = False
+http = False
+argi = 1
+
+if sys.argv[argi] == '-h':
+  parts = sys.argv[argi+1].split(':') 
+  host = parts[0]
+  port = int(parts[1])
+  argi += 2
+
+if sys.argv[argi] == '-u':
+  url = urlparse(sys.argv[argi+1])
+  parts = url[1].split(':') 
+  host = parts[0]
+  if len(parts) > 1:
+    port = int(parts[1])
+  else:
+    port = 80
+  uri = url[2]
+  http = True
+  argi += 2
+
+if sys.argv[argi] == '-f' or sys.argv[argi] == '-framed':
+  framed = True
+  argi += 1
+
+cmd = sys.argv[argi]
+args = sys.argv[argi+1:]
+
+if http:
+  transport = THttpClient.THttpClient(host, port, uri)
+else:
+  socket = TSocket.TSocket(host, port)
+  if framed:
+    transport = TTransport.TFramedTransport(socket)
+  else:
+    transport = TTransport.TBufferedTransport(socket)
+protocol = TBinaryProtocol.TBinaryProtocol(transport)
+client = Hbase.Client(protocol)
+transport.open()
+
+if cmd == 'getTableNames':
+  if len(args) != 0:
+    print 'getTableNames requires 0 args'
+    sys.exit(1)
+  pp.pprint(client.getTableNames())
+
+elif cmd == 'getColumnDescriptors':
+  if len(args) != 1:
+    print 'getColumnDescriptors requires 1 args'
+    sys.exit(1)
+  pp.pprint(client.getColumnDescriptors(eval(args[0]),))
+
+elif cmd == 'getTableRegions':
+  if len(args) != 1:
+    print 'getTableRegions requires 1 args'
+    sys.exit(1)
+  pp.pprint(client.getTableRegions(eval(args[0]),))
+
+elif cmd == 'createTable':
+  if len(args) != 2:
+    print 'createTable requires 2 args'
+    sys.exit(1)
+  pp.pprint(client.createTable(eval(args[0]),eval(args[1]),))
+
+elif cmd == 'deleteTable':
+  if len(args) != 1:
+    print 'deleteTable requires 1 args'
+    sys.exit(1)
+  pp.pprint(client.deleteTable(eval(args[0]),))
+
+elif cmd == 'get':
+  if len(args) != 3:
+    print 'get requires 3 args'
+    sys.exit(1)
+  pp.pprint(client.get(eval(args[0]),eval(args[1]),eval(args[2]),))
+
+elif cmd == 'getVer':
+  if len(args) != 4:
+    print 'getVer requires 4 args'
+    sys.exit(1)
+  pp.pprint(client.getVer(eval(args[0]),eval(args[1]),eval(args[2]),eval(args[3]),))
+
+elif cmd == 'getVerTs':
+  if len(args) != 5:
+    print 'getVerTs requires 5 args'
+    sys.exit(1)
+  pp.pprint(client.getVerTs(eval(args[0]),eval(args[1]),eval(args[2]),eval(args[3]),eval(args[4]),))
+
+elif cmd == 'getRow':
+  if len(args) != 2:
+    print 'getRow requires 2 args'
+    sys.exit(1)
+  pp.pprint(client.getRow(eval(args[0]),eval(args[1]),))
+
+elif cmd == 'getRowTs':
+  if len(args) != 3:
+    print 'getRowTs requires 3 args'
+    sys.exit(1)
+  pp.pprint(client.getRowTs(eval(args[0]),eval(args[1]),eval(args[2]),))
+
+elif cmd == 'put':
+  if len(args) != 4:
+    print 'put requires 4 args'
+    sys.exit(1)
+  pp.pprint(client.put(eval(args[0]),eval(args[1]),eval(args[2]),eval(args[3]),))
+
+elif cmd == 'mutateRow':
+  if len(args) != 3:
+    print 'mutateRow requires 3 args'
+    sys.exit(1)
+  pp.pprint(client.mutateRow(eval(args[0]),eval(args[1]),eval(args[2]),))
+
+elif cmd == 'mutateRowTs':
+  if len(args) != 4:
+    print 'mutateRowTs requires 4 args'
+    sys.exit(1)
+  pp.pprint(client.mutateRowTs(eval(args[0]),eval(args[1]),eval(args[2]),eval(args[3]),))
+
+elif cmd == 'mutateRows':
+  if len(args) != 2:
+    print 'mutateRows requires 2 args'
+    sys.exit(1)
+  pp.pprint(client.mutateRows(eval(args[0]),eval(args[1]),))
+
+elif cmd == 'mutateRowsTs':
+  if len(args) != 3:
+    print 'mutateRowsTs requires 3 args'
+    sys.exit(1)
+  pp.pprint(client.mutateRowsTs(eval(args[0]),eval(args[1]),eval(args[2]),))
+
+elif cmd == 'deleteAll':
+  if len(args) != 3:
+    print 'deleteAll requires 3 args'
+    sys.exit(1)
+  pp.pprint(client.deleteAll(eval(args[0]),eval(args[1]),eval(args[2]),))
+
+elif cmd == 'deleteAllTs':
+  if len(args) != 4:
+    print 'deleteAllTs requires 4 args'
+    sys.exit(1)
+  pp.pprint(client.deleteAllTs(eval(args[0]),eval(args[1]),eval(args[2]),eval(args[3]),))
+
+elif cmd == 'deleteAllRow':
+  if len(args) != 2:
+    print 'deleteAllRow requires 2 args'
+    sys.exit(1)
+  pp.pprint(client.deleteAllRow(eval(args[0]),eval(args[1]),))
+
+elif cmd == 'deleteAllRowTs':
+  if len(args) != 3:
+    print 'deleteAllRowTs requires 3 args'
+    sys.exit(1)
+  pp.pprint(client.deleteAllRowTs(eval(args[0]),eval(args[1]),eval(args[2]),))
+
+elif cmd == 'scannerOpen':
+  if len(args) != 3:
+    print 'scannerOpen requires 3 args'
+    sys.exit(1)
+  pp.pprint(client.scannerOpen(eval(args[0]),eval(args[1]),eval(args[2]),))
+
+elif cmd == 'scannerOpenWithStop':
+  if len(args) != 4:
+    print 'scannerOpenWithStop requires 4 args'
+    sys.exit(1)
+  pp.pprint(client.scannerOpenWithStop(eval(args[0]),eval(args[1]),eval(args[2]),eval(args[3]),))
+
+elif cmd == 'scannerOpenTs':
+  if len(args) != 4:
+    print 'scannerOpenTs requires 4 args'
+    sys.exit(1)
+  pp.pprint(client.scannerOpenTs(eval(args[0]),eval(args[1]),eval(args[2]),eval(args[3]),))
+
+elif cmd == 'scannerOpenWithStopTs':
+  if len(args) != 5:
+    print 'scannerOpenWithStopTs requires 5 args'
+    sys.exit(1)
+  pp.pprint(client.scannerOpenWithStopTs(eval(args[0]),eval(args[1]),eval(args[2]),eval(args[3]),eval(args[4]),))
+
+elif cmd == 'scannerGet':
+  if len(args) != 1:
+    print 'scannerGet requires 1 args'
+    sys.exit(1)
+  pp.pprint(client.scannerGet(eval(args[0]),))
+
+elif cmd == 'scannerClose':
+  if len(args) != 1:
+    print 'scannerClose requires 1 args'
+    sys.exit(1)
+  pp.pprint(client.scannerClose(eval(args[0]),))
+
+transport.close()

Propchange: hadoop/hbase/trunk/src/examples/uploaders/hbrep/Hbase/Hbase-remote
------------------------------------------------------------------------------
    svn:executable = *



Mime
View raw message