BIND 10 trac2582, updated. 275a72e95dc82e105e48654e1a3ed71a48b7840a [2582] Synchronisation when the config manager connects

BIND 10 source code commits bind10-changes at lists.isc.org
Tue Jan 15 10:16:39 UTC 2013


The branch, trac2582 has been updated
       via  275a72e95dc82e105e48654e1a3ed71a48b7840a (commit)
       via  8d31e215ffc21430b29276ed65dd44a57598f9b8 (commit)
      from  84a994f8ef9bef873fd36a0837476f7b0a2d319c (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.

- Log -----------------------------------------------------------------
commit 275a72e95dc82e105e48654e1a3ed71a48b7840a
Author: Michal 'vorner' Vaner <michal.vaner at nic.cz>
Date:   Tue Jan 15 11:15:31 2013 +0100

    [2582] Synchronisation when the config manager connects

commit 8d31e215ffc21430b29276ed65dd44a57598f9b8
Author: Michal 'vorner' Vaner <michal.vaner at nic.cz>
Date:   Tue Jan 15 10:26:54 2013 +0100

    [2582] Signal connection of config manager
    
    Let the subscription manager call a function when the config manager
    subscribes. The function is empty in this commit, but it'll be used
    later on.

-----------------------------------------------------------------------

Summary of changes:
 src/bin/msgq/msgq.py.in             |   55 ++++++++++++++++++-
 src/bin/msgq/msgq_messages.mes      |    4 ++
 src/bin/msgq/tests/msgq_test.py     |   99 ++++++++++++++++++++++++++++++++++-
 src/lib/python/isc/config/cfgmgr.py |    1 +
 4 files changed, 155 insertions(+), 4 deletions(-)

-----------------------------------------------------------------------
diff --git a/src/bin/msgq/msgq.py.in b/src/bin/msgq/msgq.py.in
index f047fb1..ef1fe88 100755
--- a/src/bin/msgq/msgq.py.in
+++ b/src/bin/msgq/msgq.py.in
@@ -57,8 +57,17 @@ VERSION = "b10-msgq 20110127 (BIND 10 @PACKAGE_VERSION@)"
 class MsgQReceiveError(Exception): pass
 
 class SubscriptionManager:
-    def __init__(self):
+    def __init__(self, cfgmgr_ready):
+        """
+        Initialize the subscription manager.
+        parameters:
+        * cfgmgr_ready: A callable object run once the config manager
+            subscribes. This is a hackish solution, but we can't read
+            the configuration sooner.
+        """
         self.subscriptions = {}
+        self.__cfgmgr_ready = cfgmgr_ready
+        self.__cfgmgr_ready_called = False
 
     def subscribe(self, group, instance, socket):
         """Add a subscription."""
@@ -70,6 +79,10 @@ class SubscriptionManager:
         else:
             logger.debug(TRACE_BASIC, MSGQ_SUBS_NEW_TARGET, group, instance)
             self.subscriptions[target] = [ socket ]
+        if group == "ConfigManager" and not self.__cfgmgr_ready_called:
+            logger.debug(TRACE_BASIC, MSGQ_CFGMGR_SUBSCRIBED)
+            self.__cfgmgr_ready_called = True
+            self.__cfgmgr_ready()
 
     def unsubscribe(self, group, instance, socket):
         """Remove the socket from the one specific subscription."""
@@ -137,10 +150,45 @@ class MsgQ:
         self.sockets = {}
         self.connection_counter = random.random()
         self.hostname = socket.gethostname()
-        self.subs = SubscriptionManager()
+        self.subs = SubscriptionManager(self.cfgmgr_ready)
         self.lnames = {}
         self.sendbuffs = {}
         self.running = False
+        self.__cfgmgr_ready = None
+        self.__cfgmgr_ready_cond = threading.Condition()
+
+    def cfgmgr_ready(self, ready=True):
+        """Notify that the config manager is either subscribed, or
+           that the msgq is shutting down and it won't connect, but
+           anybody waiting for it should stop anyway.
+
+           The ready parameter signifies if the config manager is subscribed.
+
+           This method can be called multiple times, but the second and any
+           following calls are simply ignored. This means the "abort" version
+           of the call can be used on any stop unconditionally, even when
+           the config manager has already connected.
+        """
+        with self.__cfgmgr_ready_cond:
+            if self.__cfgmgr_ready is not None:
+                # This is a second call to this method. In that case it does
+                # nothing.
+                return
+            self.__cfgmgr_ready = ready
+            self.__cfgmgr_ready_cond.notify_all()
+
+    def wait_cfgmgr(self):
+        """Wait for the config manager to subscribe.
+
+           When this returns, the config manager is either subscribed, or
+           msgq gave up waiting for it. Success is signified by the return
+           value.
+        """
+        with self.__cfgmgr_ready_cond:
+            # Wait until it either aborts or subscribes
+            while self.__cfgmgr_ready is None:
+                self.__cfgmgr_ready_cond.wait()
+            return self.__cfgmgr_ready
 
     def setup_poller(self):
         """Set up the poll thing.  Internal function."""
@@ -559,6 +607,9 @@ class MsgQ:
         # Signal it should terminate.
         self.__control_sock.close()
         self.__control_sock = None
+        # Abort anything waiting on the condition, just to make sure it's not
+        # blocked forever
+        self.cfgmgr_ready(False)
 
     def cleanup_signalsock(self):
         """Close the signal sockets. We could do it directly in shutdown,
diff --git a/src/bin/msgq/msgq_messages.mes b/src/bin/msgq/msgq_messages.mes
index 78cf9c6..eb8ba82 100644
--- a/src/bin/msgq/msgq_messages.mes
+++ b/src/bin/msgq/msgq_messages.mes
@@ -19,6 +19,10 @@
 # <topsrcdir>/tools/reorder_message_file.py to make sure the
 # messages are in the correct order.
 
+% MSGQ_CFGMGR_SUBSCRIBED The config manager subscribed to message queue
+This is a debug message. The message queue has a little bit of special handling
+for the configuration manager. This special handling is happening now.
+
 % MSGQ_HDR_DECODE_ERR Error decoding header received from socket %1: %2
 The socket with mentioned file descriptor sent a packet. However, it was not
 possible to decode the routing header of the packet. The packet is ignored.
diff --git a/src/bin/msgq/tests/msgq_test.py b/src/bin/msgq/tests/msgq_test.py
index 503f02b..cd13305 100644
--- a/src/bin/msgq/tests/msgq_test.py
+++ b/src/bin/msgq/tests/msgq_test.py
@@ -19,7 +19,12 @@ import isc.log
 
 class TestSubscriptionManager(unittest.TestCase):
     def setUp(self):
-        self.sm = SubscriptionManager()
+        self.__cfgmgr_ready_called = 0
+        self.sm = SubscriptionManager(self.cfgmgr_ready)
+
+    def cfgmgr_ready(self):
+        # Called one more time
+        self.__cfgmgr_ready_called += 1
 
     def test_subscription_add_delete_manager(self):
         self.sm.subscribe("a", "*", 'sock1')
@@ -118,6 +123,23 @@ class TestSubscriptionManager(unittest.TestCase):
         # But we can clean up after that.
         msgq.shutdown()
 
+    def test_subscribe_cfgmgr(self):
+        """Test special handling of the config manager. Once it subscribes,
+           the message queue needs to connect and read the config. But not
+           before and only once.
+        """
+        self.assertEqual(0, self.__cfgmgr_ready_called)
+        # Not called when something else subscribes
+        self.sm.subscribe('SomethingElse', '*', 's1')
+        self.assertEqual(0, self.__cfgmgr_ready_called)
+        # Called when the config manager subscribes for the first time
+        self.sm.subscribe('ConfigManager', '*', 's2')
+        self.assertEqual(1, self.__cfgmgr_ready_called)
+        # But not called again when it subscribes again (should not
+        # happen in practice, but we make sure anyway)
+        self.sm.subscribe('ConfigManager', '*', 's3')
+        self.assertEqual(1, self.__cfgmgr_ready_called)
+
 class DummySocket:
     """
     Dummy socket class.
@@ -196,7 +218,6 @@ class MsgQThread(threading.Thread):
     def stop(self):
         self.msgq_.stop()
 
-
 class SendNonblock(unittest.TestCase):
     """
     Tests that the whole thing will not get blocked if someone does not read.
@@ -465,6 +486,80 @@ class SendNonblock(unittest.TestCase):
         self.do_send_with_send_error(3, sockerr, False, sockerr)
         self.do_send_with_send_error(23, sockerr, False, sockerr)
 
+class ThreadTests(unittest.TestCase):
+    """Test various things around thread synchronization."""
+
+    def setUp(self):
+        self.__msgq = MsgQ()
+        self.__abort_wait = False
+        self.__result = None
+        self.__notify_thread = threading.Thread(target=self.__notify)
+        self.__wait_thread = threading.Thread(target=self.__wait)
+        # Make sure the threads are killed if left behind by the test.
+        self.__notify_thread.daemon = True
+        self.__wait_thread.daemon = True
+
+    def __notify(self):
+        """Call the cfgmgr_ready."""
+        if self.__abort_wait:
+            self.__msgq.cfgmgr_ready(False)
+        else:
+            self.__msgq.cfgmgr_ready()
+
+    def __wait(self):
+        """Wait for config manager and store the result."""
+        self.__result = self.__msgq.wait_cfgmgr()
+
+    def test_wait_cfgmgr(self):
+        """One thread signals the config manager subscribed, the other
+           waits for it. We then check it terminated correctly.
+        """
+        self.__notify_thread.start()
+        self.__wait_thread.start()
+        # Timeout to ensure the test terminates even on failure
+        self.__wait_thread.join(60)
+        self.assertTrue(self.__result)
+
+    def test_wait_cfgmgr_2(self):
+        """Same as test_wait_cfgmgr, but starting the threads in reverse order
+           (the result should be the same).
+        """
+        self.__wait_thread.start()
+        self.__notify_thread.start()
+        # Timeout to ensure the test terminates even on failure
+        self.__wait_thread.join(60)
+        self.assertTrue(self.__result)
+
+    def test_wait_abort(self):
+        """Similar to test_wait_cfgmgr, but the config manager is never
+           subscribed and it is aborted.
+        """
+        self.__abort_wait = True
+        self.__wait_thread.start()
+        self.__notify_thread.start()
+        # Timeout to ensure the test terminates even on failure
+        self.__wait_thread.join(60)
+        self.assertIsNotNone(self.__result)
+        self.assertFalse(self.__result)
+
+    def __check_ready_and_abort(self):
+        """Check that when we first say the config manager is ready and then
+           try to abort, it uses the first result.
+        """
+        self.__msgq.cfgmgr_ready()
+        self.__msgq.cfgmgr_ready(False)
+        self.__result = self.__msgq.wait_cfgmgr()
+
+    def test_ready_and_abort(self):
+        """Perform the __check_ready_and_abort test, but in a separate thread,
+           so in case something goes wrong with the synchronisation and it
+           deadlocks, the test will terminate anyway.
+        """
+        test_thread = threading.Thread(target=self.__check_ready_and_abort)
+        test_thread.daemon = True
+        test_thread.start()
+        test_thread.join(60)
+        self.assertTrue(self.__result)
 
 if __name__ == '__main__':
     isc.log.init("b10-msgq")
diff --git a/src/lib/python/isc/config/cfgmgr.py b/src/lib/python/isc/config/cfgmgr.py
index a200dfc..9563cab 100644
--- a/src/lib/python/isc/config/cfgmgr.py
+++ b/src/lib/python/isc/config/cfgmgr.py
@@ -234,6 +234,7 @@ class ConfigManager:
 
     def notify_boss(self):
         """Notifies the Boss module that the Config Manager is running"""
+        # TODO: Use a real, broadcast notification here.
         self.cc.group_sendmsg({"running": "ConfigManager"}, "Boss")
 
     def set_module_spec(self, spec):



More information about the bind10-changes mailing list