[netfilter-cvslog] r6307 - branches/ulog/ulogd2/output/mysql

laforge at netfilter.org laforge at netfilter.org
Thu Dec 8 13:42:08 CET 2005


Author: laforge at netfilter.org
Date: 2005-12-08 13:42:06 +0100 (Thu, 08 Dec 2005)
New Revision: 6307

Modified:
   branches/ulog/ulogd2/output/mysql/ulogd_output_MYSQL.c
Log:
convert mysql to new DB api


Modified: branches/ulog/ulogd2/output/mysql/ulogd_output_MYSQL.c
===================================================================
--- branches/ulog/ulogd2/output/mysql/ulogd_output_MYSQL.c	2005-12-08 12:41:44 UTC (rev 6306)
+++ branches/ulog/ulogd2/output/mysql/ulogd_output_MYSQL.c	2005-12-08 12:42:06 UTC (rev 6307)
@@ -48,6 +48,8 @@
 #include <ulogd/ulogd.h>
 #include <ulogd/conffile.h>
 
+#include "../../util/db.c"
+
 #ifdef DEBUG_MYSQL
 #define DEBUGP(x, args...)	fprintf(stderr, x, ## args)
 #else
@@ -55,20 +57,15 @@
 #endif
 
 struct mysql_instance {
+	struct db_instance db_inst;
 	MYSQL *dbh; /* the database handle we are using */
-	char *stmt; /* buffer for our insert statement */
-	char *stmt_val; /* pointer to the beginning of the "VALUES" part */
-	char *stmt_ins; /* pointer to current inser position in statement */
-	time_t reconnect; /* Attempt to reconnect if connection is lost */
-
-	int (*interp)(struct ulogd_pluginstance *upi);
 };
-#define TIME_ERR		((time_t)-1)	/* Be paranoid */
 
 /* our configuration directives */
-static struct config_keyset mysql_kset = {
-	.num_ces = 9,
+static struct config_keyset kset_mysql = {
+	.num_ces = DB_CE_NUM+5,
 	.ces = {
+		DB_CES,
 		{
 			.key = "db", 
 			.type = CONFIG_TYPE_STRING,
@@ -90,234 +87,18 @@
 			.options = CONFIG_OPT_MANDATORY,
 		},
 		{
-			.key = "table", 
-			.type = CONFIG_TYPE_STRING,
-			.options = CONFIG_OPT_MANDATORY,
-		},
-		{
 			.key = "port",
 			.type = CONFIG_TYPE_INT,
 		},
-		{
-			.key = "reconnect",
-			.type = CONFIG_TYPE_INT,
-		},
-		{
-			.key = "connect_timeout",
-			.type = CONFIG_TYPE_INT,
-		},
-		{
-			.key = "ip_as_string",
-			.type = CONFIG_TYPE_INT,
-		},
 	},
 };
-#define db_ce(x)	(x->ces[0])
-#define	host_ce(x)	(x->ces[1])
-#define user_ce(x)	(x->ces[2])
-#define pass_ce(x)	(x->ces[3])
-#define table_ce(x)	(x->ces[4])
-#define port_ce(x)	(x->ces[5])
-#define reconnect_ce(x)	(x->ces[6])
-#define timeout_ce(x)	(x->ces[7])
-#define asstring_ce(x)	(x->ces[8])
-
-static struct ulogd_plugin mysql_plugin;
-
-static int _mysql_init_db(struct ulogd_pluginstance *upi);
-
-/* this is a wrapper that just calls the current real
- * interp function */
-static int interp_mysql(struct ulogd_pluginstance *upi)
-{
-	struct mysql_instance *mi = (struct mysql_instance *) &upi->private;
-	return mi->interp(upi);
-}
-
-/* our main output function, called by ulogd */
-static int __interp_mysql(struct ulogd_pluginstance *upi)
-{
-	struct mysql_instance *mi = (struct mysql_instance *) &upi->private;
-	int i;
-
-	mi->stmt_ins = mi->stmt_val;
-
-	for (i = 0; i < upi->input.num_keys; i++) { 
-		struct ulogd_key *res = upi->input.keys[i].u.source;
-
-		if (upi->input.keys[i].flags & ULOGD_KEYF_INACTIVE)
-			continue;
-
-		if (!res)
-			ulogd_log(ULOGD_NOTICE, "no source for `%s' ?!?\n",
-				  upi->input.keys[i].name);
-			
-		if (!res || !IS_VALID(*res)) {
-			/* no result, we have to fake something */
-			mi->stmt_ins += sprintf(mi->stmt_ins, "NULL,");
-			continue;
-		}
-		
-		switch (res->type) {
-			char *tmpstr;
-			struct in_addr addr;
-		case ULOGD_RET_INT8:
-			sprintf(mi->stmt_ins, "%d,", res->u.value.i8);
-			break;
-		case ULOGD_RET_INT16:
-			sprintf(mi->stmt_ins, "%d,", res->u.value.i16);
-			break;
-		case ULOGD_RET_INT32:
-			sprintf(mi->stmt_ins, "%d,", res->u.value.i32);
-			break;
-		case ULOGD_RET_INT64:
-			sprintf(mi->stmt_ins, "%lld,", res->u.value.i64);
-			break;
-		case ULOGD_RET_UINT8:
-			sprintf(mi->stmt_ins, "%u,", res->u.value.ui8);
-			break;
-		case ULOGD_RET_UINT16:
-			sprintf(mi->stmt_ins, "%u,", res->u.value.ui16);
-			break;
-		case ULOGD_RET_IPADDR:
-			if (asstring_ce(upi->config_kset).u.value) {
-				memset(&addr, 0, sizeof(addr));
-				addr.s_addr = ntohl(res->u.value.ui32);
-				*(mi->stmt_ins++) = '\'';
-				tmpstr = inet_ntoa(addr);
-			#ifdef OLD_MYSQL
-				mysql_escape_string(mi->stmt_ins, tmpstr,
-						    strlen(tmpstr));
-			#else
-				mysql_real_escape_string(mi->dbh, mi->stmt_ins,
-							 tmpstr,
-						 	strlen(tmpstr));
-			#endif /* OLD_MYSQL */
-                                mi->stmt_ins = mi->stmt + strlen(mi->stmt);
-				sprintf(mi->stmt_ins, "',");
-				break;
-			}
-			/* fallthrough when logging IP as u_int32_t */
-		case ULOGD_RET_UINT32:
-			sprintf(mi->stmt_ins, "%u,", res->u.value.ui32);
-			break;
-		case ULOGD_RET_UINT64:
-			sprintf(mi->stmt_ins, "%llu,", res->u.value.ui64);
-			break;
-		case ULOGD_RET_BOOL:
-			sprintf(mi->stmt_ins, "'%d',", res->u.value.b);
-			break;
-		case ULOGD_RET_STRING:
-			*(mi->stmt_ins++) = '\'';
-			if (res->u.value.ptr) {
-			#ifdef OLD_MYSQL
-				mi->stmt_ins += mysql_escape_string(mi->stmt_ins, 
-							res->u.value.ptr,
-							strlen(res->u.value.ptr));
-			#else
-				mi->stmt_ins += mysql_real_escape_string(
-							mi->dbh, mi->stmt_ins, 
-							res->u.value.ptr, 
-							strlen(res->u.value.ptr));
-			#endif
-			}
-			sprintf(mi->stmt_ins, "',");
-			break;
-		case ULOGD_RET_RAW:
-			ulogd_log(ULOGD_NOTICE,
-				"%s: type RAW not supported by MySQL\n",
-				upi->input.keys[i].name);
-			break;
-		default:
-			ulogd_log(ULOGD_NOTICE,
-				"unknown type %d for %s\n",
-				res->type, upi->input.keys[i].name);
-			break;
-		}
-		mi->stmt_ins = mi->stmt + strlen(mi->stmt);
-	}
-	*(mi->stmt_ins - 1) = ')';
-	DEBUGP("stmt=#%s#\n", mi->stmt);
-
-	/* now we have created our statement, insert it */
-
-	if (mysql_real_query(mi->dbh, mi->stmt, strlen(mi->stmt))) {
-		ulogd_log(ULOGD_ERROR, "sql error during insert: %s\n",
-			  mysql_error(mi->dbh));
-		return _mysql_init_db(upi);
-	}
-
-	return 0;
-}
-
-/* no connection, plugin disabled */
-static int mysql_output_disabled(struct ulogd_pluginstance *upi)
-{
-	return 0;
-}
-
-#define MYSQL_INSERTTEMPL   "insert into X (Y) values (Z)"
-#define MYSQL_VALSIZE	100
-
-/* create the static part of our insert statement */
-static int mysql_createstmt(struct ulogd_pluginstance *upi)
-{
-	struct mysql_instance *mi = (struct mysql_instance *) upi->private;
-	struct _field *f;
-	unsigned int size;
-	char buf[ULOGD_MAX_KEYLEN];
-	char *underscore;
-	int i;
-
-	if (mi->stmt)
-		free(mi->stmt);
-
-	/* caclulate the size for the insert statement */
-	size = strlen(MYSQL_INSERTTEMPL) + 
-				strlen(table_ce(upi->config_kset).u.string);
-
-	for (i = 0; i < upi->input.num_keys; i++) {
-		if (upi->input.keys[i].flags & ULOGD_KEYF_INACTIVE)
-			continue;
-		/* we need space for the key and a comma, as well as
-		 * enough space for the values */
-		size += strlen(upi->input.keys[i].name) + 1 + MYSQL_VALSIZE;
-	}	
-
-	ulogd_log(ULOGD_DEBUG, "allocating %u bytes for statement\n", size);
-
-	mi->stmt = (char *) malloc(size);
-	if (!mi->stmt) {
-		ulogd_log(ULOGD_ERROR, "OOM!\n");
-		return -ENOMEM;
-	}
-
-	sprintf(mi->stmt, "insert into %s (",
-		table_ce(upi->config_kset).u.string);
-	mi->stmt_val = mi->stmt + strlen(mi->stmt);
-
-	for (i = 0; i < upi->input.num_keys; i++) {
-		if (upi->input.keys[i].flags & ULOGD_KEYF_INACTIVE)
-			continue;
-
-		strncpy(buf, upi->input.keys[i].name, ULOGD_MAX_KEYLEN);	
-		while ((underscore = strchr(buf, '.')))
-			*underscore = '_';
-		sprintf(mi->stmt_val, "%s,", buf);
-		mi->stmt_val = mi->stmt + strlen(mi->stmt);
-	}
-	*(mi->stmt_val - 1) = ')';
-
-	sprintf(mi->stmt_val, " values (");
-	mi->stmt_val = mi->stmt + strlen(mi->stmt);
-
-	ulogd_log(ULOGD_DEBUG, "stmt='%s'\n", mi->stmt);
-
-	return 0;
-}
-
+#define db_ce(x)	((x)->ces[DB_CE_NUM+0])	/* offsets follow the DB_CES entries at the head of kset_mysql.ces */
+#define	host_ce(x)	((x)->ces[DB_CE_NUM+1])
+#define user_ce(x)	((x)->ces[DB_CE_NUM+2])
+#define pass_ce(x)	((x)->ces[DB_CE_NUM+3])
+#define port_ce(x)	((x)->ces[DB_CE_NUM+4])
 /* find out which columns the table has */
-static int mysql_get_columns(struct ulogd_pluginstance *upi)
+static int get_columns_mysql(struct ulogd_pluginstance *upi)
 {
 	struct mysql_instance *mi = (struct mysql_instance *) upi->private;
 	MYSQL_RES *result;
@@ -372,7 +153,7 @@
 
 		DEBUGP("field '%s' found\n", buf);
 
-		/* add it u list of input keys */
+		/* add it to list of input keys */
 		strncpy(upi->input.keys[i].name, buf, ULOGD_MAX_KEYLEN);
 	}
 	/* MySQL Auto increment ... ID :) */
@@ -382,12 +163,23 @@
 	return 0;
 }
 
+static int close_db_mysql(struct ulogd_pluginstance *upi)
+{
+	struct mysql_instance *mi = (struct mysql_instance *) upi->private;
+	mysql_close(mi->dbh);
+	return 0;
+}
+
 /* make connection and select database */
-static int open_db(struct ulogd_pluginstance *upi, char *server,
-		   int port, char *user, char *pass, char *db)
+static int open_db_mysql(struct ulogd_pluginstance *upi)
 {
 	struct mysql_instance *mi = (struct mysql_instance *) upi->private;
 	unsigned int connect_timeout = timeout_ce(upi->config_kset).u.value;
+	char *server = host_ce(upi->config_kset).u.string;
+	u_int16_t port = port_ce(upi->config_kset).u.value;
+	char *user = user_ce(upi->config_kset).u.string;
+	char *pass = pass_ce(upi->config_kset).u.string;
+	char *db = db_ce(upi->config_kset).u.string;	/* database name from the "db" key, not the password */
 
 	mi->dbh = mysql_init(NULL);
 	if (!mi->dbh) {
@@ -407,157 +199,56 @@
 	return 0;
 }
 
-static int init_reconnect(struct ulogd_pluginstance *upi)
+static int escape_string_mysql(struct ulogd_pluginstance *upi,
+				char *dst, const char *src, unsigned int len)
 {
 	struct mysql_instance *mi = (struct mysql_instance *) upi->private;
-	if (reconnect_ce(upi->config_kset).u.value) {
-		mi->reconnect = time(NULL);
-		if (mi->reconnect != TIME_ERR) {
-			ulogd_log(ULOGD_ERROR, "no connection to database, "
-				  "attempting to reconnect after %u seconds\n",
-				  reconnect_ce(upi->config_kset).u.value);
-			mi->reconnect += reconnect_ce(upi->config_kset).u.value;
-			mi->interp = &_mysql_init_db;
-			return -1;
-		}
-	}
 
-	/* Disable plugin permanently */
-	ulogd_log(ULOGD_ERROR, "permanently disabling plugin\n");
-	mi->interp = &mysql_output_disabled;
-	
-	return 0;
+#ifdef OLD_MYSQL
+	return mysql_escape_string(dst, src, len);
+#else
+	return mysql_real_escape_string(mi->dbh, dst, src, len);
+#endif /* OLD_MYSQL */
 }
 
-static int _mysql_init_db(struct ulogd_pluginstance *upi)
+static int execute_mysql(struct ulogd_pluginstance *upi,
+			 const char *stmt, unsigned int len)
 {
 	struct mysql_instance *mi = (struct mysql_instance *) upi->private;
-
-	if (mi->reconnect && mi->reconnect > time(NULL))
-		return 0;
-	
-	if (open_db(upi, host_ce(upi->config_kset).u.string,
-		    port_ce(upi->config_kset).u.value,
-		    user_ce(upi->config_kset).u.string, 
-		    pass_ce(upi->config_kset).u.string,
-		    db_ce(upi->config_kset).u.string)) {
-		ulogd_log(ULOGD_ERROR, "can't establish database connection\n");
-		return init_reconnect(upi);
-	}
-
-#if 0
-	/* read the fieldnames to know which values to insert */
-	if (mysql_get_columns(table_ce.u.string)) {
-		ulogd_log(ULOGD_ERROR, "unable to get mysql columns\n");
-		return init_reconnect();
-	}
-	mysql_createstmt();
-#endif	
-	/* enable 'real' logging */
-	mi->interp = &__interp_mysql;
-
-	mi->reconnect = 0;
-
-	/* call the interpreter function to actually write the
-	 * log line that we wanted to write */
-	return __interp_mysql(upi);
-}
-
-static int configure_mysql(struct ulogd_pluginstance *upi,
-			   struct ulogd_pluginstance_stack *stack)
-{
-	struct mysql_instance *mi = (struct mysql_instance *) upi->private;
 	int ret;
 
-	ulogd_log(ULOGD_NOTICE, "(re)configuring\n");
+	ret = mysql_real_query(mi->dbh, stmt, len);
+	if (ret)
+		return -1;
 
-	/* Assign the default interp function */
-	mi->interp = &__interp_mysql;
-
-	/* First: Parse configuration file section for this instance */
-	ret = config_parse_file(upi->id, upi->config_kset);
-	if (ret < 0) {
-		ulogd_log(ULOGD_ERROR, "error parsing config file\n");
-		return ret;
-	}
-
-	/* Second: Open Database */
-	ret = open_db(upi, host_ce(upi->config_kset).u.string,
-		      port_ce(upi->config_kset).u.value,
-		      user_ce(upi->config_kset).u.string,
-		      pass_ce(upi->config_kset).u.string,
-		      db_ce(upi->config_kset).u.string);
-	if (ret < 0) {
-		ulogd_log(ULOGD_ERROR, "error in open_db\n");
-		return ret;
-	}
-
-	/* Third: Determine required input keys for given table */
-	ret = mysql_get_columns(upi);
-	if (ret < 0)
-		ulogd_log(ULOGD_ERROR, "error in get_columns\n");
-	
-	/* Close database, since ulogd core could just call configure
-	 * but abort during input key resolving routines.  configure
-	 * doesn't have a destructor... */
-	mysql_close(mi->dbh);
-	
-	return ret;
+	return 0;
 }
 
-static int start_mysql(struct ulogd_pluginstance *upi)
+static char *strerror_mysql(struct ulogd_pluginstance *upi)
 {
 	struct mysql_instance *mi = (struct mysql_instance *) upi->private;
-	int ret;
-
-	ulogd_log(ULOGD_NOTICE, "starting\n");
-
-	ret = open_db(upi, host_ce(upi->config_kset).u.string,
-		      port_ce(upi->config_kset).u.value,
-		      user_ce(upi->config_kset).u.string,
-		      pass_ce(upi->config_kset).u.string,
-		      db_ce(upi->config_kset).u.string);
-	if (ret < 0)
-		return ret;
-
-	ret = mysql_createstmt(upi);
-	if (ret < 0)
-		mysql_close(mi->dbh);
-
-	return ret;
+	return (char *) mysql_error(mi->dbh);
 }
 
-static int stop_mysql(struct ulogd_pluginstance *upi)
+static struct db_driver db_driver_mysql = {
+	.get_columns	= &get_columns_mysql,
+	.open_db	= &open_db_mysql,
+	.close_db	= &close_db_mysql,
+	.escape_string	= &escape_string_mysql,
+	.execute	= &execute_mysql,
+	.strerror	= &strerror_mysql,
+};
+
+static int configure_mysql(struct ulogd_pluginstance *upi,
+			   struct ulogd_pluginstance_stack *stack)
 {
-	struct mysql_instance *mi = (struct mysql_instance *) upi->private;
+	struct db_instance *di = (struct db_instance *) &upi->private;
+	di->driver = &db_driver_mysql;
 
-	ulogd_log(ULOGD_NOTICE, "stopping\n");
-	mysql_close(mi->dbh);
-
-	/* try to free our dynamically allocated input key array */
-	if (upi->input.keys) {
-		free(upi->input.keys);
-		upi->input.keys = NULL;
-	}
-	return 0;
+	return configure_db(upi, stack);
 }
 
-static void signal_mysql(struct ulogd_pluginstance *upi,
-			 int signal)
-{
-	switch (signal) {
-	case SIGHUP:
-		/* reopen database connection */
-		stop_mysql(upi);
-		start_mysql(upi);
-		break;
-	default:
-		break;
-	}
-}
-
-
-static struct ulogd_plugin mysql_plugin = {
+static struct ulogd_plugin plugin_mysql = {
 	.name = "MYSQL",
 	.input = {
 		.keys = NULL,
@@ -567,13 +258,13 @@
 	.output = {
 		.type = ULOGD_DTYPE_SINK,
 	},
-	.config_kset = &mysql_kset,
+	.config_kset = &kset_mysql,
 	.priv_size = sizeof(struct mysql_instance),
 	.configure = &configure_mysql,
-	.start	   = &start_mysql,
-	.stop	   = &stop_mysql,
-	.signal	   = &signal_mysql,
-	.interp	   = &interp_mysql,
+	.start	   = &start_db,
+	.stop	   = &stop_db,
+	.signal	   = &signal_db,
+	.interp	   = &interp_db,
 	.version   = ULOGD_VERSION,
 };
 
@@ -581,5 +272,5 @@
 
 void init(void) 
 {
-	ulogd_register_plugin(&mysql_plugin);
+	ulogd_register_plugin(&plugin_mysql);
 }




More information about the netfilter-cvslog mailing list