[netfilter-cvslog] r3354 - trunk/netfilter-ha/ct_sync
hidden at netfilter.org
hidden at netfilter.org
Mon Dec 13 22:43:06 CET 2004
Author: hidden at netfilter.org
Date: 2004-12-13 22:43:06 +0100 (Mon, 13 Dec 2004)
New Revision: 3354
Modified:
trunk/netfilter-ha/ct_sync/ct_sync_main.c
trunk/netfilter-ha/ct_sync/ct_sync_proto.c
trunk/netfilter-ha/ct_sync/ct_sync_proto.h
Log:
- implement polling-based receive loop
Modified: trunk/netfilter-ha/ct_sync/ct_sync_main.c
===================================================================
--- trunk/netfilter-ha/ct_sync/ct_sync_main.c 2004-12-13 12:28:03 UTC (rev 3353)
+++ trunk/netfilter-ha/ct_sync/ct_sync_main.c 2004-12-13 21:43:06 UTC (rev 3354)
@@ -62,7 +62,7 @@
#define ASSERT_WRITE_LOCK(x) MUST_BE_WRITE_LOCKED(&ip_conntrack_lock)
#include <linux/netfilter_ipv4/listhelp.h>
-#define CT_SYNC_VERSION "0.17"
+#define CT_SYNC_VERSION "0.18"
MODULE_LICENSE("GPL");
MODULE_AUTHOR("KOVACS Krisztian <hidden at sch.bme.hu>, Harald Welte <laforge at netfilter.org>");
@@ -87,6 +87,7 @@
MODULE_PARM_DESC(cmarkbit, "bit in the connection mark to use as sync mark");
/* used to stop ct_sync threads */
+static DECLARE_WAIT_QUEUE_HEAD(ct_sync_rcv_wait);
static DECLARE_WAIT_QUEUE_HEAD(stop_ct_rcv_thread_wait);
static atomic_t stop_ct_rcv_thread = ATOMIC_INIT(0);
@@ -974,13 +975,14 @@
}
/* FIXME: these should be configurable */
-#define CT_SYNC_RECV_BURST 100
-#define CT_SYNC_INITSYNC_RATE 20
+#define CT_SYNC_RECV_BURST 200
+//#define CT_SYNC_INITSYNC_RATE 20
/* sync kernel thread: receiver */
static int
ct_sync_rcv_thread_main(void *ct_thread_startup)
{
mm_segment_t oldmm;
+ DECLARE_WAITQUEUE(wait, current);
CT_SYNC_ENTER();
@@ -1005,7 +1007,26 @@
if (unlikely(atomic_read(&stop_ct_rcv_thread)))
break;
- /* first process all packets from the socket buffer into
+ /* wait for a new packet to arrive */
+ if (!cts_proto_recv_pending(cts_cfg.protoh)) {
+ __set_current_state(TASK_INTERRUPTIBLE);
+ add_wait_queue(&ct_sync_rcv_wait, &wait);
+
+ for (;;) {
+ __set_current_state(TASK_INTERRUPTIBLE);
+ if (cts_proto_recv_pending(cts_cfg.protoh) ||
+ unlikely(atomic_read(&stop_ct_rcv_thread)))
+ break;
+
+ CT_SYNC_DEBUG2("falling asleep\n");
+ schedule();
+ }
+
+ remove_wait_queue(&ct_sync_rcv_wait, &wait);
+ __set_current_state(TASK_RUNNING);
+ }
+
+ /* first process packets from the socket buffer into
* our protocol layer */
while (cts_proto_recv_pending(cts_cfg.protoh)) {
switch (cts_proto_recv_pkt(cts_cfg.protoh)) {
@@ -1049,7 +1070,15 @@
ct_sync_msg_process_slave(msghdr,
pkthdr);
}
+
+ /* give chance for other processes to run */
+ if (pkts_received >= CT_SYNC_RECV_BURST)
+ cond_resched();
+ /* FIXME: implement another way to detect end of initsync,
+ * since this method won't work with the new I/O scheduling
+ * model */
+#if 0
if (msgs_received < CT_SYNC_INITSYNC_RATE) {
/* We have not received more than CT_SYNC_INITSYNC_RATE
* messages in the last 100 ms. This usually means
@@ -1058,10 +1087,8 @@
* SLAVE_SYNRECV, we transition into SLAVE_RUNNING. */
cts_proto_recv_running(cts_cfg.protoh);
}
+#endif
- __set_current_state(TASK_INTERRUPTIBLE);
- schedule_timeout(HZ/10);
- __set_current_state(TASK_RUNNING);
}
error:
@@ -1799,7 +1826,7 @@
/* init protocol layer */
cts_cfg.protoh = cts_proto_init(cts_cfg.devname, &cts_cfg.addr, id,
state, &ct_sync_became_slave, NULL,
- &ct_sync_send_wait);
+ &ct_sync_send_wait, &ct_sync_rcv_wait);
if (!cts_cfg.protoh) {
CT_SYNC_ERR("Failed to initialize protocol.\n");
ret = -EINVAL;
@@ -1933,6 +1960,7 @@
__set_current_state(TASK_INTERRUPTIBLE);
atomic_set(&stop_ct_rcv_thread, 1);
add_wait_queue(&stop_ct_rcv_thread_wait, &wait);
+ wake_up(&ct_sync_rcv_wait);
schedule();
__set_current_state(TASK_RUNNING);
remove_wait_queue(&stop_ct_rcv_thread_wait, &wait);
Modified: trunk/netfilter-ha/ct_sync/ct_sync_proto.c
===================================================================
--- trunk/netfilter-ha/ct_sync/ct_sync_proto.c 2004-12-13 12:28:03 UTC (rev 3353)
+++ trunk/netfilter-ha/ct_sync/ct_sync_proto.c 2004-12-13 21:43:06 UTC (rev 3354)
@@ -1273,6 +1273,25 @@
return msghdr;
}
+/* data ready callback: wake up rcv thread */
+static void
+cts_data_ready(struct sock *sk, int count)
+{
+ struct cts_protoh *cph;
+
+ CT_SYNC_ENTER();
+
+ cph = (struct cts_protoh *)sk->user_data;
+ if (cph && cph->recv.wait && waitqueue_active(cph->recv.wait))
+ wake_up_interruptible(cph->recv.wait);
+
+ if (sk->sleep && waitqueue_active(sk->sleep))
+ wake_up_interruptible(sk->sleep);
+
+ CT_SYNC_LEAVE();
+}
+
+
/* initialize protocol */
struct cts_protoh *
cts_proto_init(char *devname,
@@ -1281,7 +1300,8 @@
int master,
void (*became_slave)(struct cts_protoh*, void*),
void *became_slave_arg,
- wait_queue_head_t *wait)
+ wait_queue_head_t *send_wait,
+ wait_queue_head_t *rcv_wait)
{
int ret;
struct cts_protoh *cph;
@@ -1310,7 +1330,7 @@
if (ret < 0)
goto error_cph;
cph->send.ring.seqno = jiffies;
- cph->send.wait = wait;
+ cph->send.wait = send_wait;
/* alloc points to cur, special case for startup to defer
header initialization to advance_alloc() */
cph->send.ring.alloc = cph->send.ring.cur = cph->send.ring.sent;
@@ -1325,17 +1345,20 @@
cph->recv.ring.sent = cph->recv.ring.cur = cph->recv.ring.alloc;
cph->recv.ring.backlog = cph->recv.ring.sent->prev;
csr_print(&cph->recv.ring);
- cph->recv.wait = NULL;
+ cph->recv.wait = rcv_wait;
/* create sockets */
if ((cph->send.socket = cts_sock_server_init(devname, addr)) == NULL) {
CT_SYNC_ERR("Failed to create send socket.\n");
goto error_crr;
- }
+ }
if ((cph->recv.socket = cts_sock_client_init(devname, addr)) == NULL) {
CT_SYNC_ERR("Failed to create rcv socket.\n");
goto error_ssock;
- }
+ }
+ /* setup data_ready callback of receive socket */
+ cph->recv.socket->sk->user_data = cph;
+ cph->recv.socket->sk->data_ready = cts_data_ready;
if (ret < 0)
goto error_rsock;
Modified: trunk/netfilter-ha/ct_sync/ct_sync_proto.h
===================================================================
--- trunk/netfilter-ha/ct_sync/ct_sync_proto.h 2004-12-13 12:28:03 UTC (rev 3353)
+++ trunk/netfilter-ha/ct_sync/ct_sync_proto.h 2004-12-13 21:43:06 UTC (rev 3354)
@@ -12,7 +12,8 @@
void (*became_slave)(struct cts_protoh*,
void *),
void *became_slave_arg,
- wait_queue_head_t *wait);
+ wait_queue_head_t *send_wait,
+ wait_queue_head_t *rcv_wait);
/* protocol cleanup funciton */
void cts_proto_cleanup(struct cts_protoh *cph);
More information about the netfilter-cvslog
mailing list