BIND 10 trac2582, updated. 84a994f8ef9bef873fd36a0837476f7b0a2d319c [2582] Run the MSGQ poller in separate thread
BIND 10 source code commits
bind10-changes at lists.isc.org
Mon Jan 14 18:13:20 UTC 2013
The branch, trac2582 has been updated
via 84a994f8ef9bef873fd36a0837476f7b0a2d319c (commit)
from fd3de5ebc7bbc021ba4b36769f4541a1fad5fffc (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
- Log -----------------------------------------------------------------
commit 84a994f8ef9bef873fd36a0837476f7b0a2d319c
Author: Michal 'vorner' Vaner <michal.vaner at nic.cz>
Date: Mon Jan 14 19:09:07 2013 +0100
[2582] Run the MSGQ poller in separate thread
This will be needed in the following work. If we run in the same thread
and tried to connect to self by the cc library, it would deadlock, since
the library blocks waiting for answer.
Also, small cleanup of shutdown is included.
No functional change should be observable.
-----------------------------------------------------------------------
Summary of changes:
src/bin/msgq/msgq.py.in | 58 ++++++++++++++++++++++++++++++++++++---
src/bin/msgq/tests/msgq_test.py | 11 +++++++-
2 files changed, 64 insertions(+), 5 deletions(-)
-----------------------------------------------------------------------
diff --git a/src/bin/msgq/msgq.py.in b/src/bin/msgq/msgq.py.in
index 6ddc31f..f047fb1 100755
--- a/src/bin/msgq/msgq.py.in
+++ b/src/bin/msgq/msgq.py.in
@@ -29,6 +29,7 @@ import errno
import time
import select
import random
+import threading
from optparse import OptionParser, OptionValueError
import isc.util.process
import isc.log
@@ -37,7 +38,13 @@ from isc.log_messages.msgq_messages import *
import isc.cc
isc.util.process.rename()
+# Logger that is used in the actual msgq handling - startup, shutdown and the
+# poller thread.
logger = isc.log.Logger("msgq")
+# A separate copy for the master/config thread when the poller thread runs.
+# We use a separate instance, since the logger itself doesn't have to be
+# thread safe.
+config_logger = isc.log.Logger("msgq")
TRACE_START = logger.DBGLVL_START_SHUT
TRACE_BASIC = logger.DBGLVL_TRACE_BASIC
TRACE_DETAIL = logger.DBGLVL_TRACE_DETAIL
@@ -191,6 +198,20 @@ class MsgQ:
else:
self.add_kqueue_socket(self.listen_socket)
+ def setup_signalsock(self):
+ """Create a socket pair used to signal when we want to finish.
+ Using a socket is easy and thread/signal safe way to signal
+ the termination.
+ """
+ # The __poller_sock will be the end in the poller. When it is
+ # closed, we should shut down.
+ (self.__poller_sock, self.__control_sock) = socket.socketpair()
+
+ if self.poller:
+ self.poller.register(self.__poller_sock, select.POLLIN)
+ else:
+ self.add_kqueue_socket(self.__poller_sock)
+
def setup(self):
"""Configure listener socket, polling, etc.
Raises a socket.error if the socket_file cannot be
@@ -198,6 +219,7 @@ class MsgQ:
"""
self.setup_poller()
+ self.setup_signalsock()
self.setup_listener()
logger.debug(TRACE_START, MSGQ_LISTENER_STARTED);
@@ -496,6 +518,10 @@ class MsgQ:
for (fd, event) in events:
if fd == self.listen_socket.fileno():
self.process_accept()
+ elif fd == self.__poller_sock.fileno():
+ # If it's the signal socket, we should terminate now.
+ self.running = False
+ break
else:
if event & select.POLLOUT:
self.__process_write(fd)
@@ -515,6 +541,10 @@ class MsgQ:
for event in events:
if event.ident == self.listen_socket.fileno():
self.process_accept()
+ elif event.ident == self.__poller_sock.fileno():
+ # If it's the signal socket, we should terminate now.
+ self.running = False
+ break;
else:
if event.filter == select.KQ_FILTER_WRITE:
self.__process_write(event.ident)
@@ -526,12 +556,26 @@ class MsgQ:
self.sockets[event.ident])
def stop(self):
- self.running = False
+ # Signal it should terminate.
+ self.__control_sock.close()
+ self.__control_sock = None
+
+ def cleanup_signalsock(self):
+ """Close the signal sockets. We could do it directly in shutdown,
+ but this part is reused in tests.
+ """
+ if self.__poller_sock:
+ self.__poller_sock.close()
+ self.__poller_sock = None
+ if self.__control_sock:
+ self.__control_sock.close()
+ self.__control_sock = None
def shutdown(self):
"""Stop the MsgQ master."""
logger.debug(TRACE_START, MSGQ_SHUTDOWN)
self.listen_socket.close()
+ self.cleanup_signalsock()
if os.path.exists(self.socket_file):
os.remove(self.socket_file)
@@ -541,8 +585,7 @@ msgq = None
def signal_handler(signal, frame):
if msgq:
- msgq.shutdown()
- sys.exit(0)
+ msgq.stop()
if __name__ == "__main__":
def check_port(option, opt_str, value, parser):
@@ -583,8 +626,15 @@ if __name__ == "__main__":
logger.fatal(MSGQ_START_FAIL, e)
sys.exit(1)
+ # We run the processing in a separate thread. This is because we want to
+ # connect to the msgq ourself. But the cc library is unfortunately blocking
+ # in many places and waiting for the processing part to answer, it would
+ # deadlock.
+ poller_thread = threading.Thread(target=msgq.run)
+ poller_thread.daemon = True
try:
- msgq.run()
+ poller_thread.start()
+ poller_thread.join()
except KeyboardInterrupt:
pass
diff --git a/src/bin/msgq/tests/msgq_test.py b/src/bin/msgq/tests/msgq_test.py
index 417418f..503f02b 100644
--- a/src/bin/msgq/tests/msgq_test.py
+++ b/src/bin/msgq/tests/msgq_test.py
@@ -101,7 +101,7 @@ class TestSubscriptionManager(unittest.TestCase):
try:
msgq.setup()
self.assertTrue(os.path.exists(socket_file))
- msgq.shutdown();
+ msgq.shutdown()
self.assertFalse(os.path.exists(socket_file))
except socket.error:
# ok, the install path doesn't exist at all,
@@ -115,6 +115,8 @@ class TestSubscriptionManager(unittest.TestCase):
def test_open_socket_bad(self):
msgq = MsgQ("/does/not/exist")
self.assertRaises(socket.error, msgq.setup)
+ # But we can clean up after that.
+ msgq.shutdown()
class DummySocket:
"""
@@ -282,8 +284,10 @@ class SendNonblock(unittest.TestCase):
if queue_pid == 0:
signal.alarm(120)
msgq.setup_poller()
+ msgq.setup_signalsock()
msgq.register_socket(queue)
msgq.run()
+ msgq.cleanup_signalsock()
else:
try:
def killall(signum, frame):
@@ -357,6 +361,7 @@ class SendNonblock(unittest.TestCase):
# Don't need a listen_socket
msgq.listen_socket = DummySocket
msgq.setup_poller()
+ msgq.setup_signalsock()
msgq.register_socket(write)
msgq.register_socket(control_write)
# Queue the message for sending
@@ -384,6 +389,10 @@ class SendNonblock(unittest.TestCase):
# Fail the test if it didn't stop
self.assertFalse(msgq_thread.isAlive(), "Thread did not stop")
+ # Clean up some internals of msgq (usually called as part of
+ # shutdown, but we skip that one here)
+ msgq.cleanup_signalsock()
+
# Check the exception from the thread, if any
# First, if we didn't expect it; reraise it (to make test fail and
# show the stacktrace for debugging)
More information about the bind10-changes
mailing list