[svn] commit: r525 - /branches/parkinglot/src/bin/msgq/msgq.py

BIND 10 source code commits bind10-changes at lists.isc.org
Tue Jan 26 21:32:10 UTC 2010


Author: mgraff
Date: Tue Jan 26 21:32:10 2010
New Revision: 525

Log:
add kqueue() support in parallel to poll() support

Modified:
    branches/parkinglot/src/bin/msgq/msgq.py

Modified: branches/parkinglot/src/bin/msgq/msgq.py
==============================================================================
--- branches/parkinglot/src/bin/msgq/msgq.py (original)
+++ branches/parkinglot/src/bin/msgq/msgq.py Tue Jan 26 21:32:10 2010
@@ -81,6 +81,7 @@
         self.verbose = True
         self.c_channel_port = c_channel_port
         self.poller = None
+        self.kqueue = None
         self.runnable = False
         self.listen_socket = False
         self.sockets = {}
@@ -91,7 +92,17 @@
 
     def setup_poller(self):
         """Set up the poll thing.  Internal function."""
-        self.poller = select.poll()
+        try:
+            self.poller = select.poll()
+        except AttributeError:  # this platform's select module has no poll()
+            self.kqueue = select.kqueue()  # fall back to a kqueue object
+    
+    def add_kqueue_socket(self, socket):  # kqueue twin of poller.register()
+        event = select.kevent(socket.fileno(),
+                              select.KQ_FILTER_READ,  # watch for readable data
+                              select.KQ_EV_ADD | select.KQ_EV_ENABLE)
+        self.kqueue.control([event], 0)  # apply the change, fetch no events
+
 
     def setup_listener(self):
         """Set up the listener socket.  Internal function."""
@@ -100,7 +111,10 @@
         self.listen_socket.bind(("127.0.0.1", self.c_channel_port))
         self.listen_socket.listen(1024)
 
-        self.poller.register(self.listen_socket, select.POLLIN)
+        if self.poller:
+            self.poller.register(self.listen_socket, select.POLLIN)
+        else:
+            self.add_kqueue_socket(self.listen_socket)
 
     def setup(self):
         """Configure listener socket, polling, etc."""
@@ -120,7 +134,11 @@
         self.sockets[newsocket.fileno()] = newsocket
         lname = self.newlname()
         self.lnames[lname] = newsocket
-        self.poller.register(newsocket, select.POLLIN)
+
+        if self.poller:
+            self.poller.register(newsocket, select.POLLIN)
+        else:
+            self.add_kqueue_socket(newsocket)
 
     def process_socket(self, fd):
         """Process a read on a socket."""
@@ -133,7 +151,8 @@
 
     def kill_socket(self, fd, sock):
         """Fully close down the socket."""
-        self.poller.unregister(sock)
+        if self.poller:
+            self.poller.unregister(sock)
         self.subs.unsubscribe_all(sock)
         lname = [ k for k, v in self.lnames.items() if v == sock ][0]
         del self.lnames[lname]
@@ -280,6 +299,13 @@
 
     def run(self):
         """Process messages.  Forever.  Mostly."""
+        
+        if self.poller:  # a poll() object exists: use the poll loop
+            self.run_poller()
+        else:  # no poll() object: use the kqueue loop
+            self.run_kqueue()
+    
+    def run_poller(self):
         while True:
             try:
                 events = self.poller.poll()
@@ -295,6 +321,21 @@
                 else:
                     self.process_socket(fd)
 
+    def run_kqueue(self):
+        """Process messages forever, waiting on kqueue rather than poll()."""
+        while True:
+            events = self.kqueue.control(None, 10)  # block; at most 10 events
+            if not events:
+                raise RuntimeError('serve: kqueue returned no events')
+            for event in events:
+                if event.ident == self.listen_socket.fileno():
+                    self.process_accept()
+                # filter is a plain value, not a bit in flags, so compare it.
+                elif event.filter == select.KQ_FILTER_READ and event.data > 0:
+                    self.process_socket(event.ident)
+                elif event.flags & select.KQ_EV_EOF:
+                    self.kill_socket(event.ident, self.sockets[event.ident])
+
     def shutdown(self):
         """Stop the MsgQ master."""
         if self.verbose:




More information about the bind10-changes mailing list