[svn] commit: r525 - /branches/parkinglot/src/bin/msgq/msgq.py
BIND 10 source code commits
bind10-changes at lists.isc.org
Tue Jan 26 21:32:10 UTC 2010
Author: mgraff
Date: Tue Jan 26 21:32:10 2010
New Revision: 525
Log:
add kqueue() support in parallel to poll() support
Modified:
branches/parkinglot/src/bin/msgq/msgq.py
Modified: branches/parkinglot/src/bin/msgq/msgq.py
==============================================================================
--- branches/parkinglot/src/bin/msgq/msgq.py (original)
+++ branches/parkinglot/src/bin/msgq/msgq.py Tue Jan 26 21:32:10 2010
@@ -81,6 +81,7 @@
self.verbose = True
self.c_channel_port = c_channel_port
self.poller = None
+ self.kqueue = None
self.runnable = False
self.listen_socket = False
self.sockets = {}
@@ -91,7 +92,17 @@
def setup_poller(self):
"""Set up the poll thing. Internal function."""
- self.poller = select.poll()
+ try:
+ self.poller = select.poll()
+ except AttributeError:
+ self.kqueue = select.kqueue()
+
+    def add_kqueue_socket(self, socket):
+        """Register a socket with the kqueue for read events."""
+        read_event = select.kevent(
+            socket.fileno(),
+            filter=select.KQ_FILTER_READ,
+            flags=select.KQ_EV_ADD | select.KQ_EV_ENABLE)
+        # max_events of 0: apply the change without collecting any events.
+        self.kqueue.control([read_event], 0)
+
def setup_listener(self):
"""Set up the listener socket. Internal function."""
@@ -100,7 +111,10 @@
self.listen_socket.bind(("127.0.0.1", self.c_channel_port))
self.listen_socket.listen(1024)
- self.poller.register(self.listen_socket, select.POLLIN)
+ if self.poller:
+ self.poller.register(self.listen_socket, select.POLLIN)
+ else:
+ self.add_kqueue_socket(self.listen_socket)
def setup(self):
"""Configure listener socket, polling, etc."""
@@ -120,7 +134,11 @@
self.sockets[newsocket.fileno()] = newsocket
lname = self.newlname()
self.lnames[lname] = newsocket
- self.poller.register(newsocket, select.POLLIN)
+
+ if self.poller:
+ self.poller.register(newsocket, select.POLLIN)
+ else:
+ self.add_kqueue_socket(newsocket)
def process_socket(self, fd):
"""Process a read on a socket."""
@@ -133,7 +151,8 @@
def kill_socket(self, fd, sock):
"""Fully close down the socket."""
- self.poller.unregister(sock)
+ if self.poller:
+ self.poller.unregister(sock)
self.subs.unsubscribe_all(sock)
lname = [ k for k, v in self.lnames.items() if v == sock ][0]
del self.lnames[lname]
@@ -280,6 +299,13 @@
def run(self):
"""Process messages. Forever. Mostly."""
+
+ if self.poller:
+ self.run_poller()
+ else:
+ self.run_kqueue()
+
+ def run_poller(self):
while True:
try:
events = self.poller.poll()
@@ -295,6 +321,21 @@
else:
self.process_socket(fd)
+    def run_kqueue(self):
+        """Dispatch kqueue events until an error occurs.
+
+        Blocks in kqueue.control() collecting up to 10 events at a time.
+        An event on the listener descriptor calls process_accept(); on any
+        other descriptor, pending data is handed to process_socket() and an
+        end-of-file condition shuts the socket down via kill_socket().
+
+        Raises RuntimeError if kqueue returns an empty event list.
+        """
+        while True:
+            # No timeout is given, so an empty result is unexpected.
+            events = self.kqueue.control(None, 10)
+            if not events:
+                raise RuntimeError('serve: kqueue returned no events')
+
+            for event in events:
+                if event.ident == self.listen_socket.fileno():
+                    self.process_accept()
+                # KQ_FILTER_READ is a filter identifier, not a flag bit, so
+                # it must be compared against event.filter rather than
+                # masked out of event.flags.
+                elif (event.filter == select.KQ_FILTER_READ
+                      and event.data > 0):
+                    self.process_socket(event.ident)
+                elif event.flags & select.KQ_EV_EOF:
+                    self.kill_socket(event.ident, self.sockets[event.ident])
+
def shutdown(self):
"""Stop the MsgQ master."""
if self.verbose:
More information about the bind10-changes
mailing list