From triplesoup-commits-return-56-apmail-incubator-triplesoup-commits-archive=incubator.apache.org@incubator.apache.org Fri Apr 13 08:57:53 2007 Return-Path: Delivered-To: apmail-incubator-triplesoup-commits-archive@locus.apache.org Received: (qmail 40171 invoked from network); 13 Apr 2007 08:57:41 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 13 Apr 2007 08:57:41 -0000 Received: (qmail 70005 invoked by uid 500); 13 Apr 2007 08:57:42 -0000 Delivered-To: apmail-incubator-triplesoup-commits-archive@incubator.apache.org Received: (qmail 69977 invoked by uid 500); 13 Apr 2007 08:57:41 -0000 Mailing-List: contact triplesoup-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: triplesoup-dev@incubator.apache.org Delivered-To: mailing list triplesoup-commits@incubator.apache.org Received: (qmail 69943 invoked by uid 99); 13 Apr 2007 08:57:41 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 13 Apr 2007 01:57:41 -0700 X-ASF-Spam-Status: No, hits=-99.5 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 13 Apr 2007 01:57:29 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 649DF1A9850; Fri, 13 Apr 2007 01:56:43 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r528394 [6/35] - in /incubator/triplesoup/donations/TRIPLES-3-RDFStore: ./ dbms/ dbms/client/ dbms/client/t/ dbms/dbmsproxy/ dbms/deamon/ dbms/doc/ dbms/include/ dbms/libdbms/ dbms/utils/ doc/ include/ lib/ lib/DBD/ lib/RDFStore/ lib/RDFSto... 
Date: Fri, 13 Apr 2007 08:56:16 -0000 To: triplesoup-commits@incubator.apache.org From: leosimons@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070413085643.649DF1A9850@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/conf.h URL: http://svn.apache.org/viewvc/incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/conf.h?view=auto&rev=528394 ============================================================================== --- incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/conf.h (added) +++ incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/conf.h Fri Apr 13 01:56:01 2007 @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2000-2006 Alberto Reggiori + * Dirk-Willem van Gulik + * + * NOTICE + * + * This product is distributed under a BSD/ASF like license as described in the 'LICENSE' + * file you should have received together with this source code. If you did not get a + * a copy of such a license agreement you can pick up one at: + * + * http://rdfstore.sourceforge.net/LICENSE + * + * + * $Id: conf.h,v 1.6 2006/06/19 10:10:22 areggiori Exp $ + */ +#ifndef _H_CONF +#define _H_CONF + +typedef enum opstypes { + T_ERR, T_NONE, T_RDONLY, T_RDWR, T_CREAT, T_DROP, T_ALL +} tops; + +extern const char * op2string(tops op); /* Translate operation level into a string */ +extern tops allowed_ops(u_long ip); /* Return max operations level in dbase for given IP */ +extern tops allowed_ops_on_dbase(u_long ip, char *db); /* Return max operations level in dbase for given IP and db */ +extern const char * parse_config(char * configfile); /* Parse a config file or stdin on '-'. 
return NULL or an error */ +#endif Added: incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/dbms-allow-all.conf URL: http://svn.apache.org/viewvc/incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/dbms-allow-all.conf?view=auto&rev=528394 ============================================================================== --- incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/dbms-allow-all.conf (added) +++ incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/dbms-allow-all.conf Fri Apr 13 01:56:01 2007 @@ -0,0 +1,13 @@ +# Baseline reject + + order allow,deny + deny all from all + + +# And then just allow from all +# to make testing possible. +# + + order deny, allow + allow all from all + Added: incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/dbms.conf URL: http://svn.apache.org/viewvc/incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/dbms.conf?view=auto&rev=528394 ============================================================================== --- incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/dbms.conf (added) +++ incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/dbms.conf Fri Apr 13 01:56:01 2007 @@ -0,0 +1,89 @@ +# This is a comment, I love comments. +# +# BNF: +# File +# '<' '>' block ' '>' +# NAME +# ::= \w+ +# Block +# [by] , +# [operation] [from] +# ops +# none | rdonly | rdwr | all +# spec +# IP [ [>netmask>] | '/' ] +# FQHN [ [] | '/' ] +# 'all' +# mask +# IP | FQHN +# IP +# dotted quad +# FQHN +# anything DNS +# len +# 0 .. 32 +# + + +# test blank lines + + + +# test record +# + + # Order allow,deny or deny,allow + order deny,allow + # operation levels + # none nothng allowed + # rdonly just read allowed + # rdwr read and write allowed + # all read, write and create allowed. 
+ # + # + deny all from all + allow rdonly from all + allow operation rdwr from 127.0.0.1 + allow all from 10.0.1.2/8 + allow none from 10.0.1.2/1 + allow rdonly from 10.0.1.2/27 + + +# Base line which gets ALWAYS applied +# + + order allow,deny + deny all from all + + + + order allow,deny + allow all from all + deny rdwr from 1.2.3.4/16 + + + + deny all from all + + + + allow all from all + + +# Fall through which ONLY gets applied if there +# are no specific dbase+IP rule was defined. +# + + order deny,allow + allow rdonly from all + + + + order allow,deny + allow rdonly from all + allow rdwr from 9.8.7.6 + allow all from 4.5.6.7/24 + #deny all from www.news.com + allow drop from 127.0.0.1 + + Added: incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/dbmscheckconf.8 URL: http://svn.apache.org/viewvc/incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/dbmscheckconf.8?view=auto&rev=528394 ============================================================================== --- incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/dbmscheckconf.8 (added) +++ incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/dbmscheckconf.8 Fri Apr 13 01:56:01 2007 @@ -0,0 +1,11 @@ +Config checker + +Usage + + ./dbmscheckconf + +or + ./dbmscheckconf 'dbase' 'ip|host' ... + +and for each dbase and ip or hostname the xs control will be evaluated. 
+ Added: incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/dbmsd.8 URL: http://svn.apache.org/viewvc/incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/dbmsd.8?view=auto&rev=528394 ============================================================================== --- incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/dbmsd.8 (added) +++ incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/dbmsd.8 Fri Apr 13 01:56:01 2007 @@ -0,0 +1,107 @@ +.\"\* +.\"\* Copyright (c) 2000-2006 Alberto Reggiori +.\"\* Dirk-Willem van Gulik +.\"\* +.\"\* NOTICE +.\"\* +.\"\* This product is distributed under a BSD/ASF like license as described in the 'LICENSE' +.\"\* file you should have received together with this source code. If you did not get a +.\"\* a copy of such a license agreement you can pick up one at: +.\"\* +.\"\* http://rdfstore.sourceforge.net/LICENSE +.\"\* +.\"\* Perl 'tie' interface to a socket connection. Possibly to +.\"\* a server which runs a thin veneer to the Berkeley DB. +.\"\* +.\"\* Based on DB_File, which came with perl and UKD which +.\"\* came with CILS. +.\"\*/ +.\" +.Dd November, 2000 +.Dt DBMSD 8 +.Os +.Sh NAME +.Nm dbmsd +.Nd remote +.Tn DB +server +.Sh SYNOPSIS +.Nm dbmsd +.Op Fl x +.Op Fl v +.Op Fl t +.Op Fl d +.Op Fl d Arg dbase directory +.Op Fl p Arg port number +.Op Fl b Arg address to bind to +.Op Fl c configfile +.Op Fl C configfile +.Op Fl U +.Op Fl u userid or username +.Sh DESCRIPTION +.Nm dbmsd +runs on a server machine to service +.Tn DB +requests from client machines. Mainly a perl library +.Pp +.Pp +The following options are available: +.Bl -tag -width Ds +.It Fl v +Just print the version number, and exit. +.It Fl X +Run in debug mode, does not fork, does not detach. +.It Fl t +Simple command tracing; to stdout +.It Fl d directory +Specifies the prefix for the directory to create the *.db files in. 
When compiled +with HASHING set; the +.Nm dbmsd +will in fact create another level of directories below this. +.It Fl p Arg port +Specifies the port number. +.It Fl b address +Specifies the address to bind the server to. If none is specified the server will listen on +all addresses (INADDR_ANY). +.It Fl c configfile +Use specified config file (use the -v flag to see default config file). Or use '-' for stdin. +.It Fl C configfile +Check the specified config file and exit immediately with ok/not-ok. See also +.Xr dbmscheckconf 8 +.It Fl u userid or username. +Specifies as which user the dbm(s) will be accessed. For sanity +and security reasons, the server +uses +.Xr setuid 2 +to change to that userid (or username) as soon as the relevant +ports are opened, the logging is started and, for forking servers, +the dbmsd.pid file is written. If not specified the default +.Dq nobody +is used. +.It Fl U +Specifies no user ID change; the server will run from the user ID +it was started from (usually root). This counters the +.Dq -u +flag above. +.El +.Pp +For example, +.Dq Li "dbmsd -p 1234 -d /tmp" +causes the server to listen to port 1234 (the default) and create +its Berkeley DB files in /tmp (the default). +.Pp +The +.Nm dbmsd +utility exits 0 on success, and >0 if an error occurs. +.Sh SEE ALSO +.Xr DB 3 , +.Xr Perl 1 +and +.Xr perltie 1 +.Sh Author +Dirk-Willem van Gulik, Alberto Reggiori at STA/ISIS, Joint Research Center Ispra +for the ParlEuNet project. +.Sh HISTORY +The +.Nm dbmsd +started life with the ParlEuNet project. 
Added: incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/dbmsd.h URL: http://svn.apache.org/viewvc/incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/dbmsd.h?view=auto&rev=528394 ============================================================================== --- incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/dbmsd.h (added) +++ incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/dbmsd.h Fri Apr 13 01:56:01 2007 @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2000-2006 Alberto Reggiori + * Dirk-Willem van Gulik + * + * NOTICE + * + * This product is distributed under a BSD/ASF like license as described in the 'LICENSE' + * file you should have received together with this source code. If you did not get a + * a copy of such a license agreement you can pick up one at: + * + * http://rdfstore.sourceforge.net/LICENSE + * + * + * $Id: dbmsd.h,v 1.13 2006/06/19 10:10:22 areggiori Exp $ + */ +#ifndef _H_DBMSD +#define _H_DBMSD + +#include "dbms.h" +#include "dbms_compat.h" +#include "dbms_comms.h" +#include "deamon.h" + +#ifdef RDFSTORE_DBMS_DEBUG_TIME +extern float total_time; +#endif + +extern connection * client_list, *mum; +extern struct child_rec * children; +extern fd_set rset,wset,eset,alleset,allrset,allwset; +extern char * default_dir; +extern char * dir; +extern int sockfd,maxfd,mum_pgid,mum_pid,max_dbms,max_processes,max_clients; +extern char * my_dir; +extern char * pid_file; +extern char * conf_file; +extern int check_children; +extern dbase * first_dbp; + +void select_loop(); + +/* Some reasonable limit, to avoid running out of + * all sorts of resources, such as file descriptors + * and all that.. + */ +#define MAX_CLIENT 2048 + +/* An absolute limit, above this limit, connections + * are no longer accepted, and simply dropped without + * as much as an error. 
+ */ +#define HARD_MAX_CLIENTS MAX_CLIENT+5 +#define HARD_MAX_DBASE 256 + +/* hard number for the total number of DBMS-es we + * are willing to server (in total) + */ + +#define MAX_DBMS_CHILD 256 +#define MAX_CHILD 32 +#define MAX_DBMS (MAX_DBMS_CHILD * MAX_CHILD) + + +#define SERVER_NAME "DBMS-Dirkx/3.00" + +#define SERVER 1 +#define CLIENT 0 + +/* some connection types... */ +#define C_UNK 0 +#define C_MUM 1 +#define C_CLIENT 2 +#define C_NEW_CLIENT 3 +#define C_CHILD 4 +#define C_LEGACY 5 + +struct child_rec * create_new_child(void); +int handoff_fd( struct child_rec * child, connection * r ); +int takeon_fd(int conn_fd); +connection * handle_new_local_connection( int sockfd , int type); +connection * handle_new_connection( int sockfd , int type, struct sockaddr_in addr); + +#define MX dbms_log(L_DEBUG,"@@ %s:%d",__FILE__,__LINE__); +#endif Added: incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/deamon.c URL: http://svn.apache.org/viewvc/incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/deamon.c?view=auto&rev=528394 ============================================================================== --- incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/deamon.c (added) +++ incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/deamon.c Fri Apr 13 01:56:01 2007 @@ -0,0 +1,746 @@ +/* + * Copyright (c) 2000-2006 Alberto Reggiori + * Dirk-Willem van Gulik + * + * NOTICE + * + * This product is distributed under a BSD/ASF like license as described in the 'LICENSE' + * file you should have received together with this source code. 
If you did not get a + * a copy of such a license agreement you can pick up one at: + * + * http://rdfstore.sourceforge.net/LICENSE + * + * + * $Id: deamon.c,v 1.26 2006/06/19 10:10:22 areggiori Exp $ + */ + +#include "dbms.h" +#include "dbms_compat.h" +#include "dbms_comms.h" +#include "dbmsd.h" + +#include "deamon.h" +#include "handler.h" +#include "mymalloc.h" + +static int going_down = 0; +int client_counter = 0; /* XX static perhaps */ + +#ifdef STATIC_BUFF +static connection * free_connection_list = NULL; +static int free_connection_keep = 2; +static int free_connection_keep_max = 4; +static int free_connection_len = 0; +#endif + +#define PTOK { int i; for(i=0; icmd.token ) { dbms_log(L_DEBUG,"Token at %s:%d %s s=%d,%d",__FILE__,__LINE__, cmd_table[i].info,r->cmd.len1,r->cmd.len2); break; }; if(i>=sizeof(cmd_table) / sizeof(struct command_req)) dbms_log(L_DEBUG,"Token at %s:%d %s %d,%d",__FILE__,__LINE__, "**UNKOWN**",r->cmd.len1,r->cmd.len2); } + +void +close_connection ( connection * r ) +{ + /* assert(r->clientfd); */ + FD_CLR(r->clientfd,&allwset); + FD_CLR(r->clientfd,&allrset); + FD_CLR(r->clientfd,&alleset); + close(r->clientfd); + r->clientfd = 0; + + /* shutdown(r->clientfd,2); */ + + if (r->sendbuff != NULL) { + myfree(r->sendbuff); + r->sendbuff = NULL; + }; + + if (r->recbuff != NULL) { + myfree(r->recbuff); + r->recbuff = NULL; + }; + + r->send = r->tosend = 0; + r->gotten = r->toget = 0; + + r->close = 2; MX; + +#ifdef STATIC_BUFF + if (free_connection_len < free_connection_keep) { + r->next = free_connection_list; + /* assert( free_connection_list != r ); */ + free_connection_list = r; + free_connection_len++; + } else +#endif + myfree(r); + + client_counter --; + + return; +} + +void free_connection (connection * r ) { + + if (r->type == C_MUM) { + /* connection to the mother lost.. we _must_ exit now.. */ + if (!going_down) + dbms_log(L_FATAL,"Mamma has died.. 
suicide time for child, fd=%d",r->clientfd); + cleandown(0); + } else + if (r->type == C_CLIENT) { + if (r->dbp) { + /* check if this is the last child using + * a certain database, and clean if such is + * indeed the case.. + */ + r->dbp->num_cls --; + if (r->dbp->num_cls<=0) { + dbms_log(L_INFORM,"Child was the last one to use %s, closing", + r->dbp->pfile); + r->dbp->close = 1; MX; + }; + dbms_log(L_DEBUG,"Connection to client closed"); + } + else { + dbms_log(L_WARN,"C_Client marked with no database ?"); + } + } else + if (r->type == C_NEW_CLIENT) { + /* tough.. but that is about it.. or shall we try + * to send a message... + * + */ + dbms_log(L_ERROR,"New child closing.. but then what ?"); + } else + if (r->type == C_LEGACY) { + dbms_log(L_DEBUG,"Legacy close"); + } else + if (r->type == C_CHILD) { +#ifdef FORKING + dbase * p=0; + child_rec * c=NULL; + /* we lost a connection to a child.. so try to kill + * it and forget about it... + * work out which databases are handled by this child.. + */ + for(p=first_dbp;p;p=p->nxt) + if (p->handled_by->r == r) { + if ((c) && (p->handled_by != c)) + dbms_log(L_ERROR,"More than one child pointer ?"); + p->close = 1; MX; + c = p->handled_by; + }; + if (c==NULL) { + dbms_log(L_ERROR,"Child died, but no record.."); + } + else { + /* overkill, as we do not even wait for the + * child to be clean up after itself. + */ + c->close = 1; MX; + if (kill(c->pid,0) == 0) { + /* so the child is still alive ? 
*/ + dbms_log(L_DEBUG,"Sending kill signal (%d) to %d", + SIGTERM,c->pid); + kill(c->pid,SIGTERM); + }; + }; +#else + dbms_log(L_ERROR,"We are non forking, but still see a C_CHILD"); +#endif + } + else { + dbms_log(L_ERROR,"Zapping a rather unkown connection type?"); + }; + + r->type = C_UNK; + close_connection(r); + return; +} + +void zap ( connection * r ) { + connection * * p; + + for ( p = &client_list ; *p && *p != r; ) + p = &((*p)->next); + + if ( *p == NULL) { + dbms_log(L_ERROR,"Connection to zap not found"); + return; + }; + + *p = r->next; + free_connection(r); + } + + +void cleandown( int signo ) +{ + if (going_down) + dbms_log(L_ERROR,"Re-entry of cleandown()."); + + going_down = 1; + + shutdown(sockfd,2); + close(sockfd); + + /* send a kill to my children + */ + if (!mum_pid) + kill(SIGTERM,0); + + close_all_dbps(); +#ifdef FORKING + clean_children(); +#endif +{ + connection * r; + for(r=client_list; r;) { + connection * q; + q = r; r=r->next; + assert(q != r); + close_connection(q); + } +} + + /* close connection to mother */ + if (mum) + close_connection(mum); + + +#ifdef RDFSTORE_DBMS_DEBUG_MALLOC + debug_malloc_dump(stderr); +#endif +#ifdef RDFSTORE_DBMS_DEBUG_TIME + fprintf(stderr,"Timedebug: Time waiting=%f, handling=%f network=%f\n",.1,.2,.3); +#endif + if (!mum_pid) + unlink(pid_file); + + dbms_log(L_WARN,"Shutdown completed"); + exit(0); + } + +void continue_send( connection * r ) { + int s; + + if ((r->tosend==0) || ( r->send >= r->tosend)) { + dbms_log(L_ERROR,"How did we get here ?"); + r->close=1; MX; + return; + }; + + s = write(r->clientfd,r->sendbuff+r->send,r->tosend - r->send); + + if ((s<=0) && (errno == EINTR)) { + dbms_log(L_INFORM,"Continued send interrupted. 
Retry."); + return; + } + else + if ((s<0) && (errno == EAGAIN)) { + dbms_log(L_WARN,"Continued send would still block"); + return; + } + else + if (s<0) { + dbms_log(L_ERROR,"Failed to continue write %s",strerror(errno)); + r->close=1; + return; + } + else + if (s==0) { + dbms_log(L_ERROR,"Client closed the connection on us"); + r->close=1; + return; + } + + r->send += s; + if ( r->send < r->tosend ) + return; + + r->send = r->tosend = 0; + +#ifndef STATIC_SC_BUFF + if (r->sendbuff) + myfree( r->sendbuff ); + r->sendbuff = NULL; +#endif + + FD_CLR(r->clientfd,&allwset); + return; +} + +void dispatch( connection * r, int token, DBT * v1, DBT * v2) { + int s; + + if ((r->tosend != 0) && (r->send !=0)) { + dbms_log(L_WARN,"dispatch, but still older data left to send"); + goto fail_dispatch; + }; + + r->iov[0].iov_base = (void *) &(r->cmd); + r->iov[0].iov_len = sizeof(r->cmd); + + r->iov[1].iov_base = r->v1.data = + (v1 == NULL) ? NULL : v1->data; + r->iov[1].iov_len = r->v1.size = r->cmd.len1 = + ( v1 == NULL ) ? 0 : v1->size; + + r->iov[2].iov_base = r->v2.data = + (v2 == NULL) ? NULL : v2->data; + r->iov[2].iov_len = r->v2.size = r->cmd.len2 = + ( v2 == NULL ) ? 0 : v2->size; + + r->tosend = sizeof(r->cmd) + r->cmd.len1 + r->cmd.len2; + r->send =0; + + r->cmd.token = token; + r->cmd.len1 = htonl( r->cmd.len1 ); + r->cmd.len2 = htonl( r->cmd.len2 ); + +#ifdef RDFSTORE_DBMS_DEBUG_TIME + gettimeofday(&(r->cmd.stamp),NULL); +#endif + /* BUG: we also use this with certain errors, in an attempt to + * inform the other side of the error. So it might well be + * that we block here... one day... + */ + s=writev(r->clientfd,r->iov,3); + + if (s<0) { + if (errno == EINTR) { + dbms_log(L_INFORM,"Initial write interrupted. 
Ignored"); + s=0; + } + else + if (errno == EAGAIN) { + dbms_log(L_INFORM,"Initial write would block"); + s = 0; + } + else { + dbms_log(L_ERROR,"Initial write error: %s",strerror(errno)); + goto fail_dispatch; + }; + } + else + if ((s==0) && (errno != EINTR)) { + dbms_log(L_ERROR,"Intial write; client closed connection"); + goto fail_dispatch; + }; + + r->send += s; + if (r->send == r->tosend) { + r->send = 0; + r->tosend =0; +#ifndef STATIC_SC_BUFF + if (r->sendbuff) + myfree(r->sendbuff); + r->sendbuff = NULL; +#endif + } + else { + int at,i; void * p; + /* create a buffer for the remaining data + */ + +#if STATIC_SC_BUFF + if (r->tosend-r->send > MAX_SC_PAYLOAD) { + dbms_log(L_ERROR, + "Secondary write buffer of %d>%d bytes to big", + r->tosend - r->send, + MAX_SC_PAYLOAD + ); + goto fail_dispatch; + }; +#else + assert(r->tosend > r->send ); + r->sendbuff = mymalloc( r->tosend - r->send ); +#endif + assert(r->sendbuff); + + if (r->sendbuff == NULL) { + dbms_log(L_ERROR, + "Out of memory whilst creating a secondary write buffer of %d bytes", + r->tosend - r->send + ); + goto fail_dispatch; + }; + + for(p=r->sendbuff,i=0,at=0; i < 3; i++) { + if ( at > r->send ) { + memcpy(p, r->iov[i].iov_base,r->iov[i].iov_len); + p+=r->iov[i].iov_len; + } else + if ( at + r->iov[i].iov_len > r->send ) { + int offset = r->send - at; + int len=r->iov[i].iov_len - offset; + memcpy(p, r->iov[i].iov_base + offset, len); + p+=len; + } + else { + /* skip, done */ + } + at += r->iov[i].iov_len; + }; + + /* redo our bookkeeping, as we have moved it all in + * just one contineous buffer. We had to copy, as the + * v1 and v2's propably just contained pointers to either + * a static error string or a memmap file form the DB inter + * face; neither which are going to live long. 
+ */ + r->tosend -= s; + r->send = 0; + + FD_SET(r->clientfd,&allwset); + }; + + return; + +fail_dispatch: + dbms_log(L_WARN,"dispatch failed"); + r->close=1;MX; + return; + } + +void do_msg ( connection * r, int token, char * msg) { + DBT rr; + + rr.size = strlen(msg) +1; + rr.data = msg; + + dispatch(r,token | F_SERVER_SIDE, &rr, NULL); + return; + } + +connection * +handle_new_local_connection( + int clientfd, int type + ) +{ + struct sockaddr_in none; + none.sin_addr.s_addr = INADDR_NONE; + return handle_new_connection(clientfd, type, none); +} + +connection * +handle_new_connection( + int clientfd, int type, struct sockaddr_in addr + ) +{ + connection * new; + int v; + + if (client_counter > HARD_MAX_CLIENTS) { + dbms_log(L_ERROR,"Max number of clients reached (hard max), completely ignoring"); + close(clientfd); + return NULL; + }; + + if (client_counter >= max_clients) { + connection tmp; + tmp.clientfd = clientfd; + tmp.close = tmp.send = tmp.tosend = tmp.gotten = tmp.toget = 0; + reply_log(&tmp,L_ERROR,"Too many connections fd=%d",clientfd); + close(clientfd); + return NULL; + }; + + if ( (v=fcntl( clientfd, F_GETFL, 0)<0) || (fcntl(clientfd, F_SETFL,v | O_NONBLOCK)<0) ) { + dbms_log(L_ERROR,"Could not make socket non blocking: %s",strerror(errno)); + close(clientfd); + return NULL; + }; + + FD_SET(clientfd,&allrset); + FD_SET(clientfd,&alleset); + + /* XXX we could try to fill holes in the bit array at this point; + * and get max fd as low as possible. But it seems that the OS + * already keeps the FDs as low as it can (except for OpenBSD ??) + */ + if ( clientfd > maxfd ) + maxfd=clientfd; + + /* if still space, use, otherwise tack another + * one to the end.. 
+ */ +#if STATIC_BUFF + if (free_connection_list != NULL) { + new = free_connection_list; + free_connection_list = new->next; + free_connection_len --; + } else { + assert(free_connection_len == 0); + if (free_connection_keep < free_connection_keep_max/2) + free_connection_keep *= 2; + else + if (free_connection_keep < free_connection_keep_max) + free_connection_keep += 2; +#endif + if ((new = (connection *) mymalloc(sizeof(connection))) == NULL ) + { + dbms_log(L_ERROR,"Could not claim enough memory"); + close(clientfd); + return NULL; + }; +#if STATIC_BUFF + } +#endif + bzero(new,sizeof(connection)); + new->next = client_list; + client_list = new; + + bzero(new,sizeof(new)); + + /* Copy the needed information. */ + new->clientfd = clientfd; + + new->sendbuff = NULL; +#ifdef STATIC_SC_BUFF + if ((type != C_CHILD)) + new->sendbuff = (unsigned char *) mymalloc(MAX_SC_PAYLOAD); +#endif + new->recbuff = NULL; +#ifdef STATIC_CS_BUFF + if ((type != C_CHILD)) + new->recbuff = (unsigned char *) mymalloc(MAX_CS_PAYLOAD); +#endif + + new->dbp = NULL; + new->type = type; + +#ifdef TIMEOUT + new->start = time(NULL); + new->last = time(NULL); +#endif + new->address = addr; + new->close = 0; + new->send = new->tosend = new->gotten = new->toget = 0; + + client_counter ++; + return new; +} + +void final_read( connection * r) +{ + r->toget = r->gotten = 0; + parse_request(r); + +#ifndef STATIC_CS_BUFF + if (r->recbuff) { + myfree(r->recbuff); + r->recbuff = NULL; + }; +#endif + return; +} + + +void initial_read( connection * r ) { + struct header skip_cmd; + int n=0; + + /* we peek, untill we have the full command buffer, and + * only then do we give it any attention. This safes a + * few syscalls. 
+ */ + errno = 0; + n=recv(r->clientfd,&(r->cmd),sizeof(r->cmd),MSG_PEEK); + + if (n<0) dbms_log(L_DEBUG,"Read fd=%d n=%d errno=%d/%s",r->clientfd,n,errno,strerror(errno)); + + if ((n < 0) && (errno == EAGAIN)) { + dbms_log(L_ERROR,"Again read %s on %d",strerror(errno),r->clientfd); + return; + } + else + if ((n <= 0) && (errno == EINTR)) { + dbms_log(L_ERROR,"Interruped read %s",strerror(errno)); + return; + } + else + if (n<0) { + if (errno != ECONNRESET) + dbms_log(L_ERROR,"Read error %s",strerror(errno)); + r->close=1;MX; + return; + } + else + if (n==0) { + dbms_log(L_INFORM,"Client side close on read %d/%s (fd=%d)", + errno,strerror(errno),r->clientfd); + r->close=1;MX; + return; + } + else + if ( n != sizeof(r->cmd) ) { + /* lets log this, as we want to get an idea if this actually happens . + * seems not, BSD, on high load, SCO. + */ + dbms_log(L_WARN,"Still waitingn for those 5 bytes, gotten LESS"); + return; + } + else { +#ifdef RDFSTORE_DBMS_DEBUG_TIME + float s,m; + struct timeval t; + gettimeofday(&t,NULL); + s=t.tv_sec - r->cmd.stamp.tv_sec; + m=t.tv_usec - r->cmd.stamp.tv_usec; + MDEBUG((stderr,"Time taken %f seconds\n", s + m / 1000000.0 )); + total_time += s + m / 1000000.0; +#endif + + /* check if this is ok ?, if not, do not + * touch it with a stick. + */ +#if 0 + if (( (r->cmd.token) & ~MASK_TOKEN ) != F_CLIENT_SIDE ) { + reply_log(r,L_ERROR,"Not a client side token.."); + r->close=1; MX; + return; + }; +#endif + r->cmd.token &= MASK_TOKEN; + + /* set up a single buffer to get the remainder of this + * message + */ + r->v1.size= r->cmd.len1 = ntohl( r->cmd.len1); + r->v2.size= r->cmd.len2 = ntohl( r->cmd.len2); + + // silly endian check. 
+#if 1 + if (r->v1.size > 2*1024*1024) { + reply_log(r,L_ERROR,"Size one to big"); + r->close=1; MX; + return; + }; + + if (r->v1.size > 2*1024*1024) { + reply_log(r,L_ERROR,"Size two to big"); + r->close=1; MX; + return; + }; +#endif + +#ifndef STATIC_CS_BUFF + if (r->recbuff) + myfree(r->recbuff); + r->recbuff = NULL; +#endif + r->v2.data = r->v1.data = NULL; + r->toget = r->gotten = 0; + + if (r->cmd.len1 + r->cmd.len2 > 0) { +#if STATIC_CS_BUFF + if (r->cmd.len1 + r->cmd.len2 > MAX_CS_PAYLOAD) { + reply_log(r,L_ERROR, + "RQ string(s) to big %d>%d bytes", + r->cmd.len1 + r->cmd.len2, + MAX_CS_PAYLOAD + ); + r->close=1; MX; + return; + } +#else + r->recbuff = mymalloc( r->cmd.len1 + r->cmd.len2 ); +#endif + if (r->recbuff == NULL) { + reply_log(r,L_ERROR, + "No Memrory for RQ string(s) %d bytes", + r->cmd.len1 + r->cmd.len2); + r->close=1; MX; + return; + }; + r->v1.data = r->recbuff; + r->v2.data = r->recbuff + r->cmd.len1; + r->toget = r->cmd.len1 + r->cmd.len2; + } + + r->iov[0].iov_base = (void *) &skip_cmd; + r->iov[0].iov_len = sizeof( r->cmd ); + + r->iov[1].iov_base = r->recbuff; + r->iov[1].iov_len = r->toget; + +reread: + errno = 0; + n = readv( r->clientfd, r->iov, 2); + + if ((n<=0) && (errno == EINTR)) { + dbms_log(L_INFORM,"Interrupted readv. Ignored"); + goto reread; + } + else + if ((n<0) && (errno == EAGAIN)) { + dbms_log(L_ERROR,"Would block. Even though we peeked at the cmd string. Retry"); + goto reread; + } + else + if (n<0) { + dbms_log(L_ERROR,"Error while reading remainder: (1 %s",strerror(errno)); + r->close=1; MX; + return; + } + else + if (n==0) { + dbms_log(L_INFORM,"Read, but client closed"); + r->close=1; MX; + return; + }; + + assert(n >= sizeof(r->cmd)); + + n -= sizeof(r->cmd); + r->gotten += n; + + if ( r->gotten >= r->toget) { + final_read(r); + return; + }; + } + /* should not get here.. */ + return; + } + +void +continue_read( connection * r ) { + /* fill up the two buffers.. 
*/ + int s; + + dbms_log(L_VERBOSE,"continued read for %d..",r->toget); + errno = 0; + s = read( r->clientfd, r->gotten + r->v1.data, r->toget - r->gotten); + + if ((s<=0) && (errno == EINTR)) { + dbms_log(L_INFORM,"Interrupted continued read. Ignored"); + return; + } + else + if (((s<0) && (errno == EAGAIN)) ) { + dbms_log(L_ERROR,"continued read, but nothing there"); + return; + } + else + if (s<0) { + dbms_log(L_ERROR,"Error while reading remainder: (2 %s",strerror(errno)); + r->close=1; MX; + return; + } + else + if (s==0) { + dbms_log(L_ERROR,"continued read, but client closed connection (%d/%s)", + errno,strerror(errno)); + r->close=1; MX; + return; + }; + + r->gotten +=s; + + if (r->gotten >= r->toget) + final_read(r); + + return; + } + + Added: incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/deamon.h URL: http://svn.apache.org/viewvc/incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/deamon.h?view=auto&rev=528394 ============================================================================== --- incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/deamon.h (added) +++ incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/deamon.h Fri Apr 13 01:56:01 2007 @@ -0,0 +1,162 @@ +/* + * Copyright (c) 2000-2006 Alberto Reggiori + * Dirk-Willem van Gulik + * + * NOTICE + * + * This product is distributed under a BSD/ASF like license as described in the 'LICENSE' + * file you should have received together with this source code. 
If you did not get a + * a copy of such a license agreement you can pick up one at: + * + * http://rdfstore.sourceforge.net/LICENSE + * + * + * $Id: deamon.h,v 1.14 2006/06/19 10:10:22 areggiori Exp $ + */ +#ifndef _H_DEAMON +#define _H_DEAMON + +#include "dbms.h" +#include "conf.h" +#include "dbms_comms.h" + +#ifdef RDFSTORE_DBMS_DEBUG +#ifdef RDFSTORE_DBMS_DEBUG_TIME +extern float total_time; + +#define MDEBUG( x ) { \ + struct timeval t; \ + struct timezone tz; \ + gettimeofday(&t,&tz); \ + fprintf(stderr,"MDEBUG[%5d]: %f: ",getpid(),total_time);\ + fprintf x ; \ + fflush(stderr);\ + } +#else +#define MDEBUG( x ) { fprintf(stderr,"MDEBUG[%5d %s]: ",getpid(),getpid()==mum_pid ? "mum" : "cld"); fprintf x ; fflush(stderr); } +#endif + +#else +#define MDEBUG( x ) { ; } +#endif + + +#ifdef FORKING +typedef struct child_rec { + int close; + struct child_rec * nxt; + struct connection * r; + int pid; /* pid of the child (for sig detects) */ + int num_dbs; /* Number of DBS-es assigned sofar */ + } child_rec; +#endif + +typedef struct dbase { +#ifdef FORKING + struct child_rec * handled_by; +#endif + int sname; + int mode; + int bt_compare_fcn_type; +#ifdef STATIC_BUFF + char pfile[ MAX_STATIC_PFILE ]; + char name[ MAX_STATIC_NAME ]; +#else + char * pfile; + char * name; +#endif + DB * handle; +#ifdef DB_VERSION_MAJOR + DBC * cursor ; +#endif + int lastfd; /* last FD from which a cursor was set */ + int num_cls; /* Number of Clients served */ + struct dbase * nxt; + int close; + } dbase; + +typedef struct connection { + + int type; /* one of C_MUM, C_CHILD, ... */ + + int clientfd; + + struct sockaddr_in address; + + DBT v1; + DBT v2; + + char * sendbuff; + char * recbuff; + + struct dbase * dbp; + + struct header cmd; + struct iovec iov[3]; + struct msghdr msg; + + tops op; /* Max operation allowed */ + int send; /* size of the outgoing block */ + int tosend; /* bytes send sofar.. */ + + int gotten; + int toget; + + int close; /* Shall I close the connection ? 
*/ +#ifdef TIMEOUT + TIMESPEC start,last; +#endif + struct connection * next; + } connection; + +typedef struct command_req { + unsigned char cmd; + char * info; + int cnt; + void (*handler)(connection * r); + tops op; + } command_req; + +extern struct command_req cmd_table[ TOKEN_MAX ]; + +#define L_FATAL -2 +#define L_ERROR -1 +#define L_WARN 0 +#define L_INFORM 1 +#define L_VERBOSE 2 +#define L_BLOAT 3 +#define L_DEBUG 4 + +void reply_log(connection * r, int level, char *fmt, ...); +void dbms_log(int level, char *fmt, ...); +void trace(char *fmt, ...); + +extern int debug,verbose,trace_on; + +#define do_error(r,m) do_error_i(r,m,__FILE__,__LINE__); +#define do_error2(r,m,e) do_error2_i(r,m,e,__FILE__,__LINE__); +void do_error_i ( connection * r, char * msg, char *file, int line ); +void do_error2_i ( connection * r, char * msg, int err, char *file, int line ); + +void dispatch( connection * r, int token, DBT * v1, DBT * v2); + +void clean_children( void ); +void cleanmost( void ); +void cleandown( int signo ); + +void zap ( connection * r ); +void zap_dbs( dbase * q ); +#ifdef FORKING +void zap_child( child_rec * r); +#endif + +void close_connection ( connection * r ); +void free_connection ( connection * r ); +void continue_send( connection * r ); +void final_read( connection * r) ; +void initial_read( connection * r ); +void continue_read( connection * r ); + +extern int client_counter; + +#endif Added: incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/handler.c URL: http://svn.apache.org/viewvc/incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/handler.c?view=auto&rev=528394 ============================================================================== --- incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/handler.c (added) +++ incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/handler.c Fri Apr 13 01:56:01 2007 @@ -0,0 +1,1621 @@ +/* + * Copyright (c) 2000-2006 Alberto Reggiori + * Dirk-Willem van Gulik + * + * 
NOTICE + * + * This product is distributed under a BSD/ASF like license as described in the 'LICENSE' + * file you should have received together with this source code. If you did not get a + * a copy of such a license agreement you can pick up one at: + * + * http://rdfstore.sourceforge.net/LICENSE + * + * + * $Id: handler.c,v 1.48 2006/06/19 10:10:22 areggiori Exp $ + */ + +#include "dbms.h" +#include "dbms_comms.h" +#include "dbms_compat.h" +#include "dbmsd.h" + +#include "deamon.h" +#include "mymalloc.h" +#include "handler.h" +#include "children.h" +#include "pathmake.h" + +#include "rdfstore_flat_store.h" + +dbase * first_dbp = NULL; + +#ifdef STATIC_BUFF +static dbase * free_dbase_list = NULL; +static int free_dbase_list_len = 0; +static int free_dbase_list_keep = 2; +static int free_dbase_list_max = 8; +#endif +static int dbase_counter = 0; + +#ifdef BERKELEY_DB_1_OR_2 +static int rdfstore_backend_dbms_compare_int( + const DBT *a, + const DBT *b ); +#else +static int rdfstore_backend_dbms_compare_int( + DB *file, + const DBT *a, + const DBT *b ); +#endif + +#ifdef BERKELEY_DB_1_OR_2 +static int rdfstore_backend_dbms_compare_double( + const DBT *a, + const DBT *b ); +#else +static int rdfstore_backend_dbms_compare_double( + DB *file, + const DBT *a, + const DBT *b ); +#endif + + +char * iprt( DBT * r ) { + static char tmp[ 128 ]; int i; + if (r==NULL) + return ""; + if (r->data==NULL) + return ""; + if (r->size < 0 || r->size > 1024*1024) + return ""; + + for(i=0;i< ( r->size > 127 ? 127 : r->size);i++) { + int c= ((char *)(r->data))[i]; + tmp[i] = ((c<32) || (c>127)) ? '.' : c; + }; + + tmp[i]='\0'; + return tmp; + } + +char * eptr( int i ) { + if (i==0) + return "Ok "; + else + if (i==1) + return "NtFnd"; + else + if (i==2) + return "Incmp"; + else + if (i>2) + return "+? 
"; + else + return "Fail "; + } + +static int _dbclose(dbase *q) +{ + if ((q->handle->sync)(q->handle,0)) + return -1; + +#ifdef DB_VERSION_MAJOR + if ( (q->cursor->c_close(q->cursor)) || + ((q->handle->close)(q->handle, 0)) ) +#else + if ((q->handle->close)(q->handle)) +#endif + return -1; + return 0; +} + +void +free_dbs( + dbase * q + ) +{ + if ((q->handle) && (_dbclose(q))) + dbms_log(L_ERROR,"Sync/Close(%s) returned an error during closing of db", q->name); + +#ifdef STATIC_BUFF + if (free_dbase_list_len < free_dbase_list_keep) { + q->nxt = free_dbase_list; + free_dbase_list = q; + free_dbase_list_len ++; + } else +#endif + myfree(q); + +#ifndef STATIC_BUFF + if (q->pfile) myfree(q->pfile); + if (q->name) myfree(q->name); +#endif + dbase_counter --; +}; + +void +zap_dbs ( + dbase * r + ) +{ + dbase * * p; + connection * s; + + /* XXX we do not want this ?! before we + * know it we end up in n**2 land + */ + for ( p = &first_dbp; *p && *p != r; ) + p = &((*p)->nxt); + + if ( *p == NULL) { + dbms_log(L_ERROR,"DBase to zap not found"); + return; + }; + + /* should we not first check all the connections + * to see if there are (about to) close.. + */ + for(s=client_list; s;s=s->next) + if (s->dbp == r) { + s->close = 1; MX; + }; + *p = r->nxt; + free_dbs(r); + } + + +void close_all_dbps() { + dbase * p; + + for(p=first_dbp; p;) { + dbase * q; + q = p; p=p->nxt; + free_dbs( q ); /* XXXX why am I not just calling ZAP ? */ + }; + first_dbp=NULL; + } + +/* opening of a local database.. 
+ */ +int open_dbp( dbase * p ) { + +#if 0 + HASHINFO priv = { + 16*1024, /* bsize; hash bucked size */ + 8, /* ffactor, # keys/bucket */ + 3000, /* nelements, guestimate */ + 512*1024, /* cache size */ + NULL, /* hash function */ + 0 /* use current host order */ + }; +#endif + +#ifdef BERKELEY_DB_1_OR_2 /* Berkeley DB Version 1 or 2 */ +#ifdef DB_VERSION_MAJOR + DB_INFO btreeinfo; + memset(&btreeinfo, 0, sizeof(btreeinfo)); + btreeinfo.bt_compare = ( p->bt_compare_fcn_type == FLAT_STORE_BT_COMP_INT ) ? rdfstore_backend_dbms_compare_int : ( p->bt_compare_fcn_type == FLAT_STORE_BT_COMP_DOUBLE ) ? rdfstore_backend_dbms_compare_double : NULL ; +#else + BTREEINFO btreeinfo; + memset(&btreeinfo, 0, sizeof(btreeinfo)); + btreeinfo.compare = ( p->bt_compare_fcn_type == FLAT_STORE_BT_COMP_INT ) ? rdfstore_backend_dbms_compare_int : ( p->bt_compare_fcn_type == FLAT_STORE_BT_COMP_DOUBLE ) ? rdfstore_backend_dbms_compare_double : NULL ; +#endif +#endif + + umask(0); + + /* XXX Do note that we _have_ a mode variable. We just ignore it. + * except for the create flag. + * + * XXX we could also pass a &priv=NULL pointer to let the DB's work this + * one out.. + */ + +#ifdef BERKELEY_DB_1_OR_2 /* Berkeley DB Version 1 or 2 */ + +#ifdef DB_VERSION_MAJOR + if ( (db_open( p->pfile, + DB_BTREE, + DB_CREATE, /* only create it should be ((ro==0) ? 
( DB_CREATE ) : ( DB_RDONLY ) ) */ + 0666, NULL, &btreeinfo, &p->handle )) || +#if DB_VERSION_MAJOR == 2 && DB_VERSION_MINOR < 6 + ((p->handle->cursor)(p->handle, NULL, &p->cursor)) +#else + ((p->handle->cursor)(p->handle, NULL, &p->cursor, 0)) +#endif + ) { +#else + +#if defined(DB_LIBRARY_COMPATIBILITY_API) && DB_VERSION_MAJOR > 2 + if (!(p->handle = (DB *)__db185_open( p->pfile, + p->mode, + 0666, DB_BTREE, &btreeinfo ))) { +#else + if (!(p->handle = (DB *)dbopen( p->pfile, + p->mode, + 0666, DB_BTREE, &btreeinfo ))) { +#endif /* DB_LIBRARY_COMPATIBILITY_API */ + +#endif + +#else /* Berkeley DB Version > 2 */ + if (db_create(&p->handle, NULL,0)) + return errno; + + /* set the b-tree comparinson function to the one passed */ + if( p->bt_compare_fcn_type != NULL ) { + p->handle->set_bt_compare(p->handle, ( p->bt_compare_fcn_type == FLAT_STORE_BT_COMP_INT ) ? + rdfstore_backend_dbms_compare_int : ( p->bt_compare_fcn_type == FLAT_STORE_BT_COMP_DOUBLE ) ? + rdfstore_backend_dbms_compare_double : NULL ); + }; + + p->handle->set_errfile(p->handle,stderr); + p->handle->set_errpfx(p->handle,"DBMS BerkelyDB"); + + if ( (p->handle->open( p->handle, +#if DB_VERSION_MAJOR >= 4 && DB_VERSION_MINOR > 0 && DB_VERSION_PATCH >= 17 + NULL, +#endif + p->pfile, + NULL, + DB_BTREE, + DB_CREATE, /* only create it should be ((ro==0) ? 
( DB_CREATE ) : ( DB_RDONLY ) ) */ + 0666 )) || + ((p->handle->cursor)(p->handle, NULL, &p->cursor, 0)) ) { +#endif /* Berkeley DB Version > 2 */ + + return errno; + }; + +#ifndef BERKELEY_DB_1_OR_2 /* Berkeley DB Version > 2 */ +/* + (void)p->handle->set_h_ffactor(p->handle, 1024); + (void)p->handle->set_h_nelem(p->handle, (u_int32_t)6000); +*/ +#endif + + return 0; + } + + +dbase * get_dbp (connection *r, dbms_xsmode_t xsmode, int bt_compare_fcn_type, DBT * v2 ) { + dbase * p; + char * pfile; + char name[ 255 ], *n, *m; + int i; + int mode = 0; + tops mops = T_NONE; + + /* Clean up the name */ + bzero(name,sizeof(name)); + for(m = (unsigned char *)(v2->data),n=name,i=0;isize && iop = allowed_ops_on_dbase(r->address.sin_addr.s_addr, name); + dbms_log(L_DEBUG,"Permissions for %s/%s - %s", + name, inet_ntoa(r->address.sin_addr),op2string(r->op)); + + switch(xsmode) { + case DBMS_XSMODE_RDONLY: + mops = T_RDONLY; + mode = O_RDONLY; + break; + ;; + case DBMS_XSMODE_RDWR: + mops = T_RDWR; + mode = O_RDWR; + break; + ;; + case DBMS_XSMODE_CREAT: + mops = T_CREAT; + mode = O_RDWR | O_CREAT; + break; + ;; + case DBMS_XSMODE_DROP: + mops = T_DROP; + mode = O_RDWR | O_CREAT; + break; + default: + dbms_log(L_ERROR,"Impossible XSmode(bug) %d requed on %s", + xsmode,name); + return NULL; + break; + } + + if (mops > r->op) { + char * ip = strdup(inet_ntoa(r->address.sin_addr)); + dbms_log(L_ERROR,"Access violation on %s: %s requested %s - but may up to %s", + name, ip, op2string(mops),op2string(r->op)); + free(ip); + return NULL; + }; + + /* Max allowed operation */ + r->op = MIN(mops,r->op); + dbms_log(L_DEBUG,"Permissions for %s/%s - asked %s - granted %s", + name, inet_ntoa(r->address.sin_addr),op2string(mops), + op2string(r->op)); + +#if 0 + /* We always add a RDWR to the open - as it may be the case + * that some later connection needs RW. XXX fixme. 
+ */ + mode = ( mode & (~ O_RDONLY)) | O_RDWR; +#endif + +#ifndef RDFSTORE_PLATFORM_SOLARIS +#ifndef RDFSTORE_PLATFORM_LINUX + /* Try to get an exclusive lock if possible */ + mode |= O_EXLOCK; +#endif +#endif + + for ( p = first_dbp; p; p=p->nxt) + if (strcmp(p->name,name)==0) { + int oldmode = p->mode; + + /* If the database has the b-tree comparinson function we need - simply + * return it. If we are forking - and this is not the process + * really handling the database - then ignore all this. Otherwise we + * fail with an error + */ + if ((((p->bt_compare_fcn_type) & bt_compare_fcn_type) == bt_compare_fcn_type ) +#ifdef FORKING + || (!mum_pid) +#endif + ) { + return p; + } else { + dbms_log(L_ERROR, "Wrong b-tree comparinson function %d on %s - it should be %d", + bt_compare_fcn_type, p->name, p->bt_compare_fcn_type ); + return NULL; + }; + + /* If the database already has the perm's we need - simply + * return it. If we are forking - and this is not the process + * really handling the database - then ignore all this + */ + if ((((p->mode) & mode) == mode ) +#ifdef FORKING + || (!mum_pid) +#endif + ) return p; + + /* we need to (re)open the database with the higher level perm's we + * we need this time.. + */ + p->mode = mode; + if (_dbclose(p) || open_dbp( p )) { + dbms_log(L_ERROR, + "DBase %s could not be be reopened with the right permissions %d", + p->name,p->mode); + /* try to reopen the dbase with the old permissions + * (for the other connections still active) + */ + p->mode = oldmode; + + /* bail out - but not clean up de *p; as other + * connections are still using it. + */ + if (open_dbp(p)) + return NULL; + + /* give up - and have the DB removed (even for + * the other connections ! */ + goto err_and_exit; + } + return p; + } + + if (dbase_counter > HARD_MAX_DBASE) { + dbms_log(L_ERROR,"Hard max number of dabases hit. 
(bug?)"); + return NULL; + }; + +#ifdef STATIC_BUFF + if (free_dbase_list) + { + p = free_dbase_list; + free_dbase_list = free_dbase_list->nxt; + } else { + if (free_dbase_list_keep < free_dbase_list_max) + free_dbase_list_keep += 2; +#else +{ +#endif + p = mymalloc(sizeof(dbase)); + } + + if (p == NULL) { + dbms_log(L_ERROR,"No Memory (for another dbase 1)"); + return NULL; + }; + bzero(p,sizeof(dbase)); + p->nxt = first_dbp; + first_dbp = p; + dbase_counter ++; + +#ifndef STATIC_BUFF + p->name = NULL; + p->pfile = NULL; +#else + p->name[0] ='\0'; + p->pfile[0] = '\0'; +#endif + p->num_cls = 0; + p->close = 0; + p->mode = mode; + p->bt_compare_fcn_type = bt_compare_fcn_type; + p->sname = v2->size; + p->handle = NULL; + +#ifdef FORKING + p->handled_by = NULL; +#endif + +#ifdef STATIC_BUFF + if ( 1+ v2->size > MAX_STATIC_NAME ) +#else + if ((p->name = mymalloc( 1+v2->size ))==NULL) +#endif + { + dbms_log(L_ERROR,"No Memory (for another dbase 2)"); + goto clean_and_exit; + }; + + strcpy(p->name, name); + + if (!(pfile= mkpath(my_dir,p->name))) + goto clean_and_exit; + +#ifdef STATIC_BUFF + if ( strlen(pfile)+1 > MAX_STATIC_PFILE ) +#else + if ((p->pfile = mymalloc(strlen(pfile)+1)) == NULL ) +#endif + { + dbms_log(L_ERROR,"No Memory (for another dbase 3)"); + goto clean_and_exit; + }; + strcpy(p->pfile,pfile); + + /* Check if the DB exists unless we are on an allowed + * create operations level. + */ + if (r->op < T_CREAT) { + struct stat sb; + int s=stat(p->pfile,&sb); + if (s==-1) { + dbms_log(L_ERROR,"DB %s not found\n",p->pfile); + goto clean_and_exit; + } + /* DB exists - we are good. */ + }; + +#ifdef FORKING + /* if we are the main process, then pass + * on the request to a suitable child; + * if we are the 'child' then do the + * actual work.. + */ + if (!mum_pid) { + int mdbs=0,c=0; + struct child_rec * q, *best; + + /* count # of processes and get the least + * loaded one of the lot. Or create a + * fresh one. 
XXXX We could also go for + * a rotational approach, modulo the counter. + * that would remove the need to loop, but + * spoil the load distribution. + */ + if (child_counter < max_processes) { + q=create_new_child(); + /* fork/child or error */ + if ((q == NULL) && (errno)) + goto clean_and_exit; + if (q == NULL) + return NULL; /* just bail out if we are the child */ + best=q; + } + else { + for(c=0,q=children; q; q=q->nxt) + if ( mdbs == 0 || q->num_dbs < mdbs ) { + mdbs = q->num_dbs; + best = q; + }; + }; + + p->handled_by = best; + p->handled_by->num_dbs ++; + + return p; + }; /* if mother */ + /* we are a child... just open normal. + */ +#endif + if (open_dbp( p ) == 0) + return p; + +err_and_exit: + dbms_log(L_ERROR,"open_dbp(1) %s(mode %d) (bt_compare %d) failed: %s",p->pfile,p->mode,p->bt_compare_fcn_type, strerror(errno)); + +clean_and_exit: + p->close = 1; MX; + + /* repair... and shuffle... */ + first_dbp = p->nxt; +#ifndef STATIC_BUFF + if (p->pfile) myfree(p->pfile); + if (p->name) myfree(p->name); + if (p) myfree(p); +#else + p->nxt = free_dbase_list; + free_dbase_list = p; +#endif + dbase_counter --; + return NULL; +} + +void do_init( connection * r) { + DBT val; + u_long proto; + dbms_xsmode_t xsmode; + int bt_compare_fcn_type; + + memset(&val, 0, sizeof(val)); + + val.data = &proto; + val.size = sizeof( u_long ); + + xsmode = (dbms_xsmode_t)((u_long) ntohl( ((u_long *)(r->v1.data))[1] )); + +#ifdef FORKING + assert(mum_pid==0); +#endif + if (r->v1.size == 0) { + reply_log(r,L_ERROR,"No protocol version"); + return; + }; + + proto =((u_long *)(r->v1.data))[0]; + if ( ntohl(proto) != DBMS_PROTO ) { + reply_log(r,L_ERROR,"Protocol not supported"); + return; + }; + + bt_compare_fcn_type = ((int) ntohl( ((u_long *)(r->v1.data))[2] )); + if ( ( bt_compare_fcn_type != 0 ) && + ( bt_compare_fcn_type < FLAT_STORE_BT_COMP_INT ) && + ( bt_compare_fcn_type > FLAT_STORE_BT_COMP_DATE ) ) { + reply_log(r,L_ERROR,"B-tree sorting function not supported"); + return; 
+ }; + + /* work out wether we have this dbase already open, + * and open it if ness. + */ + r->dbp = get_dbp( r, xsmode, bt_compare_fcn_type, &(r->v2)); /* returns NULL on error or if it is a child */ + + if (r->dbp == NULL) { + if (errno == ENOENT) { + dbms_log(L_DEBUG,"Closing instantly with a not found"); + dispatch(r, TOKEN_INIT | F_NOTFOUND,&val,NULL); + return; + }; +#ifdef FORKING + if (!mum_pid) +#endif + reply_log(r,L_ERROR,"Open2 database '%s' failed: %s", + iprt(&(r->v2)),strerror(errno)); + return; + }; + + r->dbp->num_cls ++; +#ifdef FORKING +{ + /* We -also- need to record some xtra things which are lost acrss the connection. */ + u_long extra[4]; + extra[0] = ((u_long *)(r->v1.data))[0]; /* proto */ + extra[1] = ((u_long *)(r->v1.data))[1]; /* mode */ + extra[2] = ((u_long *)(r->v1.data))[2]; /* bt_compare_fcn_type */ + extra[3] = r->address.sin_addr.s_addr; + r->v1.data = extra; + r->v1.size = sizeof(extra); + if (handoff_fd(r->dbp->handled_by, r)) + reply_log(r,L_ERROR,"handoff %s : %s", + r->dbp->name,strerror(errno)); +} +#else + dispatch(r, TOKEN_INIT | F_FOUND,&val,NULL); +#endif + return; + } + +#ifdef FORKING +void do_pass( connection * mums_r) { + /* this is not really a RQ coming in from a child.. bit instead + * a warning that we are about to pass a file descriptor + * in the next message. There is no need to actually confirm + * anything if we are successfull, we should just be on the + * standby to get the FD, and treat it as a new connection.. + * + * note that the r->fd is _not_ a client fd, but the one to + * our mother. 
+ */ + connection * r; + int newfd; + u_long proto; + dbms_xsmode_t xsmode; + DBT val; + u_long bt_compare_fcn_type; + + memset(&val, 0, sizeof(val)); + assert(mums_r->v1.size = 4*sizeof(u_long)); + mums_r->address.sin_addr.s_addr = ((u_long *)(mums_r->v1.data))[3]; + + assert(mum_pid); + + if ((newfd=takeon_fd(mum->clientfd))<0) { + reply_log(mums_r,L_ERROR,"Take on failed: %s", + strerror(errno)); + /* give up on the connection to mum ?*/ + mums_r->close = 1; MX; + return; + }; + + /* try to take this FD on board.. and let it do + * whatever error moaning itself. + */ + proto =((u_long *)(mums_r->v1.data))[0]; + xsmode = (dbms_xsmode_t)((u_long) htonl(((u_long *)(mums_r->v1.data))[1])); + + dbms_log(L_INFORM,"PASS db='%s' mode %d",iprt(&(mums_r->v2)),xsmode); + + if ((r = handle_new_connection( newfd, C_CLIENT, mums_r->address)) == NULL) + return; + + /* is this the sort of init data we can handle ? + */ + if ( ntohl(proto) != DBMS_PROTO ) { + reply_log(r,L_ERROR,"Protocol not supported"); + return; + }; + + bt_compare_fcn_type = ((int) ntohl( ((u_long *)(mums_r->v1.data))[2] )); + if ( ( bt_compare_fcn_type != 0 ) && + ( ( bt_compare_fcn_type < FLAT_STORE_BT_COMP_INT ) || + ( bt_compare_fcn_type > FLAT_STORE_BT_COMP_DATE ) ) ) { + reply_log(r,L_ERROR,"B-tree sorting function not supported"); + return; + }; + + r->dbp = get_dbp( r, xsmode, bt_compare_fcn_type, &(mums_r->v2)); + + if (r->dbp== NULL) { + if (errno == ENOENT) { + dispatch(r, TOKEN_INIT | F_NOTFOUND,&val,NULL); + r->close = 1; MX; + return; + }; + reply_log(r,L_ERROR,"Open database %s failed: %s", + iprt(&(mums_r->v2)),strerror(errno)); + return; + }; + + r->dbp->num_cls ++; + r->dbp->handled_by = NULL; + + /* let the _real_ client know all is well. 
*/ + proto=htonl(DBMS_PROTO); + val.data= &proto; + val.size = sizeof( u_long ); + + dispatch(r, TOKEN_INIT | F_FOUND,&val,NULL); + + dbms_log(L_INFORM,"PASS send init repy on %d to client",r->clientfd); + return; + }; +#endif + +void do_fetch( connection * r) { + DBT key, val; + int err; + + memset(&key, 0, sizeof(key)); + memset(&val, 0, sizeof(val)); + + if (r->type != C_CLIENT) { + dbms_log(L_ERROR,"Command received from non-client command FETCH"); + return; + }; + + key.data = r->v1.data; + key.size = r->v1.size; + +#ifdef DB_VERSION_MAJOR + err=(r->dbp->handle->get)(r->dbp->handle, NULL, &key, &val, 0); +#else + err=(r->dbp->handle->get)(r->dbp->handle, &key, &val, 0); +#endif + + if (err == 0) + dispatch(r,TOKEN_FETCH | F_FOUND,&key,&val); + else +#ifdef DB_VERSION_MAJOR + if (err == DB_NOTFOUND) +#else + if (err == 1) +#endif + dispatch(r,TOKEN_FETCH | F_NOTFOUND,NULL,NULL); + else { + errno=err; + reply_log(r,L_ERROR,"fetch on %s failed: %s (klen=%d, vlen=%d, err=%d(1))",r->dbp->name,strerror(errno), key.size,val.size,err); + } + } + +void do_inc ( connection * r) { + DBT key, val; + int err; + unsigned long l; + char * p; + char outbuf[256]; /* surely shorter than UMAX_LONG */ + + memset(&key, 0, sizeof(key)); + memset(&val, 0, sizeof(val)); + + if (r->type != C_CLIENT) { + dbms_log(L_ERROR,"Command received from non-client command FETCH"); + return; + }; + + /* all we get from the client is the key, and + * all we return is the (increased) value + */ + key.data = r->v1.data; + key.size = r->v1.size; + +#ifdef DB_VERSION_MAJOR + err=(r->dbp->handle->get)( r->dbp->handle, NULL, &key, &val, 0); +#else + err=(r->dbp->handle->get)( r->dbp->handle, &key, &val,0); +#endif + +#ifdef DB_VERSION_MAJOR + if ((err == DB_NOTFOUND) || (val.size == 0)) { +#else + if ((err == 1) || (val.size == 0)) { +#endif + dispatch(r,TOKEN_INC | F_NOTFOUND,NULL,NULL); + return; + } + else + if (err) { +#ifdef DB_VERSION_MAJOR + errno=err; +#endif + reply_log(r,L_ERROR,"inc on %s 
failed: %s",r->dbp->name, + strerror(errno) ); + return; + }; + + /* XXX bit of a hack; but perl seems to deal with + * all storage as ascii strings in some un- + * specified locale. + */ + bzero(outbuf,256); + strncpy(outbuf,val.data,MIN( val.size, 255 )); + l=strtoul( outbuf, &p, 10 ); + + if (*p || l == ULONG_MAX || errno == ERANGE) { + reply_log(r,L_ERROR,"inc on %s failed: %s",r->dbp->name, + "Not the (entire) string is an unsigned integer" + ); + return; + }; + /* this is where it all happens... */ + l++; + + bzero(outbuf,256); + snprintf(outbuf,255,"%lu",l); + val.data = & outbuf; + val.size = strlen(outbuf); + + /* and put it back.. + * + * Put routines return -1 on error (setting errno), 0 + * on success, and 1 if the R_NOOVERWRITE flag was set + * and the key already exists in the file. + */ +#ifdef DB_VERSION_MAJOR + err=(r->dbp->handle->put)( r->dbp->handle, NULL, &key, &val, 0); +#else + err=(r->dbp->handle->put)( r->dbp->handle, &key, &val,0); +#endif + + /* just send it back as an ascii string + */ +#ifdef DB_VERSION_MAJOR + if (( err == 0 ) || ( err < 0 )) +#else + if (( err == 0 ) || ( err == 1 )) +#endif + dispatch(r,TOKEN_INC | F_FOUND,NULL,&val); + else { +#ifdef DB_VERSION_MAJOR + errno=err; +#endif + reply_log(r,L_ERROR,"inc store on %s failed: %s", + r->dbp->name,strerror(errno)); + }; + }; + +void do_dec ( connection * r) { + DBT key, val; + int err; + unsigned long l; + char * p; + char outbuf[256]; /* surely shorter than UMAX_LONG */ + + memset(&key, 0, sizeof(key)); + memset(&val, 0, sizeof(val)); + + if (r->type != C_CLIENT) { + dbms_log(L_ERROR,"Command received from non-client command FETCH"); + return; + }; + + /* all we get from the client is the key, and + * all we return is the (decreased) value + */ + key.data = r->v1.data; + key.size = r->v1.size; + +#ifdef DB_VERSION_MAJOR + err=(r->dbp->handle->get)( r->dbp->handle, NULL, &key, &val, 0); +#else + err=(r->dbp->handle->get)( r->dbp->handle, &key, &val,0); +#endif + +#ifdef 
DB_VERSION_MAJOR + if ((err == DB_NOTFOUND) || (val.size == 0)) { +#else + if ((err == 1) || (val.size == 0)) { +#endif + dispatch(r,TOKEN_DEC | F_NOTFOUND,NULL,NULL); + return; + } + else + if (err) { +#ifdef DB_VERSION_MAJOR + errno=err; +#endif + reply_log(r,L_ERROR,"dec on %s failed: %s",r->dbp->name, + strerror(errno) ); + return; + }; + + /* XXX bit of a hack; but perl seems to deal with + * all storage as ascii strings in some un- + * specified locale. + */ + bzero(outbuf,256); + strncpy(outbuf,val.data,MIN( val.size, 255 )); + l=strtoul( outbuf, &p, 10 ); + + if (*p || l == ULONG_MAX || l == 0 || errno == ERANGE) { + reply_log(r,L_ERROR,"dec on %s failed: %s",r->dbp->name, + "Not the (entire) string is an unsigned integer" + ); + return; + }; + /* this is where it all happens... */ + l--; + + bzero(outbuf,256); + snprintf(outbuf,255,"%lu",l); + val.data = & outbuf; + val.size = strlen(outbuf); + + /* and put it back.. + * + * Put routines return -1 on error (setting errno), 0 + * on success, and 1 if the R_NOOVERWRITE flag was set + * and the key already exists in the file. 
+ */ +#ifdef DB_VERSION_MAJOR + err=(r->dbp->handle->put)( r->dbp->handle, NULL, &key, &val, 0); +#else + err=(r->dbp->handle->put)( r->dbp->handle, &key, &val,0); +#endif + + /* just send it back as an ascii string + */ +#ifdef DB_VERSION_MAJOR + if (( err == 0 ) || ( err < 0 )) +#else + if (( err == 0 ) || ( err == 1 )) +#endif + dispatch(r,TOKEN_DEC | F_FOUND,NULL,&val); + else +#ifdef DB_VERSION_MAJOR + { + errno=err; + reply_log(r,L_ERROR,"dec store on %s failed: %s", + r->dbp->name,strerror(errno)); + }; +#else + reply_log(r,L_ERROR,"dec store on %s failed: %s", + r->dbp->name,strerror(errno)); +#endif + }; + +/* atomic packed increment */ +void do_packinc ( connection * r) { + DBT key, val; + int err; + dbms_counter l=0; + unsigned char outbuf[256]; + + memset(&key, 0, sizeof(key)); + memset(&val, 0, sizeof(val)); + + if (r->type != C_CLIENT) { + dbms_log(L_ERROR,"Command received from non-client command FETCH"); + return; + }; + + /* all we get from the client is the key, and + * all we return is the (increased) value + */ + key.data = r->v1.data; + key.size = r->v1.size; + +#ifdef DB_VERSION_MAJOR + err=(r->dbp->handle->get)( r->dbp->handle, NULL, &key, &val, 0); +#else + err=(r->dbp->handle->get)( r->dbp->handle, &key, &val,0); +#endif + +#ifdef DB_VERSION_MAJOR + if ((err == DB_NOTFOUND) || (val.size == 0)) { +#else + if ((err == 1) || (val.size == 0)) { +#endif + dispatch(r,TOKEN_PACKINC | F_NOTFOUND,NULL,NULL); + return; + } + else + if (err) { +#ifdef DB_VERSION_MAJOR + errno=err; +#endif + reply_log(r,L_ERROR,"packinc on %s failed: %s",r->dbp->name, + strerror(errno) ); + return; + }; + + l = ntohl(*(dbms_counter *)val.data); + + /* this is where it all happens... */ + l++; + + val.data = outbuf; + val.size = sizeof(dbms_counter); + + *(dbms_counter *)val.data = htonl(l); + + /* and put it back.. 
+ * + * Put routines return -1 on error (setting errno), 0 + * on success, and 1 if the R_NOOVERWRITE flag was set + * and the key already exists in the file. + */ +#ifdef DB_VERSION_MAJOR + err=(r->dbp->handle->put)( r->dbp->handle, NULL, &key, &val, 0); +#else + err=(r->dbp->handle->put)( r->dbp->handle, &key, &val,0); +#endif + + /* just send it back as an ascii string + */ +#ifdef DB_VERSION_MAJOR + if (( err == 0 ) || ( err < 0 )) +#else + if (( err == 0 ) || ( err == 1 )) +#endif + dispatch(r,TOKEN_PACKINC | F_FOUND,NULL,&val); + else +#ifdef DB_VERSION_MAJOR + { + errno=err; + reply_log(r,L_ERROR,"packinc store on %s failed: %s", + r->dbp->name,strerror(errno)); + }; +#else + reply_log(r,L_ERROR,"packinc store on %s failed: %s", + r->dbp->name,strerror(errno)); +#endif + }; + +/* atomic packed decrement */ +void do_packdec ( connection * r) { + DBT key, val; + int err; + dbms_counter l=0; + unsigned char outbuf[256]; /* surely shorter than UMAX_LONG */ + + memset(&key, 0, sizeof(key)); + memset(&val, 0, sizeof(val)); + + if (r->type != C_CLIENT) { + dbms_log(L_ERROR,"Command received from non-client command FETCH"); + return; + }; + + /* all we get from the client is the key, and + * all we return is the (increased) value + */ + key.data = r->v1.data; + key.size = r->v1.size; + +#ifdef DB_VERSION_MAJOR + err=(r->dbp->handle->get)( r->dbp->handle, NULL, &key, &val, 0); +#else + err=(r->dbp->handle->get)( r->dbp->handle, &key, &val,0); +#endif + +#ifdef DB_VERSION_MAJOR + if ((err == DB_NOTFOUND) || (val.size == 0)) { +#else + if ((err == 1) || (val.size == 0)) { +#endif + dispatch(r,TOKEN_PACKDEC | F_NOTFOUND,NULL,NULL); + return; + } + else + if (err) { +#ifdef DB_VERSION_MAJOR + errno=err; +#endif + reply_log(r,L_ERROR,"packdec on %s failed: %s",r->dbp->name, + strerror(errno) ); + return; + }; + + l = ntohl(*(dbms_counter *)val.data); + /* this is where it all happens... 
*/ + l--; + + + val.data = outbuf; + val.size = sizeof(uint32_t)+1; + + *(dbms_counter *)val.data = htonl(l); + + /* and put it back.. + * + * Put routines return -1 on error (setting errno), 0 + * on success, and 1 if the R_NOOVERWRITE flag was set + * and the key already exists in the file. + */ +#ifdef DB_VERSION_MAJOR + err=(r->dbp->handle->put)( r->dbp->handle, NULL, &key, &val, 0); +#else + err=(r->dbp->handle->put)( r->dbp->handle, &key, &val,0); +#endif + + /* just send it back as an ascii string + */ +#ifdef DB_VERSION_MAJOR + if (( err == 0 ) || ( err < 0 )) +#else + if (( err == 0 ) || ( err == 1 )) +#endif + dispatch(r,TOKEN_PACKDEC | F_FOUND,NULL,&val); + else +#ifdef DB_VERSION_MAJOR + { + errno=err; + reply_log(r,L_ERROR,"packdec store on %s failed: %s", + r->dbp->name,strerror(errno)); + }; +#else + reply_log(r,L_ERROR,"packdec store on %s failed: %s", + r->dbp->name,strerror(errno)); +#endif + }; + +void do_exists( connection * r) { + DBT key, val; + int err; + + memset(&key, 0, sizeof(key)); + memset(&val, 0, sizeof(val)); + + if (r->type != C_CLIENT) { + dbms_log(L_ERROR,"Command received from non-client command EXISTS"); + return; + }; + + key.data = r->v1.data; + key.size = r->v1.size; + +#ifdef DB_VERSION_MAJOR + err=(r->dbp->handle->get)( r->dbp->handle, NULL, &key, &val, 0); +#else + err=(r->dbp->handle->get)( r->dbp->handle, &key, &val,0); +#endif + + if ( err == 0 ) + dispatch(r,TOKEN_EXISTS | F_FOUND,NULL,NULL); + else +#ifdef DB_VERSION_MAJOR + if (err == DB_NOTFOUND) + dispatch(r,TOKEN_EXISTS | F_NOTFOUND,NULL,NULL); + else { + errno=err; + reply_log(r,L_ERROR,"exists on %s failed: %s",r->dbp->name,strerror(errno)); + } +#else + if (err == 1) + dispatch(r,TOKEN_EXISTS | F_NOTFOUND,NULL,NULL); + else + reply_log(r,L_ERROR,"exists on %s failed: %s",r->dbp->name,strerror(errno)); +#endif + }; + +void do_delete( connection * r) { + DBT key; + int err; + + memset(&key, 0, sizeof(key)); + + if (r->type != C_CLIENT) { + 
dbms_log(L_ERROR,"Command received from non-client command DELETE"); + return; + }; + + key.data = r->v1.data; + key.size = r->v1.size; + +#ifdef DB_VERSION_MAJOR + err=(r->dbp->handle->del)( r->dbp->handle, NULL, &key, 0); +#else + err=(r->dbp->handle->del)( r->dbp->handle, &key,0); +#endif + + if ( err == 0 ) + dispatch(r,TOKEN_DELETE | F_FOUND,NULL,NULL); + else +#ifdef DB_VERSION_MAJOR + if (err == DB_NOTFOUND) + dispatch(r,TOKEN_DELETE | F_NOTFOUND,NULL,NULL); + else { + errno=err; + reply_log(r,L_ERROR,"delete on %s failed: %s",r->dbp->name,strerror(errno)); + } +#else + if ( err == 1 ) + dispatch(r,TOKEN_DELETE | F_NOTFOUND,NULL,NULL); + else + reply_log(r,L_ERROR,"delete on %s failed: %s",r->dbp->name,strerror(errno)); +#endif + }; + +void do_store( connection * r) { + DBT key, val; + int err; + + memset(&key, 0, sizeof(key)); + memset(&val, 0, sizeof(val)); + + if (r->type != C_CLIENT) { + dbms_log(L_ERROR,"Command received from non-client command STORE"); + return; + }; + + key.data = r->v1.data; + key.size = r->v1.size; + + val.data = r->v2.data; + val.size = r->v2.size; + +#ifdef DB_VERSION_MAJOR + err=(r->dbp->handle->put)( r->dbp->handle, NULL, &key, &val, 0); +#else + err=(r->dbp->handle->put)( r->dbp->handle, &key, &val,0); +#endif + + if ( err == 0 ) + dispatch(r,TOKEN_STORE | F_FOUND,NULL,NULL); /* it was F_NOTFOUND wich was returning always 1 even if not there (F_NOTFOUND) */ + else +#ifdef DB_VERSION_MAJOR + if ( err < 0 ) + dispatch(r,TOKEN_STORE | F_FOUND,NULL,NULL); + else { + errno=err; + reply_log(r,L_ERROR,"store on %s failed: %s",r->dbp->name,strerror(errno)); + }; +#else + if ( err == 1 ) + dispatch(r,TOKEN_STORE | F_NOTFOUND,NULL,NULL); /* it was F_FOUND which was returning always 0 even if already there (F_FOUND) see above dispatch */ + else + reply_log(r,L_ERROR,"store on %s failed: %s",r->dbp->name,strerror(errno)); +#endif + + }; + +void do_sync( connection * r) { + int err=0; + + if (r->type != C_CLIENT) { + 
dbms_log(L_ERROR,"Command received from non-client command SYNC"); + return; + }; + + err=(r->dbp->handle->sync)( r->dbp->handle,0); + + if (err != 0 ) { + reply_log(r,L_ERROR,"sync on %s failed: %s",r->dbp->name,strerror(errno)); + } + else { + dispatch(r,TOKEN_SYNC,NULL,NULL); + }; + }; + +void do_clear( connection * r) { + int err; + + if (r->type != C_CLIENT) { + dbms_log(L_ERROR,"Command received from non-client command CLEAR"); + return; + }; + + /* close the database, remove the file, and repoen... ? */ + if ( (_dbclose(r->dbp)) || + ((err=unlink(r->dbp->pfile)) !=0) || + ((err=open_dbp( r->dbp )) != 0) ) + { + reply_log(r,L_ERROR,"clear on %s failed: %s",r->dbp->name,strerror(errno)); + return; + }; + + trace("%6s %12s %s","SYNC",r->dbp->name,eptr(err)); + dispatch(r, TOKEN_CLEAR,NULL, NULL); + }; + +void do_list( connection * r) { +#if 0 + DBT key, val; + int err; + + memset(&key, 0, sizeof(key)); + memset(&val, 0, sizeof(val)); + + /* now the issue here is... do we want to do + * the entire array; as one HUGE malloc ? + */ + + if (r->type != C_CLIENT) { + dbms_log(L_ERROR,"Command received from non-client command LIST"); + return; + }; + + /* keep track of whom used the cursor last...*/ + r->dbp->lastfd = r->clientfd; + + f = R_FIRST; + for(;;) { + err=(r->dbp->handle->seq)( r->dbp->handle, &key, &val,f); + if ( err ) last; + f = F_NEXT; + + }; + + if ( err < 0 ) + reply_log(r,L_ERROR,"first on %s failed: %s", + r->dbp->name,strerror(errno)); + else + if ( err == 1 ) + dispatch(r,TOKEN_FIRSTKEY | F_NOTFOUND,NULL,NULL); + else + dispatch(r,TOKEN_LIST | F_FOUND,&key,&val); +#endif + reply_log(r,L_ERROR,"Not implemented.. yet"); + } + +void do_ping( connection * r) { + dispatch(r,TOKEN_PING | F_FOUND,NULL,NULL); + } + +void do_drop( connection * r) { + char dbpath[ 1024 ]; + dbms_log(L_INFORM,"Drop cmd"); + + /* Construct name - add .db where/if needed ?? 
*/ + /* snprintf(dbpath,sizeof(dbpath),"%s.db",r->dbp->pfile); */ + snprintf(dbpath,sizeof(dbpath),"%s",r->dbp->pfile); + + /* or r->dbp->close = 2; */ + zap_dbs(r->dbp); + + if (unlink(dbpath)) + reply_log(r,L_ERROR, + "DB file %s could not be deleted: %s", + dbpath,strerror(errno)); + else + dispatch(r,TOKEN_DROP| F_FOUND,NULL,NULL); +} + +void do_close( connection * r) { + + if (r->type != C_CLIENT) { + dbms_log(L_ERROR,"Command received from non-client command CLOSE"); + return; + }; + + dispatch(r,TOKEN_CLOSE,NULL,NULL); + r->close = 1; MX; + } + +/* Combined from function; from first record when flag==R_FIRST or DB_FIRST + * or from the current cursor if flag=R_CURSOR or DB_SET_RANGE. If + * no cursos is yet set; the latter two default to an R_FIRST or DB_FIRST. + */ +static +void _from( connection * r, DBT *key, DBT *val, int flag) { + int err; + + if (r->type != C_CLIENT) { + dbms_log(L_ERROR,"Command received from non-client command FIRST/FROM"); + return; + }; + + /* keep track of whom used the cursor last...*/ + r->dbp->lastfd = r->clientfd; + +#ifdef DB_VERSION_MAJOR + err=(r->dbp->cursor->c_get)( r->dbp->cursor, key, val, flag); +#else + err=(r->dbp->handle->seq)( r->dbp->handle, key, val,flag); +#endif + +#if DB_VERSION_MAJOR >= 2 + if (err == DB_NOTFOUND) + dispatch(r,( (flag==DB_FIRST) ? TOKEN_FIRSTKEY : TOKEN_FROM ) | F_NOTFOUND,NULL,NULL); + else + if ( err == 0 ) + dispatch(r,( (flag==DB_FIRST) ? TOKEN_FIRSTKEY : TOKEN_FROM ) | F_FOUND,key,val); + else { + errno=err; + reply_log(r,L_ERROR,"first on %s failed: %s",r->dbp->name,strerror(errno)); + } +#else + if ( err == 1 ) + dispatch(r,( (flag==R_FIRST) ? TOKEN_FIRSTKEY : TOKEN_FROM ) | F_NOTFOUND,NULL,NULL); + else + if ( err == 0 ) + dispatch(r,( (flag==R_FIRST) ? 
TOKEN_FIRSTKEY : TOKEN_FROM ) | F_FOUND,key,val); + else + reply_log(r,L_ERROR,"first on %s failed: %s",r->dbp->name,strerror(errno)); +#endif +}; + +void do_first(connection * r) { + DBT key, val; + memset(&key, 0, sizeof(key)); + memset(&val, 0, sizeof(val)); + +#if DB_VERSION_MAJOR >= 2 + _from(r,&key,&val,DB_FIRST); +#else + _from(r,&key,&val,R_FIRST); +#endif +} + +void do_from(connection *r) { + DBT key, val; + memset(&key, 0, sizeof(key)); + memset(&val, 0, sizeof(val)); + + key.data = r->v1.data; /* copy the requested closest key */ + key.size = r->v1.size; + +#if DB_VERSION_MAJOR >= 2 + _from(r,&key,&val,DB_SET_RANGE); +#else + _from(r,&key,&val,R_CURSOR); +#endif +}; + + +void do_next( connection * r) { + DBT key, val; + int err; + + memset(&key, 0, sizeof(key)); + memset(&val, 0, sizeof(val)); + + if (r->type != C_CLIENT) { + dbms_log(L_ERROR,"Command received from non-client command NEXT"); + return; + }; + + /* We need to set the cursor first, if we where + * not the last using it. + */ + if ( r->dbp->lastfd != r->clientfd ) { + r->dbp->lastfd = r->clientfd; + key.data = r->v1.data; /* copy the previous key if any */ + key.size = r->v1.size; + +#if DB_VERSION_MAJOR >= 2 + err=(r->dbp->cursor->c_get)(r->dbp->cursor, &key, &val, DB_NEXT); +#else + err=(r->dbp->handle->seq)( r->dbp->handle, &key, &val, R_NEXT); +#endif + +#ifdef DB_VERSION_MAJOR + if ( (err != 0) && (err != DB_NOTFOUND) ) { + reply_log(r,L_ERROR,"Internal DB Error %s",r->dbp->name); + return; + }; +#else + if (err<0 && errno ==0) + dbms_log(L_WARN,"seq-cursor We have the impossible err=%d and %d", + err,errno); + + if ((err != 0) && (err != 1) && (errno != 0) ) { + reply_log(r,L_ERROR,"Internal DB Error %s",r->dbp->name); + return; + }; +#endif + + /* BUG: we could detect the fact that the previous key + * the callee was aware of, has been zapped. For + * now we note that, if the key is not there, we + * have received the next greater key. Which we + * thus return ?! This is an issue. 
+ */ + } + else + err = 0; + + if (err == 0) +#if DB_VERSION_MAJOR >= 2 + err=(r->dbp->cursor->c_get)(r->dbp->cursor, &key, &val, DB_NEXT); +#else + err=(r->dbp->handle->seq)( r->dbp->handle, &key, &val, R_NEXT); +#endif + + trace("%6s %12s %20s: %s %s","NEXT", + r->dbp->name, iprt(&key), + iprt( err==0 ? &val : NULL ),eptr(err)); + +#ifdef DB_VERSION_MAJOR + if ( ( err == DB_NOTFOUND ) || ( err > 0 ) ) + dispatch(r,TOKEN_NEXTKEY | F_NOTFOUND,NULL,NULL); + else +#else + if (( err == 1 ) || (( err <0 ) && (errno == 0)) ) + dispatch(r,TOKEN_NEXTKEY | F_NOTFOUND,NULL,NULL); + else +#endif + if ( err == 0 ) + dispatch(r,TOKEN_NEXTKEY | F_FOUND,&key,&val); + else { +#ifdef DB_VERSION_MAJOR + errno=err; +#endif + reply_log(r,L_ERROR,"next on %s failed: %s",r->dbp->name,strerror(errno)); + }; + }; + +struct command_req cmd_table[ TOKEN_MAX ]; +#define IT(i,s,f,o) { cmd_table[i].cnt = 0; cmd_table[i].cmd = i; cmd_table[i].info = s; cmd_table[i].handler = f; cmd_table[i].op = o; } +void init_cmd_table( void ) +{ + int i; + for(i=0;icmd.token; + + if ( i>=0 && i<= TOKEN_MAX && cmd_table[i].handler) { + if (cmd_table[i].op <= r->op) { + cmd_table[i].cnt++; + (cmd_table[i].handler)(r); + } else { + char * ip = strdup(inet_ntoa(r->address.sin_addr)); + reply_log(r,L_ERROR,"Access violation for %s on %s (required is %s but IP is limited to %s)", + ip,cmd_table[i].info, + op2string(cmd_table[i].op),op2string(r->op)); + free(ip); + r->close = 1; MX; + } + return; + } + + reply_log(r,L_ERROR,"Unkown command token %d",i); + r->close = 1; MX; + return; +} + +/* misc subroutines (copied from ../../backend_bdb_store.c - should be merged) */ + +/* + * The following compare function are used for btree(s) for basic + * XML-Schema data types xsd:integer, xsd:double (and will xsd:date) + * + * They return: + * < 0 if a < b + * = 0 if a = b + * > 0 if a > b + */ +#ifdef BERKELEY_DB_1_OR_2 +static int rdfstore_backend_dbms_compare_int( + const DBT *a, + const DBT *b ) { +#else +static int 
rdfstore_backend_dbms_compare_int( + DB *file, + const DBT *a, + const DBT *b ) { +#endif + long ai, bi; + + memcpy(&ai, a->data, sizeof(long)); + memcpy(&bi, b->data, sizeof(long)); + + return (ai - bi); + }; + +#ifdef BERKELEY_DB_1_OR_2 +static int rdfstore_backend_dbms_compare_double( + const DBT *a, + const DBT *b ) { +#else +static int rdfstore_backend_dbms_compare_double( + DB *file, + const DBT *a, + const DBT *b ) { +#endif + double ad,bd; + + memcpy(&ad, a->data, sizeof(double)); + memcpy(&bd, b->data, sizeof(double)); + + if ( ad < bd ) { + return -1; + } else if ( ad > bd) { + return 1; + }; + + return 0; + }; Added: incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/handler.h URL: http://svn.apache.org/viewvc/incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/handler.h?view=auto&rev=528394 ============================================================================== --- incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/handler.h (added) +++ incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/handler.h Fri Apr 13 01:56:01 2007 @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2000-2006 Alberto Reggiori + * Dirk-Willem van Gulik + * + * NOTICE + * + * This product is distributed under a BSD/ASF like license as described in the 'LICENSE' + * file you should have received together with this source code. 
If you did not get a + * a copy of such a license agreement you can pick up one at: + * + * http://rdfstore.sourceforge.net/LICENSE + * + * + * $Id: handler.h,v 1.4 2006/06/19 10:10:22 areggiori Exp $ + */ + +void close_all_dbps(); +void parse_request( connection * r); +void init_cmd_table( void ); Added: incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/loop.c URL: http://svn.apache.org/viewvc/incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/loop.c?view=auto&rev=528394 ============================================================================== --- incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/loop.c (added) +++ incubator/triplesoup/donations/TRIPLES-3-RDFStore/dbms/deamon/loop.c Fri Apr 13 01:56:01 2007 @@ -0,0 +1,267 @@ +/* + * Copyright (c) 2000-2006 Alberto Reggiori + * Dirk-Willem van Gulik + * + * NOTICE + * + * This product is distributed under a BSD/ASF like license as described in the 'LICENSE' + * file you should have received together with this source code. If you did not get a + * a copy of such a license agreement you can pick up one at: + * + * http://rdfstore.sourceforge.net/LICENSE + * + * + * $Id: loop.c,v 1.18 2006/06/19 10:10:22 areggiori Exp $ + */ +#include "dbms.h" +#include "dbms_compat.h" +#include "dbms_comms.h" +#include "dbmsd.h" + +#include "deamon.h" +#include "handler.h" + +/* for debugging.. + */ +char * +show( + int max, + fd_set * all, + fd_set * show + ) +{ int i; + static char out[16*1024]; + out[0]='\0'; + for(i=0; inxt) + if (p->handle) { + (p->handle->sync)(p->handle,0); +#ifdef DB_VERSION_MAJOR + (p->handle->fd)(p->handle, &fd); +#else + fd = (p->handle->fd)(p->handle); +#endif + fsync( fd ); + one++; + }; + + if (one) + sync(); + + dbms_log(L_INFORM,"Synced %d databases and the file system",one); + } + +void +select_loop( void ) +{ + time_t lsync = time(NULL); + /* seconds and micro seconds. 
*/ + struct timeval nill={600,0}; + struct timeval *np = &nill; + + if (!mum_pid) + np = NULL; + + for (;;) { + int n; + time_t now = time(NULL); + struct connection *r, *s; + dbase * p; +#ifdef FORKING + child_rec * d; +#endif + rset=allrset; + wset=allwset; + eset=alleset; + + /* mothers do not time out, or if + * the last cycle was synced and + * was nothing to do... + */ + if ((n=select(maxfd+1,&rset,&wset,&eset,np)) < 0) { + if (errno != EINTR ) + dbms_log(L_ERROR,"RWE Select Probem %s",strerror(errno)); + continue; + }; + + /* not done anything for 15 minutes or so. + * are there any connections outstanding apart + * from the one to mum ? + */ + if ( (n==0) && (mum_pid) && + (!(first_dbp && client_list && client_list->next))) { + + // clients but no dbase ? + assert( ! (client_list) && (client_list->next)); + + // a dbase but no clients ? + assert(! first_dbp); + + dbms_log(L_INFORM,"Nothing to do, this child stops.."); + + exit(0); + } + + /* upon request from alberto... flush + every 5 minutes or so.. see if that + cures the issue since we moved to raid. + */ + if ((mum_pid) && (difftime(now,lsync) > 300)) { + flush_all(); + lsync = now; + /* next round, we can wait for just about forever */ + // if (n == 0) np = NULL; XXX not needed + }; + dbms_log(L_DEBUG,"Read : %s",show(maxfd+1,&allrset,&rset)); + dbms_log(L_DEBUG,"Write : %s",show(maxfd+1,&allrset,&wset)); + dbms_log(L_DEBUG,"Except: %s",show(maxfd+1,&allrset,&eset)); + + /* Is someone knocking on our front door ? 
+ */ + if ((sockfd>=0) && (FD_ISSET(sockfd,&rset))) { + struct sockaddr_in client; + int len=sizeof(client); + int fd; + + if (mum_pid) + dbms_log(L_ERROR,"Should not get such an accept()"); + else + if ((fd = accept(sockfd, + ( struct sockaddr *) &client, &len)) <0) + dbms_log(L_ERROR,"Could not accept"); + else { + tops level = allowed_ops(client.sin_addr.s_addr); + dbms_log(L_DEBUG,"Accept(%d) op level for IP=%s: %s", + fd,inet_ntoa(client.sin_addr),op2string(level)); + + if (level > T_NONE) + handle_new_connection(fd, C_NEW_CLIENT, client); + else { + dbms_log(L_ERROR,"Accept violation: %s rejected.", + inet_ntoa(client.sin_addr)); + close(fd); + } + } + } + + /* note that for the pthreads we rely on a mark-and-sweep + * style of garbage collect. + */ + if (client_list != NULL) for ( s = client_list; s != NULL; ) { + /* Page early, as the record might get zapped + * and taken out of the lists in this loop. + */ + assert( s != NULL ); + r=s; s=r->next; + + assert( r != s ); + if (r->close) + continue; + + if (FD_ISSET(r->clientfd,&rset)) { + int trapit=getpid(); // trap forks. + if (r->tosend != 0) { + dbms_log(L_ERROR,"read request received while working on send"); + zap(r); + continue; + } + dbms_log(L_DEBUG,"read F=%d R%d W%d E%d", + r->clientfd, + FD_ISSET(r->clientfd,&rset) ? 1 : 0, + FD_ISSET(r->clientfd,&wset) ? 1 : 0, + FD_ISSET(r->clientfd,&eset) ? 1 : 0 + ); + + if (r->toget == 0) + initial_read(r); + else + continue_read(r); + + if (trapit != getpid()) + break; +#ifdef TIMEOUT + r->last=time(NULL); +#endif + if (r->close) + continue; + }; + + if (FD_ISSET(r->clientfd,&wset)) { + if (r->tosend >= 0 ) + continue_send(r); + else + dbms_log(L_ERROR,"write select while not expecting to write"); +#ifdef TIMEOUT + r->last=time(NULL); +#endif + if (r->close) + continue; + }; + +// XXX this eset is a pointless +// excersize, perhaps ?? +// only seen on linux-RH5.1 +// + if (FD_ISSET(r->clientfd,&eset)) { + dbms_log(L_ERROR,"Some exception. 
Unexpected"); + r->close = 1; MX; +#ifdef TIMEOUT + r->last=time(NULL); +#endif + }; +#ifdef TIMEOUT + if (difftime( r->last, time(NULL) ) > TIMEOUT) { + inform("Timeout, closed the connection."); + r->close =1; MX; + }; +#endif + }; /* client set loop */ + + /* clean up operations... + * note the order + */ + for ( s=client_list; s != NULL; ) { + r=s; s=r->next; + assert( r != s ); + if ( r->close ) { + dbms_log(L_DEBUG,"General clean %d",r->clientfd); + zap(r); + }; + }; + +#ifdef FORKING + for(d=children;d;) { + child_rec * e = d; d=d->nxt; + assert( d != e ); + if (e->close) + zap_child( e ); + }; +#endif + + for(p=first_dbp; p;) { + dbase * q=p; p=p->nxt; + assert( p != q ); + if (q->close) + zap_dbs(q); + }; + + }; /* Forever.. */ + } /* of main */