[netfilter-cvslog] r3354 - trunk/netfilter-ha/ct_sync

hidden at netfilter.org hidden at netfilter.org
Mon Dec 13 22:43:06 CET 2004


Author: hidden at netfilter.org
Date: 2004-12-13 22:43:06 +0100 (Mon, 13 Dec 2004)
New Revision: 3354

Modified:
   trunk/netfilter-ha/ct_sync/ct_sync_main.c
   trunk/netfilter-ha/ct_sync/ct_sync_proto.c
   trunk/netfilter-ha/ct_sync/ct_sync_proto.h
Log:
- implement polling-based receive loop


Modified: trunk/netfilter-ha/ct_sync/ct_sync_main.c
===================================================================
--- trunk/netfilter-ha/ct_sync/ct_sync_main.c	2004-12-13 12:28:03 UTC (rev 3353)
+++ trunk/netfilter-ha/ct_sync/ct_sync_main.c	2004-12-13 21:43:06 UTC (rev 3354)
@@ -62,7 +62,7 @@
 #define ASSERT_WRITE_LOCK(x) MUST_BE_WRITE_LOCKED(&ip_conntrack_lock)
 #include <linux/netfilter_ipv4/listhelp.h>
 
-#define CT_SYNC_VERSION	"0.17"
+#define CT_SYNC_VERSION	"0.18"
 
 MODULE_LICENSE("GPL");
 MODULE_AUTHOR("KOVACS Krisztian <hidden at sch.bme.hu>, Harald Welte <laforge at netfilter.org>");
@@ -87,6 +87,7 @@
 MODULE_PARM_DESC(cmarkbit, "bit in the connection mark to use as sync mark");
 
 /* used to stop ct_sync threads */
+static DECLARE_WAIT_QUEUE_HEAD(ct_sync_rcv_wait);
 static DECLARE_WAIT_QUEUE_HEAD(stop_ct_rcv_thread_wait);
 static atomic_t stop_ct_rcv_thread = ATOMIC_INIT(0);
 
@@ -974,13 +975,14 @@
 }
 
 /* FIXME: these should be configurable */
-#define CT_SYNC_RECV_BURST 100
-#define CT_SYNC_INITSYNC_RATE 20
+#define CT_SYNC_RECV_BURST 200
+//#define CT_SYNC_INITSYNC_RATE 20
 /* sync kernel thread: receiver */
 static int
 ct_sync_rcv_thread_main(void *ct_thread_startup)
 {
 	mm_segment_t oldmm;
+	DECLARE_WAITQUEUE(wait, current);
 	
 	CT_SYNC_ENTER();
 	
@@ -1005,7 +1007,26 @@
 	  	if (unlikely(atomic_read(&stop_ct_rcv_thread)))
 		  	break;
 
-		/* first process all packets from the socket buffer into
+		/* wait for a new packet to arrive */
+		if (!cts_proto_recv_pending(cts_cfg.protoh)) {
+			__set_current_state(TASK_INTERRUPTIBLE);
+			add_wait_queue(&ct_sync_rcv_wait, &wait);
+
+			for (;;) {
+				__set_current_state(TASK_INTERRUPTIBLE);
+				if (cts_proto_recv_pending(cts_cfg.protoh) ||
+				    unlikely(atomic_read(&stop_ct_rcv_thread)))
+					break;
+
+				CT_SYNC_DEBUG2("falling asleep\n");
+				schedule();
+			}
+
+			remove_wait_queue(&ct_sync_rcv_wait, &wait);
+			__set_current_state(TASK_RUNNING);
+		}
+
+		/* first process packets from the socket buffer into
 		 * our protocol layer */
 		while (cts_proto_recv_pending(cts_cfg.protoh)) {
 			switch (cts_proto_recv_pkt(cts_cfg.protoh)) {
@@ -1049,7 +1070,15 @@
 					ct_sync_msg_process_slave(msghdr, 
 								pkthdr);
 			}
+
+		/* give chance for other processes to run */
+		if (pkts_received >= CT_SYNC_RECV_BURST)
+			cond_resched();
 			
+		/* FIXME: implement another way to detect end of initsync,
+		 * since this method won't work with the new I/O scheduling
+		 * model */
+#if 0
 		if (msgs_received < CT_SYNC_INITSYNC_RATE) {
 			/* We have not received more than CT_SYNC_INITSYNC_RATE
 			 * messages in the last 100 ms.  This usually means
@@ -1058,10 +1087,8 @@
 			 * SLAVE_SYNRECV, we transition into SLAVE_RUNNING. */
 			cts_proto_recv_running(cts_cfg.protoh);
 		}
+#endif
 
-		__set_current_state(TASK_INTERRUPTIBLE);
-		schedule_timeout(HZ/10);
-		__set_current_state(TASK_RUNNING);
 	}
 
 error:
@@ -1799,7 +1826,7 @@
 	/* init protocol layer */
 	cts_cfg.protoh = cts_proto_init(cts_cfg.devname, &cts_cfg.addr, id,
 					state, &ct_sync_became_slave, NULL,
-					&ct_sync_send_wait);
+					&ct_sync_send_wait, &ct_sync_rcv_wait);
 	if (!cts_cfg.protoh) {
 		CT_SYNC_ERR("Failed to initialize protocol.\n");
 		ret = -EINVAL;
@@ -1933,6 +1960,7 @@
 	__set_current_state(TASK_INTERRUPTIBLE);
 	atomic_set(&stop_ct_rcv_thread, 1);
 	add_wait_queue(&stop_ct_rcv_thread_wait, &wait);
+	wake_up(&ct_sync_rcv_wait);
 	schedule();
 	__set_current_state(TASK_RUNNING);
 	remove_wait_queue(&stop_ct_rcv_thread_wait, &wait);

Modified: trunk/netfilter-ha/ct_sync/ct_sync_proto.c
===================================================================
--- trunk/netfilter-ha/ct_sync/ct_sync_proto.c	2004-12-13 12:28:03 UTC (rev 3353)
+++ trunk/netfilter-ha/ct_sync/ct_sync_proto.c	2004-12-13 21:43:06 UTC (rev 3354)
@@ -1273,6 +1273,25 @@
 	return msghdr;
 }
 
+/* data ready callback: wake up rcv thread */
+static void
+cts_data_ready(struct sock *sk, int count)
+{
+	struct cts_protoh *cph;
+
+	CT_SYNC_ENTER();
+
+	cph = (struct cts_protoh *)sk->user_data;
+	if (cph && cph->recv.wait && waitqueue_active(cph->recv.wait))
+		wake_up_interruptible(cph->recv.wait);
+
+	if (sk->sleep && waitqueue_active(sk->sleep))
+		wake_up_interruptible(sk->sleep);
+
+	CT_SYNC_LEAVE();
+}
+
+
 /* initialize protocol */
 struct cts_protoh *
 cts_proto_init(char *devname, 
@@ -1281,7 +1300,8 @@
 	       int master,
 	       void (*became_slave)(struct cts_protoh*, void*),
 	       void *became_slave_arg,
-	       wait_queue_head_t *wait)
+	       wait_queue_head_t *send_wait,
+	       wait_queue_head_t *rcv_wait)
 {
 	int ret;
 	struct cts_protoh *cph;
@@ -1310,7 +1330,7 @@
 	if (ret < 0)
 		goto error_cph;
 	cph->send.ring.seqno = jiffies;
-	cph->send.wait = wait;
+	cph->send.wait = send_wait;
 	/* alloc points to cur, special case for startup to defer
 	   header initialization to advance_alloc() */
 	cph->send.ring.alloc = cph->send.ring.cur = cph->send.ring.sent;
@@ -1325,17 +1345,20 @@
 	cph->recv.ring.sent = cph->recv.ring.cur = cph->recv.ring.alloc;
 	cph->recv.ring.backlog = cph->recv.ring.sent->prev;
 	csr_print(&cph->recv.ring);
-	cph->recv.wait = NULL;
+	cph->recv.wait = rcv_wait;
 
 	/* create sockets */
 	if ((cph->send.socket = cts_sock_server_init(devname, addr)) == NULL) {
 		CT_SYNC_ERR("Failed to create send socket.\n");
 		goto error_crr;
-	} 
+	}
 	if ((cph->recv.socket = cts_sock_client_init(devname, addr)) == NULL) {
 		CT_SYNC_ERR("Failed to create rcv socket.\n");
 		goto error_ssock;
-	} 
+	}
+	/* setup data_ready callback of receive socket */
+	cph->recv.socket->sk->user_data = cph;
+	cph->recv.socket->sk->data_ready = cts_data_ready;
 
 	if (ret < 0)
 		goto error_rsock;

Modified: trunk/netfilter-ha/ct_sync/ct_sync_proto.h
===================================================================
--- trunk/netfilter-ha/ct_sync/ct_sync_proto.h	2004-12-13 12:28:03 UTC (rev 3353)
+++ trunk/netfilter-ha/ct_sync/ct_sync_proto.h	2004-12-13 21:43:06 UTC (rev 3354)
@@ -12,7 +12,8 @@
 				  void (*became_slave)(struct cts_protoh*,
 					  		void *),
 				  void *became_slave_arg,
-				  wait_queue_head_t *wait);
+				  wait_queue_head_t *send_wait,
+				  wait_queue_head_t *rcv_wait);
 
 /* protocol cleanup funciton */
 void cts_proto_cleanup(struct cts_protoh *cph);




More information about the netfilter-cvslog mailing list