[netfilter-cvslog] r7509 - in trunk/conntrack-tools: . include src

pablo at netfilter.org pablo at netfilter.org
Sat Apr 26 18:07:04 CEST 2008


Author: pablo at netfilter.org
Date: 2008-04-26 18:07:00 +0200 (Sat, 26 Apr 2008)
New Revision: 7509

Modified:
   trunk/conntrack-tools/ChangeLog
   trunk/conntrack-tools/include/network.h
   trunk/conntrack-tools/src/network.c
   trunk/conntrack-tools/src/queue.c
   trunk/conntrack-tools/src/sync-ftfw.c
   trunk/conntrack-tools/src/sync-mode.c
Log:
rework of the FT-FW approach


Modified: trunk/conntrack-tools/ChangeLog
===================================================================
--- trunk/conntrack-tools/ChangeLog	2008-04-24 03:45:46 UTC (rev 7508)
+++ trunk/conntrack-tools/ChangeLog	2008-04-26 16:07:00 UTC (rev 7509)
@@ -24,6 +24,7 @@
 o fix asymmetric path support (reported by Gary Richards)
 o improve netlink overrun handling
 o add more verbose error notification when we fail to inject a conntrack
+o rework of the FT-FW approach
 
 version 0.9.6 (2008/03/08)
 ------------------------------

Modified: trunk/conntrack-tools/include/network.h
===================================================================
--- trunk/conntrack-tools/include/network.h	2008-04-24 03:45:46 UTC (rev 7508)
+++ trunk/conntrack-tools/include/network.h	2008-04-26 16:07:00 UTC (rev 7509)
@@ -26,20 +26,18 @@
 #define NETHDR_ACK_SIZ sizeof(struct nethdr_ack)
 
 enum {
-	NET_F_HELLO_BIT = 0,
-	NET_F_HELLO = (1 << NET_F_HELLO_BIT),
+	NET_F_UNUSED 	= (1 << 0),
+	NET_F_RESYNC 	= (1 << 1),
+	NET_F_NACK 	= (1 << 2),
+	NET_F_ACK 	= (1 << 3),
+	NET_F_ALIVE 	= (1 << 4),
+};
 
-	NET_F_RESYNC_BIT = 1,
-	NET_F_RESYNC = (1 << NET_F_RESYNC_BIT),
-
-	NET_F_NACK_BIT = 2,
-	NET_F_NACK = (1 << NET_F_NACK_BIT),
-
-	NET_F_ACK_BIT = 3,
-	NET_F_ACK = (1 << NET_F_ACK_BIT),
-
-	NET_F_ALIVE_BIT = 4,
-	NET_F_ALIVE = (1 << NET_F_ALIVE_BIT),
+enum {
+	MSG_DATA,
+	MSG_CTL,
+	MSG_DROP,
+	MSG_BAD,
 };
 
 #define BUILD_NETMSG(ct, query)					\
@@ -57,7 +55,18 @@
 size_t prepare_send_netmsg(struct mcast_sock *m, void *data);
 int mcast_send_netmsg(struct mcast_sock *m, void *data);
 int handle_netmsg(struct nethdr *net);
+
+enum {
+	SEQ_UNKNOWN,
+	SEQ_UNSET,
+	SEQ_IN_SYNC,
+	SEQ_AFTER,
+	SEQ_BEFORE,
+};
+
 int mcast_track_seq(uint32_t seq, uint32_t *exp_seq);
+void mcast_track_update_seq(uint32_t seq);
+int mcast_track_is_seq_set(void);
 
 struct mcast_conf;
 
@@ -66,13 +75,12 @@
 int mcast_buffered_send_netmsg(struct mcast_sock *m, void *data, size_t len);
 ssize_t mcast_buffered_pending_netmsg(struct mcast_sock *m);
 
-#define IS_DATA(x)	((x->flags & ~NET_F_HELLO) == 0)
+#define IS_DATA(x)	(x->flags == 0)
 #define IS_ACK(x)	(x->flags & NET_F_ACK)
 #define IS_NACK(x)	(x->flags & NET_F_NACK)
 #define IS_RESYNC(x)	(x->flags & NET_F_RESYNC)
 #define IS_ALIVE(x)	(x->flags & NET_F_ALIVE)
 #define IS_CTL(x)	IS_ACK(x) || IS_NACK(x) || IS_RESYNC(x) || IS_ALIVE(x)
-#define IS_HELLO(x)	(x->flags & NET_F_HELLO)
 
 #define HDR_NETWORK2HOST(x)						\
 ({									\

Modified: trunk/conntrack-tools/src/network.c
===================================================================
--- trunk/conntrack-tools/src/network.c	2008-04-24 03:45:46 UTC (rev 7508)
+++ trunk/conntrack-tools/src/network.c	2008-04-26 16:07:00 UTC (rev 7509)
@@ -33,13 +33,14 @@
 
 #undef _TEST_DROP
 #ifdef _TEST_DROP
-	static int drop = 0;
 
-	if (++drop >= 10) {
+#define DROP_RATE .25
+
+	/* simulate message omission with a certain probability */
+	if ((random() & 0x7FFFFFFF) < 0x80000000 * DROP_RATE) {
 		printf("drop sq: %u fl:%u len:%u\n",
 			ntohl(net->seq), ntohs(net->flags),
 			ntohs(net->len));
-		drop = 0;
 		return 0;
 	}
 #endif
@@ -57,7 +58,6 @@
 	if (!seq_set) {
 		seq_set = 1;
 		cur_seq = time(NULL);
-		net->flags |= NET_F_HELLO;
 	}
 	net->len = len;
 	net->seq = cur_seq++;
@@ -181,9 +181,6 @@
 
 	HDR_NETWORK2HOST(net);
 
-	if (IS_HELLO(net))
-		STATE_SYNC(last_seq_recv) = net->seq - 1;
-
 	if (IS_CTL(net))
 		return 0;
 
@@ -198,37 +195,51 @@
 	return 0;
 }
 
+static int local_seq_set = 0;
+
+/* this function only tracks, it does not update the last sequence received */
 int mcast_track_seq(uint32_t seq, uint32_t *exp_seq)
 {
-	static int local_seq_set = 0;
-	int ret = 1;
+	int ret = SEQ_UNKNOWN;
 
 	/* netlink sequence tracking initialization */
 	if (!local_seq_set) {
-		local_seq_set = 1;
+		ret = SEQ_UNSET;
 		goto out;
 	}
 
 	/* fast path: we received the correct sequence */
-	if (seq == STATE_SYNC(last_seq_recv)+1)
+	if (seq == STATE_SYNC(last_seq_recv)+1) {
+		ret = SEQ_IN_SYNC;
 		goto out;
+	}
 
 	/* out of sequence: some messages got lost */
 	if (after(seq, STATE_SYNC(last_seq_recv)+1)) {
 		STATE_SYNC(packets_lost) += seq-STATE_SYNC(last_seq_recv)+1;
-		ret = 0;
+		ret = SEQ_AFTER;
 		goto out;
 	}
 
 	/* out of sequence: replayed/delayed packet? */
 	if (before(seq, STATE_SYNC(last_seq_recv)+1))
-		dlog(LOG_WARNING, "delayed packet? exp=%u rcv=%u",
-		     STATE_SYNC(last_seq_recv)+1, seq);
+		ret = SEQ_BEFORE;
 
 out:
 	*exp_seq = STATE_SYNC(last_seq_recv)+1;
-	/* update expected sequence */
-	STATE_SYNC(last_seq_recv) = seq;
 
 	return ret;
 }
+
+void mcast_track_update_seq(uint32_t seq)
+{
+	if (!local_seq_set)
+		local_seq_set = 1;
+
+	STATE_SYNC(last_seq_recv) = seq;
+}
+
+int mcast_track_is_seq_set()
+{
+	return local_seq_set;
+}

Modified: trunk/conntrack-tools/src/queue.c
===================================================================
--- trunk/conntrack-tools/src/queue.c	2008-04-24 03:45:46 UTC (rev 7508)
+++ trunk/conntrack-tools/src/queue.c	2008-04-26 16:07:00 UTC (rev 7509)
@@ -93,7 +93,7 @@
 		goto err;
 	}
 
-	list_add(&n->head, &b->head);
+	list_add_tail(&n->head, &b->head);
 	b->cur_size += size;
 	b->num_elems++;
 

Modified: trunk/conntrack-tools/src/sync-ftfw.c
===================================================================
--- trunk/conntrack-tools/src/sync-ftfw.c	2008-04-24 03:45:46 UTC (rev 7508)
+++ trunk/conntrack-tools/src/sync-ftfw.c	2008-04-26 16:07:00 UTC (rev 7509)
@@ -34,12 +34,27 @@
 #define dp(...)
 #endif
 
+#if 0 
+#define dprint printf
+#else
+#define dprint(...)
+#endif
+
 static LIST_HEAD(rs_list);
 static LIST_HEAD(tx_list);
+static unsigned int rs_list_len;
 static unsigned int tx_list_len;
 static struct queue *rs_queue;
 static struct queue *tx_queue;
+static uint32_t exp_seq;
+static uint32_t window;
+static uint32_t ack_from;
+static int ack_from_set = 0;
+static struct alarm_block alive_alarm;
 
+/* XXX: alive message expiration configurable */
+#define ALIVE_INT 1
+
 struct cache_ftfw {
 	struct list_head 	rs_list;
 	struct list_head	tx_list;
@@ -64,6 +79,7 @@
 
 	/* no need for list_del_init since the entry is destroyed */
 	list_del(&cn->rs_list);
+	rs_list_len--;
 }
 
 static struct cache_extra cache_ftfw_extra = {
@@ -83,15 +99,57 @@
 	queue_add(tx_queue, &ack, NETHDR_ACK_SIZ);
 }
 
-static struct alarm_block alive_alarm;
+static void ftfw_run(void);
 
+/* this function is called from the alarm framework */
 static void do_alive_alarm(struct alarm_block *a, void *data)
 {
-	tx_queue_add_ctlmsg(NET_F_ALIVE, 0, 0);
+	if (ack_from_set && mcast_track_is_seq_set()) {
+		/* exp_seq contains the last update received */
+		dprint("send ALIVE ACK (from=%u, to=%u)\n",
+			ack_from, STATE_SYNC(last_seq_recv));
+		tx_queue_add_ctlmsg(NET_F_ACK,
+				    ack_from,
+				    STATE_SYNC(last_seq_recv));
+		ack_from_set = 0;
+	} else
+		tx_queue_add_ctlmsg(NET_F_ALIVE, 0, 0);
 
-	add_alarm(&alive_alarm, 1, 0);
+	/* TODO: no need for buffered send, extracted from run_sync() */
+	ftfw_run();
+	mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client));
 }
 
+#undef _SIGNAL_DEBUG
+#ifdef _SIGNAL_DEBUG
+
+static int rs_dump(void *data1, const void *data2)
+{
+	struct nethdr_ack *net = data1;
+
+	dprint("in RS queue -> seq:%u flags:%u\n", net->seq, net->flags);
+
+	return 0;
+}
+
+#include <signal.h>
+
+static void my_dump(int foo)
+{
+	struct cache_ftfw *cn, *tmp;
+
+	list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) {
+		struct us_conntrack *u;
+		
+		u = cache_get_conntrack(STATE_SYNC(internal), cn);
+		dprint("in RS list -> seq:%u\n", cn->seq);
+	}
+
+	queue_iterate(rs_queue, NULL, rs_dump);
+}
+
+#endif
+
 static int ftfw_init(void)
 {
 	tx_queue = queue_create(CONFIG(resend_queue_size));
@@ -106,13 +164,16 @@
 		return -1;
 	}
 
-	INIT_LIST_HEAD(&tx_list);
-	INIT_LIST_HEAD(&rs_list);
-
-	/* XXX: alive message expiration configurable */
 	init_alarm(&alive_alarm, NULL, do_alive_alarm);
-	add_alarm(&alive_alarm, 1, 0);
+	add_alarm(&alive_alarm, ALIVE_INT, 0);
 
+	/* set ack window size */
+	window = CONFIG(window_size);
+
+#ifdef _SIGNAL_DEBUG
+	signal(SIGUSR1, my_dump);
+#endif
+
 	return 0;
 }
 
@@ -128,7 +189,7 @@
 	struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), u);
 
 	/* add to tx list */
-	list_add(&cn->tx_list, &tx_list);
+	list_add_tail(&cn->tx_list, &tx_list);
 	tx_list_len++;
 
 	return 0;
@@ -157,13 +218,14 @@
 
 static int rs_queue_to_tx(void *data1, const void *data2)
 {
-	struct nethdr *net = data1;
+	struct nethdr_ack *net = data1;
 	const struct nethdr_ack *nack = data2;
 
 	if (between(net->seq, nack->from, nack->to)) {
 		dp("rs_queue_to_tx sq: %u fl:%u len:%u\n",
 			net->seq, net->flags, net->len);
 		queue_add(tx_queue, net, net->len);
+		queue_del(rs_queue, net);
 	}
 	return 0;
 }
@@ -182,18 +244,20 @@
 
 static void rs_list_to_tx(struct cache *c, unsigned int from, unsigned int to)
 {
-	struct cache_ftfw *cn;
+	struct cache_ftfw *cn, *tmp;
 
-	list_for_each_entry(cn, &rs_list, rs_list) {
+	list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) {
 		struct us_conntrack *u;
 		
 		u = cache_get_conntrack(STATE_SYNC(internal), cn);
 		if (between(cn->seq, from, to)) {
 			dp("resending nack'ed (oldseq=%u)\n", cn->seq);
-			list_add(&cn->tx_list, &tx_list);
+			list_del_init(&cn->rs_list);
+			rs_list_len--;
+			list_add_tail(&cn->tx_list, &tx_list);
 			tx_list_len++;
-		} 
-	}
+		}
+	} 
 }
 
 static void rs_list_empty(struct cache *c, unsigned int from, unsigned int to)
@@ -207,56 +271,115 @@
 		if (between(cn->seq, from, to)) {
 			dp("queue: deleting from queue (seq=%u)\n", cn->seq);
 			list_del_init(&cn->rs_list);
-		} 
+			rs_list_len--;
+		}
 	}
 }
 
-static int ftfw_recv(const struct nethdr *net)
+static int digest_msg(const struct nethdr *net)
 {
-	static unsigned int window = 0;
-	unsigned int exp_seq;
+	if (IS_DATA(net))
+		return MSG_DATA;
 
-	if (window == 0)
-		window = CONFIG(window_size);
+	else if (IS_ACK(net)) {
+		const struct nethdr_ack *h = (const struct nethdr_ack *) net;
 
-	if (!mcast_track_seq(net->seq, &exp_seq)) {
-		dp("OOS: sending nack (seq=%u)\n", exp_seq);
-		tx_queue_add_ctlmsg(NET_F_NACK, exp_seq, net->seq-1);
-		window = CONFIG(window_size);
-	} else {
-		/* received a window, send an acknowledgement */
-		if (--window == 0) {
-			dp("sending ack (seq=%u)\n", net->seq);
-			tx_queue_add_ctlmsg(NET_F_ACK, 
-					    net->seq - CONFIG(window_size), 
-					    net->seq);
-		}
-	}
+		dprint("ACK(%u): from seq=%u to seq=%u\n",
+			h->seq, h->from, h->to);
+		rs_list_empty(STATE_SYNC(internal), h->from, h->to);
+		queue_iterate(rs_queue, h, rs_queue_empty);
+		return MSG_CTL;
 
-	if (IS_NACK(net)) {
+	} else if (IS_NACK(net)) {
 		const struct nethdr_ack *nack = (const struct nethdr_ack *) net;
 
-		dp("NACK: from seq=%u to seq=%u\n", nack->from, nack->to);
+		dprint("NACK(%u): from seq=%u to seq=%u\n",
+			nack->seq, nack->from, nack->to);
 		rs_list_to_tx(STATE_SYNC(internal), nack->from, nack->to);
 		queue_iterate(rs_queue, nack, rs_queue_to_tx);
-		return 1;
+		return MSG_CTL;
+
 	} else if (IS_RESYNC(net)) {
 		dp("RESYNC ALL\n");
 		cache_iterate(STATE_SYNC(internal), NULL, do_cache_to_tx);
-		return 1;
-	} else if (IS_ACK(net)) {
-		const struct nethdr_ack *h = (const struct nethdr_ack *) net;
+		return MSG_CTL;
 
-		dp("ACK: from seq=%u to seq=%u\n", h->from, h->to);
-		rs_list_empty(STATE_SYNC(internal), h->from, h->to);
-		queue_iterate(rs_queue, h, rs_queue_empty);
-		return 1;
 	} else if (IS_ALIVE(net))
-		return 1;
+		return MSG_CTL;
 
-	return 0;
+	return MSG_BAD;
 }
 
+static int ftfw_recv(const struct nethdr *net)
+{
+	int ret = MSG_DATA;
+
+	switch (mcast_track_seq(net->seq, &exp_seq)) {
+	case SEQ_AFTER:
+		ret = digest_msg(net);
+		if (ret == MSG_BAD) {
+			ret = MSG_BAD;
+			goto out;
+		}
+
+		if (ack_from_set) {
+			tx_queue_add_ctlmsg(NET_F_ACK, ack_from, exp_seq-1);
+			dprint("OFS send half ACK: from seq=%u to seq=%u\n", 
+				ack_from, exp_seq-1);
+			ack_from_set = 0;
+		}
+
+		tx_queue_add_ctlmsg(NET_F_NACK, exp_seq, net->seq-1);
+		dprint("OFS send NACK: from seq=%u to seq=%u\n", 
+			exp_seq, net->seq-1);
+
+		/* count this message as part of the new window */
+		window = CONFIG(window_size) - 1;
+		ack_from = net->seq;
+		ack_from_set = 1;
+		break;
+
+	case SEQ_BEFORE:
+		/* we don't accept delayed packets */
+		dlog(LOG_WARNING, "Received seq=%u before expected seq=%u",
+				   net->seq, exp_seq);
+		dlog(LOG_WARNING, "Probably the other node has come back"
+				  "to life but you forgot to add "
+				  "conntrackd -r to your scripts");
+		ret = MSG_DROP;
+		break;
+
+	case SEQ_UNSET:
+	case SEQ_IN_SYNC:
+		ret = digest_msg(net);
+		if (ret == MSG_BAD) {
+			ret = MSG_BAD;
+			goto out;
+		}
+
+		if (!ack_from_set) {
+			ack_from_set = 1;
+			ack_from = net->seq;
+		}
+
+		if (--window <= 0) {
+			/* received a window, send an acknowledgement */
+			dprint("OFS send ACK: from seq=%u to seq=%u\n",
+				ack_from, net->seq);
+
+			tx_queue_add_ctlmsg(NET_F_ACK, ack_from, net->seq);
+			window = CONFIG(window_size);
+			ack_from_set = 0;
+		}
+	}
+
+out:
+	if ((ret == MSG_DATA || ret == MSG_CTL))
+		mcast_track_update_seq(net->seq);
+
+	return ret;
+}
+
 static void ftfw_send(struct nethdr *net, struct us_conntrack *u)
 {
 	struct netpld *pld = NETHDR_DATA(net);
@@ -270,11 +393,14 @@
 		cn = (struct cache_ftfw *) 
 			cache_get_extra(STATE_SYNC(internal), u);
 
-		if (!list_empty(&cn->rs_list))
-			list_del(&cn->rs_list);
+		if (!list_empty(&cn->rs_list)) {
+			list_del_init(&cn->rs_list);
+			rs_list_len--;
+		}
 
 		cn->seq = net->seq;
-		list_add(&cn->rs_list, &rs_list);
+		list_add_tail(&cn->rs_list, &rs_list);
+		rs_list_len++;
 		break;
 	case NFCT_Q_DESTROY:
 		queue_add(rs_queue, net, net->len);
@@ -294,7 +420,7 @@
 	HDR_NETWORK2HOST(net);
 
 	if (IS_DATA(net) || IS_ACK(net) || IS_NACK(net)) {
-		dp("-> back_to_tx_queue sq: %u fl:%u len:%u\n",
+		dprint("tx_queue -> to_rs_queue sq: %u fl:%u len:%u\n",
         	       net->seq, net->flags, net->len);
 		queue_add(rs_queue, net, net->len);
 	}
@@ -317,8 +443,7 @@
 	tx_list_len--;
 
 	ret = mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len);
-	if (STATE_SYNC(sync)->send)
-		STATE_SYNC(sync)->send(net, u);
+	ftfw_send(net, u);
 
 	return ret;
 }
@@ -337,6 +462,14 @@
 		u = cache_get_conntrack(STATE_SYNC(internal), cn);
 		tx_list_xmit(&cn->tx_list, u);
 	}
+
+	/* reset alive alarm */
+	add_alarm(&alive_alarm, 1, 0);
+
+	dprint("tx_list_len:%u tx_queue_len:%u "
+	       "rs_list_len: %u rs_queue_len:%u\n",
+		tx_list_len, queue_len(tx_queue),
+		rs_list_len, queue_len(rs_queue));
 }
 
 struct sync_mode sync_ftfw = {

Modified: trunk/conntrack-tools/src/sync-mode.c
===================================================================
--- trunk/conntrack-tools/src/sync-mode.c	2008-04-24 03:45:46 UTC (rev 7508)
+++ trunk/conntrack-tools/src/sync-mode.c	2008-04-26 16:07:00 UTC (rev 7509)
@@ -42,8 +42,18 @@
 	struct nf_conntrack *ct = (struct nf_conntrack *)(void*) __ct;
 	struct us_conntrack *u;
 
-	if (STATE_SYNC(sync)->recv(net))
-		return;
+	switch (STATE_SYNC(sync)->recv(net)) {
+		case MSG_DATA:
+			break;
+		case MSG_DROP:
+		case MSG_CTL:
+			return;
+		case MSG_BAD:
+			STATE(malformed)++;
+			return;
+		default:
+			break;
+	}
 
 	memset(ct, 0, sizeof(__ct));
 
@@ -211,14 +221,15 @@
 static void run_sync(fd_set *readfds)
 {
 	/* multicast packet has been received */
-	if (FD_ISSET(STATE_SYNC(mcast_server->fd), readfds))
+	if (FD_ISSET(STATE_SYNC(mcast_server->fd), readfds)) {
 		mcast_handler();
 
-	if (STATE_SYNC(sync)->run)
-		STATE_SYNC(sync)->run();
+		if (STATE_SYNC(sync)->run)
+			STATE_SYNC(sync)->run();
 
-	/* flush pending messages */
-	mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client));
+		/* flush pending messages */
+		mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client));
+	}
 }
 
 static void kill_sync(void)
@@ -358,16 +369,8 @@
 
 	ret = nfct_query(h, NFCT_Q_GET, u->ct);
 	if (ret == -1 && errno == ENOENT) {
-		size_t len;
-		struct nethdr *net = BUILD_NETMSG(u->ct, NFCT_Q_DESTROY);
-
 		debug_ct(u->ct, "overrun purge resync");
-
-	        len = prepare_send_netmsg(STATE_SYNC(mcast_client), net);
-	        mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len);
-		if (STATE_SYNC(sync)->send)
-			STATE_SYNC(sync)->send(net, u);
-
+		mcast_send_sync(u, u->ct, NFCT_Q_DESTROY);
 		cache_del(STATE_SYNC(internal), u->ct);
 	}
 
@@ -402,16 +405,8 @@
 
 	if (!cache_test(STATE_SYNC(internal), ct)) {
 		if ((u = cache_update_force(STATE_SYNC(internal), ct))) {
-			size_t len;
-
 			debug_ct(u->ct, "overrun resync");
-
-			struct nethdr *net = BUILD_NETMSG(u->ct, NFCT_Q_UPDATE);
-			len = prepare_send_netmsg(STATE_SYNC(mcast_client),net);
-			mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), 
-						   net, len);
-			if (STATE_SYNC(sync)->send)
-				STATE_SYNC(sync)->send(net, u);
+			mcast_send_sync(u, u->ct, NFCT_Q_UPDATE);
 		}
 	}
 
@@ -437,7 +432,6 @@
 	} else {
 		if (errno == EEXIST) {
 			cache_del(STATE_SYNC(internal), ct);
-			mcast_send_sync(NULL, ct, NFCT_Q_DESTROY);
 			goto retry;
 		}
 




More information about the netfilter-cvslog mailing list