BIND 10 master, updated. 79b57c7becdf6fa66812c75bf29a63f63221442f Changelog for #2582

BIND 10 source code commits bind10-changes at lists.isc.org
Fri Jan 18 13:01:53 UTC 2013


The branch, master has been updated
       via  79b57c7becdf6fa66812c75bf29a63f63221442f (commit)
       via  ced31d8c5a0f2ca930b976d3caecfc24fc04634e (commit)
       via  980d715bc1b4b6dc592ac6c8e7bdedf4cf528f6c (commit)
       via  b0a6e5fff9ad673e6ba6a55797ab08b1349ecd8f (commit)
       via  2283402c2b94545fa66cbc5e8f7294cb39924568 (commit)
       via  2f3611a364d4ec7b2626921c22029b8dbc024404 (commit)
       via  6a1247051d30f4eb5be35b661f74a90a51390cbf (commit)
       via  a7872f60cba423597e3049aa8acd466fbe29882d (commit)
       via  973bfad40ba4fe946d53d200c7e27ab32d79d0de (commit)
       via  275a72e95dc82e105e48654e1a3ed71a48b7840a (commit)
       via  8d31e215ffc21430b29276ed65dd44a57598f9b8 (commit)
       via  84a994f8ef9bef873fd36a0837476f7b0a2d319c (commit)
       via  fd3de5ebc7bbc021ba4b36769f4541a1fad5fffc (commit)
      from  3aee00b231be08a7edecfedf833ab0d9f2629922 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.

- Log -----------------------------------------------------------------
commit 79b57c7becdf6fa66812c75bf29a63f63221442f
Author: Michal 'vorner' Vaner <michal.vaner at nic.cz>
Date:   Fri Jan 18 13:36:22 2013 +0100

    Changelog for #2582

commit ced31d8c5a0f2ca930b976d3caecfc24fc04634e
Merge: 3aee00b 980d715
Author: Michal 'vorner' Vaner <michal.vaner at nic.cz>
Date:   Fri Jan 18 13:31:04 2013 +0100

    Merge #2582
    
    Let the message queue connect to itself and handle config updates and commands.
    No commands or configuration specific to Msgq does not exist yet, but it
    handles the generic logging config.

-----------------------------------------------------------------------

Summary of changes:
 ChangeLog                           |    9 ++
 src/bin/msgq/Makefile.am            |    5 +-
 src/bin/msgq/msgq.py.in             |  247 ++++++++++++++++++++++++++++-------
 src/bin/msgq/msgq.spec              |    8 ++
 src/bin/msgq/msgq_messages.mes      |   18 +++
 src/bin/msgq/tests/msgq_test.py     |  111 +++++++++++++++-
 src/lib/python/isc/config/cfgmgr.py |    1 +
 tests/lettuce/features/msgq.feature |   18 +++
 8 files changed, 368 insertions(+), 49 deletions(-)
 create mode 100644 src/bin/msgq/msgq.spec
 create mode 100644 tests/lettuce/features/msgq.feature

-----------------------------------------------------------------------
diff --git a/ChangeLog b/ChangeLog
index 58d4e4a..c999ea6 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,7 +1,16 @@
+248.	[func]		vorner
+	The message queue daemon now appears on the bus. This has two
+	effects, one is it obeys logging configuration and logs to the
+	correct place like the rest of the modules. The other is it
+	appears in bindctl as module (but it doesn't have any commands or
+	configuration yet).
+	(Trac #2582, git ced31d8c5a0f2ca930b976d3caecfc24fc04634e)
+
 547.	[func]*		vorner
 	The b10-loadzone now performs more thorough sanity check on the
 	loaded data.  Some of the checks are now fatal and zone failing
 	them will be rejected.
+	(Trac #2436, git 48d999f1cb59f308f9f30ba2639521d2a5a85baa)
 
 546.	[func]		marcin
 	DHCP option definitions can be now created using the
diff --git a/src/bin/msgq/Makefile.am b/src/bin/msgq/Makefile.am
index 5f377b1..a49b125 100644
--- a/src/bin/msgq/Makefile.am
+++ b/src/bin/msgq/Makefile.am
@@ -4,13 +4,16 @@ pkglibexecdir = $(libexecdir)/@PACKAGE@
 
 pkglibexec_SCRIPTS = b10-msgq
 
+b10_msgqdir = $(pkgdatadir)
+b10_msgq_DATA = msgq.spec
+
 CLEANFILES = b10-msgq msgq.pyc
 CLEANFILES += $(PYTHON_LOGMSGPKG_DIR)/work/msgq_messages.py
 CLEANFILES += $(PYTHON_LOGMSGPKG_DIR)/work/msgq_messages.pyc
 
 man_MANS = b10-msgq.8
 DISTCLEANFILES = $(man_MANS)
-EXTRA_DIST = $(man_MANS) msgq.xml msgq_messages.mes
+EXTRA_DIST = $(man_MANS) msgq.xml msgq_messages.mes msgq.spec
 
 nodist_pylogmessage_PYTHON = $(PYTHON_LOGMSGPKG_DIR)/work/msgq_messages.py
 pylogmessagedir = $(pyexecdir)/isc/log_messages/
diff --git a/src/bin/msgq/msgq.py.in b/src/bin/msgq/msgq.py.in
index 6937600..edca400 100755
--- a/src/bin/msgq/msgq.py.in
+++ b/src/bin/msgq/msgq.py.in
@@ -29,6 +29,8 @@ import errno
 import time
 import select
 import random
+import threading
+import isc.config.ccsession
 from optparse import OptionParser, OptionValueError
 import isc.util.process
 import isc.log
@@ -37,7 +39,15 @@ from isc.log_messages.msgq_messages import *
 import isc.cc
 
 isc.util.process.rename()
+
+isc.log.init("b10-msgq", buffer=True)
+# Logger that is used in the actual msgq handling - startup, shutdown and the
+# poller thread.
 logger = isc.log.Logger("msgq")
+# A separate copy for the master/config thread when the poller thread runs.
+# We use a separate instance, since the logger itself doesn't have to be
+# thread safe.
+config_logger = isc.log.Logger("msgq")
 TRACE_START = logger.DBGLVL_START_SHUT
 TRACE_BASIC = logger.DBGLVL_TRACE_BASIC
 TRACE_DETAIL = logger.DBGLVL_TRACE_DETAIL
@@ -47,11 +57,31 @@ TRACE_DETAIL = logger.DBGLVL_TRACE_DETAIL
 # number, and the overall BIND 10 version number (set in configure.ac).
 VERSION = "b10-msgq 20110127 (BIND 10 @PACKAGE_VERSION@)"
 
+# If B10_FROM_BUILD is set in the environment, we use data files
+# from a directory relative to that, otherwise we use the ones
+# installed on the system
+if "B10_FROM_BUILD" in os.environ:
+    SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/msgq"
+else:
+    PREFIX = "@prefix@"
+    DATAROOTDIR = "@datarootdir@"
+    SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
+SPECFILE_LOCATION = SPECFILE_PATH + "/msgq.spec"
+
 class MsgQReceiveError(Exception): pass
 
 class SubscriptionManager:
-    def __init__(self):
+    def __init__(self, cfgmgr_ready):
+        """
+        Initialize the subscription manager.
+        parameters:
+        * cfgmgr_ready: A callable object run once the config manager
+            subscribes. This is a hackish solution, but we can't read
+            the configuration sooner.
+        """
         self.subscriptions = {}
+        self.__cfgmgr_ready = cfgmgr_ready
+        self.__cfgmgr_ready_called = False
 
     def subscribe(self, group, instance, socket):
         """Add a subscription."""
@@ -63,6 +93,10 @@ class SubscriptionManager:
         else:
             logger.debug(TRACE_BASIC, MSGQ_SUBS_NEW_TARGET, group, instance)
             self.subscriptions[target] = [ socket ]
+        if group == "ConfigManager" and not self.__cfgmgr_ready_called:
+            logger.debug(TRACE_BASIC, MSGQ_CFGMGR_SUBSCRIBED)
+            self.__cfgmgr_ready_called = True
+            self.__cfgmgr_ready()
 
     def unsubscribe(self, group, instance, socket):
         """Remove the socket from the one specific subscription."""
@@ -130,10 +164,52 @@ class MsgQ:
         self.sockets = {}
         self.connection_counter = random.random()
         self.hostname = socket.gethostname()
-        self.subs = SubscriptionManager()
+        self.subs = SubscriptionManager(self.cfgmgr_ready)
         self.lnames = {}
         self.sendbuffs = {}
         self.running = False
+        self.__cfgmgr_ready = None
+        self.__cfgmgr_ready_cond = threading.Condition()
+        # A lock used when the message queue does anything more complicated.
+        # It is mostly a safety measure, the threads doing so should be mostly
+        # independent, and the one with config session should be read only,
+        # but with threads, one never knows. We use threads for concurrency,
+        # not for performance, so we use wide lock scopes to be on the safe
+        # side.
+        self.__lock = threading.Lock()
+
+    def cfgmgr_ready(self, ready=True):
+        """Notify that the config manager is either subscribed, or
+           that the msgq is shutting down and it won't connect, but
+           anybody waiting for it should stop anyway.
+
+           The ready parameter signifies if the config manager is subscribed.
+
+           This method can be called multiple times, but second and any
+           following call is simply ignored. This means the "abort" version
+           of the call can be used on any stop unconditionally, even when
+           the config manager already connected.
+        """
+        with self.__cfgmgr_ready_cond:
+            if self.__cfgmgr_ready is not None:
+                # This is a second call to this method. In that case it does
+                # nothing.
+                return
+            self.__cfgmgr_ready = ready
+            self.__cfgmgr_ready_cond.notify_all()
+
+    def wait_cfgmgr(self):
+        """Wait for msgq to subscribe.
+
+           When this returns, the config manager is either subscribed, or
+           msgq gave up waiting for it. Success is signified by the return
+           value.
+        """
+        with self.__cfgmgr_ready_cond:
+            # Wait until it either aborts or subscribes
+            while self.__cfgmgr_ready is None:
+                self.__cfgmgr_ready_cond.wait()
+            return self.__cfgmgr_ready
 
     def setup_poller(self):
         """Set up the poll thing.  Internal function."""
@@ -143,7 +219,7 @@ class MsgQ:
             self.poller = select.poll()
 
     def add_kqueue_socket(self, socket, write_filter=False):
-        """Add a kquque filter for a socket.  By default the read
+        """Add a kqueue filter for a socket.  By default the read
         filter is used; if write_filter is set to True, the write
         filter is used.  We use a boolean value instead of a specific
         filter constant, because kqueue filter values do not seem to
@@ -191,6 +267,20 @@ class MsgQ:
         else:
             self.add_kqueue_socket(self.listen_socket)
 
+    def setup_signalsock(self):
+        """Create a socket pair used to signal when we want to finish.
+           Using a socket is easy and thread/signal safe way to signal
+           the termination.
+        """
+        # The __poller_sock will be the end in the poller. When it is
+        # closed, we should shut down.
+        (self.__poller_sock, self.__control_sock) = socket.socketpair()
+
+        if self.poller:
+            self.poller.register(self.__poller_sock, select.POLLIN)
+        else:
+            self.add_kqueue_socket(self.__poller_sock)
+
     def setup(self):
         """Configure listener socket, polling, etc.
            Raises a socket.error if the socket_file cannot be
@@ -198,6 +288,7 @@ class MsgQ:
         """
 
         self.setup_poller()
+        self.setup_signalsock()
         self.setup_listener()
 
         logger.debug(TRACE_START, MSGQ_LISTENER_STARTED);
@@ -493,16 +584,21 @@ class MsgQ:
                 else:
                     logger.fatal(MSGQ_POLL_ERR, err)
                     break
-            for (fd, event) in events:
-                if fd == self.listen_socket.fileno():
-                    self.process_accept()
-                else:
-                    if event & select.POLLOUT:
-                        self.__process_write(fd)
-                    elif event & select.POLLIN:
-                        self.process_socket(fd)
+            with self.__lock:
+                for (fd, event) in events:
+                    if fd == self.listen_socket.fileno():
+                        self.process_accept()
+                    elif fd == self.__poller_sock.fileno():
+                        # If it's the signal socket, we should terminate now.
+                        self.running = False
+                        break
                     else:
-                        logger.error(MSGQ_POLL_UNKNOWN_EVENT, fd, event)
+                        if event & select.POLLOUT:
+                            self.__process_write(fd)
+                        elif event & select.POLLIN:
+                            self.process_socket(fd)
+                        else:
+                            logger.error(MSGQ_POLL_UNKNOWN_EVENT, fd, event)
 
     def run_kqueue(self):
         while self.running:
@@ -512,38 +608,83 @@ class MsgQ:
             if not events:
                 raise RuntimeError('serve: kqueue returned no events')
 
-            for event in events:
-                if event.ident == self.listen_socket.fileno():
-                    self.process_accept()
-                else:
-                    if event.filter == select.KQ_FILTER_WRITE:
-                        self.__process_write(event.ident)
-                    if event.filter == select.KQ_FILTER_READ and \
-                            event.data > 0:
-                        self.process_socket(event.ident)
-                    elif event.flags & select.KQ_EV_EOF:
-                        self.kill_socket(event.ident,
-                                         self.sockets[event.ident])
+            with self.__lock:
+                for event in events:
+                    if event.ident == self.listen_socket.fileno():
+                        self.process_accept()
+                    elif event.ident == self.__poller_sock.fileno():
+                        # If it's the signal socket, we should terminate now.
+                        self.running = False
+                        break;
+                    else:
+                        if event.filter == select.KQ_FILTER_WRITE:
+                            self.__process_write(event.ident)
+                        if event.filter == select.KQ_FILTER_READ and \
+                                event.data > 0:
+                            self.process_socket(event.ident)
+                        elif event.flags & select.KQ_EV_EOF:
+                            self.kill_socket(event.ident,
+                                             self.sockets[event.ident])
 
     def stop(self):
-        self.running = False
+        # Signal it should terminate.
+        self.__control_sock.close()
+        self.__control_sock = None
+        # Abort anything waiting on the condition, just to make sure it's not
+        # blocked forever
+        self.cfgmgr_ready(False)
+
+    def cleanup_signalsock(self):
+        """Close the signal sockets. We could do it directly in shutdown,
+           but this part is reused in tests.
+        """
+        if self.__poller_sock:
+            self.__poller_sock.close()
+            self.__poller_sock = None
+        if self.__control_sock:
+            self.__control_sock.close()
+            self.__control_sock = None
 
     def shutdown(self):
         """Stop the MsgQ master."""
-        if self.verbose:
-            sys.stdout.write("[b10-msgq] Stopping the server.\n")
+        logger.debug(TRACE_START, MSGQ_SHUTDOWN)
         self.listen_socket.close()
+        self.cleanup_signalsock()
         if os.path.exists(self.socket_file):
             os.remove(self.socket_file)
 
-# can signal handling and calling a destructor be done without a
-# global variable?
-msgq = None
+    def config_handler(self, new_config):
+        """The configuration handler (run in a separate thread).
+           Not tested, currently effectively empty.
+        """
+        config_logger.debug(TRACE_DETAIL, MSGQ_CONFIG_DATA, new_config)
+
+        with self.__lock:
+            if not self.running:
+                return
+
+            # TODO: Any config handlig goes here.
+
+            return isc.config.create_answer(0)
+
+    def command_handler(self, command, args):
+        """The command handler (run in a separate thread).
+           Not tested, currently effectively empty.
+        """
+        config_logger.debug(TRACE_DETAIL, MSGQ_COMMAND, command, args)
+
+        with self.__lock:
+            if not self.running:
+                return
+
+            # TODO: Any commands go here
 
-def signal_handler(signal, frame):
+            config_logger.error(MSGQ_COMMAND_UNKNOWN, command)
+            return isc.config.create_answer(1, 'unknown command: ' + command)
+
+def signal_handler(msgq, signal, frame):
     if msgq:
-        msgq.shutdown()
-    sys.exit(0)
+        msgq.stop()
 
 if __name__ == "__main__":
     def check_port(option, opt_str, value, parser):
@@ -556,6 +697,7 @@ if __name__ == "__main__":
 
     # Parse any command-line options.
     parser = OptionParser(version=VERSION)
+    # TODO: Should we remove the option?
     parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
                       help="display more about what is going on")
     parser.add_option("-s", "--socket-file", dest="msgq_socket_file",
@@ -563,29 +705,46 @@ if __name__ == "__main__":
                       help="UNIX domain socket file the msgq daemon will use")
     (options, args) = parser.parse_args()
 
-    # Init logging, according to the parameters.
-    # FIXME: Do proper logger configuration, this is just a hack
-    # This is #2582
-    sev = 'INFO'
-    if options.verbose:
-        sev = 'DEBUG'
-    isc.log.init("b10-msgq", buffer=False, severity=sev, debuglevel=99)
-
-    signal.signal(signal.SIGTERM, signal_handler)
-
     # Announce startup.
     logger.debug(TRACE_START, MSGQ_START, VERSION)
 
     msgq = MsgQ(options.msgq_socket_file, options.verbose)
 
+    signal.signal(signal.SIGTERM,
+                  lambda signal, frame: signal_handler(msgq, signal, frame))
+
     try:
         msgq.setup()
     except Exception as e:
         logger.fatal(MSGQ_START_FAIL, e)
         sys.exit(1)
 
+    # We run the processing in a separate thread. This is because we want to
+    # connect to the msgq ourself. But the cc library is unfortunately blocking
+    # in many places and waiting for the processing part to answer, it would
+    # deadlock.
+    poller_thread = threading.Thread(target=msgq.run)
+    poller_thread.daemon = True
     try:
-        msgq.run()
+        poller_thread.start()
+        if msgq.wait_cfgmgr():
+            # Once we get the config manager, we can read our own config.
+            session = isc.config.ModuleCCSession(SPECFILE_LOCATION,
+                                                 msgq.config_handler,
+                                                 msgq.command_handler,
+                                                 None, True,
+                                                 msgq.socket_file)
+            session.start()
+            # And we create a thread that'll just wait for commands and
+            # handle them. We don't terminate the thread, we set it to
+            # daemon. Once the main thread terminates, it'll just die.
+            def run_session():
+                while True:
+                    session.check_command(False)
+            background_thread = threading.Thread(target=run_session)
+            background_thread.daemon = True
+            background_thread.start()
+        poller_thread.join()
     except KeyboardInterrupt:
         pass
 
diff --git a/src/bin/msgq/msgq.spec b/src/bin/msgq/msgq.spec
new file mode 100644
index 0000000..93204fa
--- /dev/null
+++ b/src/bin/msgq/msgq.spec
@@ -0,0 +1,8 @@
+{
+  "module_spec": {
+    "module_name": "Msgq",
+    "module_description": "The message queue",
+    "config_data": [],
+    "commands": []
+  }
+}
diff --git a/src/bin/msgq/msgq_messages.mes b/src/bin/msgq/msgq_messages.mes
index 21c5aa8..75e4227 100644
--- a/src/bin/msgq/msgq_messages.mes
+++ b/src/bin/msgq/msgq_messages.mes
@@ -19,6 +19,21 @@
 # <topsrcdir>/tools/reorder_message_file.py to make sure the
 # messages are in the correct order.
 
+% MSGQ_CFGMGR_SUBSCRIBED The config manager subscribed to message queue
+This is a debug message. The message queue has little bit of special handling
+for the configuration manager. This special handling is happening now.
+
+% MSGQ_COMMAND Running command %1 with arguments %2
+Debug message. The message queue received a command and it is running it.
+
+% MSGQ_COMMAND_UNKNOWN Unknown command '%1'
+The message queue received a command from other module, but it doesn't
+recognize it. This is probably either a coding error or inconsistency between
+the message queue version and version of the module.
+
+% MSGQ_CONFIG_DATA Received configuration update for the msgq: %1
+Debug message. The message queue received a configuration update, handling it.
+
 % MSGQ_HDR_DECODE_ERR Error decoding header received from socket %1: %2
 The socket with mentioned file descriptor sent a packet. However, it was not
 possible to decode the routing header of the packet. The packet is ignored.
@@ -69,6 +84,9 @@ incompatible version of a module and message queue daemon.
 There was a low-level error when sending data to a socket. The error is logged
 and the corresponding socket is dropped.
 
+% MSGQ_SHUTDOWN Stopping Msgq
+Debug message. The message queue is shutting down.
+
 % MSGQ_SOCK_CLOSE Closing socket fd %1
 Debug message. Closing the mentioned socket.
 
diff --git a/src/bin/msgq/tests/msgq_test.py b/src/bin/msgq/tests/msgq_test.py
index 417418f..00e15d8 100644
--- a/src/bin/msgq/tests/msgq_test.py
+++ b/src/bin/msgq/tests/msgq_test.py
@@ -19,7 +19,12 @@ import isc.log
 
 class TestSubscriptionManager(unittest.TestCase):
     def setUp(self):
-        self.sm = SubscriptionManager()
+        self.__cfgmgr_ready_called = 0
+        self.sm = SubscriptionManager(self.cfgmgr_ready)
+
+    def cfgmgr_ready(self):
+        # Called one more time
+        self.__cfgmgr_ready_called += 1
 
     def test_subscription_add_delete_manager(self):
         self.sm.subscribe("a", "*", 'sock1')
@@ -101,7 +106,7 @@ class TestSubscriptionManager(unittest.TestCase):
         try:
             msgq.setup()
             self.assertTrue(os.path.exists(socket_file))
-            msgq.shutdown();
+            msgq.shutdown()
             self.assertFalse(os.path.exists(socket_file))
         except socket.error:
             # ok, the install path doesn't exist at all,
@@ -115,6 +120,25 @@ class TestSubscriptionManager(unittest.TestCase):
     def test_open_socket_bad(self):
         msgq = MsgQ("/does/not/exist")
         self.assertRaises(socket.error, msgq.setup)
+        # But we can clean up after that.
+        msgq.shutdown()
+
+    def test_subscribe_cfgmgr(self):
+        """Test special handling of the config manager. Once it subscribes,
+           the message queue needs to connect and read the config. But not
+           before and only once.
+        """
+        self.assertEqual(0, self.__cfgmgr_ready_called)
+        # Not called when something else subscribes
+        self.sm.subscribe('SomethingElse', '*', 's1')
+        self.assertEqual(0, self.__cfgmgr_ready_called)
+        # Called whenever the config manager subscribes
+        self.sm.subscribe('ConfigManager', '*', 's2')
+        self.assertEqual(1, self.__cfgmgr_ready_called)
+        # But not called again when it subscribes again (should not
+        # happen in practice, but we make sure anyway)
+        self.sm.subscribe('ConfigManager', '*', 's3')
+        self.assertEqual(1, self.__cfgmgr_ready_called)
 
 class DummySocket:
     """
@@ -194,7 +218,6 @@ class MsgQThread(threading.Thread):
     def stop(self):
         self.msgq_.stop()
 
-
 class SendNonblock(unittest.TestCase):
     """
     Tests that the whole thing will not get blocked if someone does not read.
@@ -282,8 +305,10 @@ class SendNonblock(unittest.TestCase):
             if queue_pid == 0:
                 signal.alarm(120)
                 msgq.setup_poller()
+                msgq.setup_signalsock()
                 msgq.register_socket(queue)
                 msgq.run()
+                msgq.cleanup_signalsock()
             else:
                 try:
                     def killall(signum, frame):
@@ -357,6 +382,7 @@ class SendNonblock(unittest.TestCase):
         # Don't need a listen_socket
         msgq.listen_socket = DummySocket
         msgq.setup_poller()
+        msgq.setup_signalsock()
         msgq.register_socket(write)
         msgq.register_socket(control_write)
         # Queue the message for sending
@@ -384,6 +410,10 @@ class SendNonblock(unittest.TestCase):
         # Fail the test if it didn't stop
         self.assertFalse(msgq_thread.isAlive(), "Thread did not stop")
 
+        # Clean up some internals of msgq (usually called as part of
+        # shutdown, but we skip that one here)
+        msgq.cleanup_signalsock()
+
         # Check the exception from the thread, if any
         # First, if we didn't expect it; reraise it (to make test fail and
         # show the stacktrace for debugging)
@@ -456,8 +486,81 @@ class SendNonblock(unittest.TestCase):
         self.do_send_with_send_error(3, sockerr, False, sockerr)
         self.do_send_with_send_error(23, sockerr, False, sockerr)
 
+class ThreadTests(unittest.TestCase):
+    """Test various things around thread synchronization."""
+
+    def setUp(self):
+        self.__msgq = MsgQ()
+        self.__abort_wait = False
+        self.__result = None
+        self.__notify_thread = threading.Thread(target=self.__notify)
+        self.__wait_thread = threading.Thread(target=self.__wait)
+        # Make sure the threads are killed if left behind by the test.
+        self.__notify_thread.daemon = True
+        self.__wait_thread.daemon = True
+
+    def __notify(self):
+        """Call the cfgmgr_ready."""
+        if self.__abort_wait:
+            self.__msgq.cfgmgr_ready(False)
+        else:
+            self.__msgq.cfgmgr_ready()
+
+    def __wait(self):
+        """Wait for config manager and store the result."""
+        self.__result = self.__msgq.wait_cfgmgr()
+
+    def test_wait_cfgmgr(self):
+        """One thread signals the config manager subscribed, the other
+           waits for it. We then check it terminated correctly.
+        """
+        self.__notify_thread.start()
+        self.__wait_thread.start()
+        # Timeout to ensure the test terminates even on failure
+        self.__wait_thread.join(60)
+        self.assertTrue(self.__result)
+
+    def test_wait_cfgmgr_2(self):
+        """Same as test_wait_cfgmgr, but starting the threads in reverse order
+           (the result should be the same).
+        """
+        self.__wait_thread.start()
+        self.__notify_thread.start()
+        # Timeout to ensure the test terminates even on failure
+        self.__wait_thread.join(60)
+        self.assertTrue(self.__result)
+
+    def test_wait_abort(self):
+        """Similar to test_wait_cfgmgr, but the config manager is never
+           subscribed and it is aborted.
+        """
+        self.__abort_wait = True
+        self.__wait_thread.start()
+        self.__notify_thread.start()
+        # Timeout to ensure the test terminates even on failure
+        self.__wait_thread.join(60)
+        self.assertIsNotNone(self.__result)
+        self.assertFalse(self.__result)
+
+    def __check_ready_and_abort(self):
+        """Check that when we first say the config manager is ready and then
+           try to abort, it uses the first result.
+        """
+        self.__msgq.cfgmgr_ready()
+        self.__msgq.cfgmgr_ready(False)
+        self.__result = self.__msgq.wait_cfgmgr()
+
+    def test_ready_and_abort(self):
+        """Perform the __check_ready_and_abort test, but in a separate thread,
+           so in case something goes wrong with the synchronisation and it
+           deadlocks, the test will terminate anyway.
+        """
+        test_thread = threading.Thread(target=self.__check_ready_and_abort)
+        test_thread.daemon = True
+        test_thread.start()
+        test_thread.join(60)
+        self.assertTrue(self.__result)
 
 if __name__ == '__main__':
-    isc.log.init("b10-msgq")
     isc.log.resetUnitTestRootLogger()
     unittest.main()
diff --git a/src/lib/python/isc/config/cfgmgr.py b/src/lib/python/isc/config/cfgmgr.py
index a200dfc..9563cab 100644
--- a/src/lib/python/isc/config/cfgmgr.py
+++ b/src/lib/python/isc/config/cfgmgr.py
@@ -234,6 +234,7 @@ class ConfigManager:
 
     def notify_boss(self):
         """Notifies the Boss module that the Config Manager is running"""
+        # TODO: Use a real, broadcast notification here.
         self.cc.group_sendmsg({"running": "ConfigManager"}, "Boss")
 
     def set_module_spec(self, spec):
diff --git a/tests/lettuce/features/msgq.feature b/tests/lettuce/features/msgq.feature
new file mode 100644
index 0000000..19973f4
--- /dev/null
+++ b/tests/lettuce/features/msgq.feature
@@ -0,0 +1,18 @@
+Feature: Message queue tests
+    Tests for the message queue daemon.
+
+    Scenario: logging
+        # We check the message queue logs.
+        Given I have bind10 running with configuration default.config
+        And wait for bind10 stderr message BIND10_STARTED_CC
+        And wait for bind10 stderr message MSGQ_START
+        And wait for bind10 stderr message MSGQ_LISTENER_STARTED
+        And wait for bind10 stderr message MSGQ_CFGMGR_SUBSCRIBED
+        And wait for bind10 stderr message CMDCTL_STARTED
+
+        # Check it handles configuration. The configuration is invalid,
+        # but it should get there anyway and we abuse it.
+        # TODO: Once it has any kind of real command or configuration
+        # value, use that instead.
+        Then set bind10 configuration Msgq to {"nonsense": 1}
+        And wait for bind10 stderr message MSGQ_CONFIG_DATA



More information about the bind10-changes mailing list