BIND 10 trac2582, updated. 2283402c2b94545fa66cbc5e8f7294cb39924568 [2582] Background the handling of commands

BIND 10 source code commits bind10-changes at lists.isc.org
Tue Jan 15 12:09:23 UTC 2013


The branch, trac2582 has been updated
       via  2283402c2b94545fa66cbc5e8f7294cb39924568 (commit)
       via  2f3611a364d4ec7b2626921c22029b8dbc024404 (commit)
       via  6a1247051d30f4eb5be35b661f74a90a51390cbf (commit)
       via  a7872f60cba423597e3049aa8acd466fbe29882d (commit)
       via  973bfad40ba4fe946d53d200c7e27ab32d79d0de (commit)
      from  275a72e95dc82e105e48654e1a3ed71a48b7840a (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.

- Log -----------------------------------------------------------------
commit 2283402c2b94545fa66cbc5e8f7294cb39924568
Author: Michal 'vorner' Vaner <michal.vaner at nic.cz>
Date:   Tue Jan 15 13:09:03 2013 +0100

    [2582] Background the handling of commands

commit 2f3611a364d4ec7b2626921c22029b8dbc024404
Author: Michal 'vorner' Vaner <michal.vaner at nic.cz>
Date:   Tue Jan 15 13:01:15 2013 +0100

    [2582] Logging of msgq configuration

commit 6a1247051d30f4eb5be35b661f74a90a51390cbf
Author: Michal 'vorner' Vaner <michal.vaner at nic.cz>
Date:   Tue Jan 15 12:54:55 2013 +0100

    [2582] Unrelated cleanup: don't use global variable
    
    The original code had a TODO, fixing it.

commit a7872f60cba423597e3049aa8acd466fbe29882d
Author: Michal 'vorner' Vaner <michal.vaner at nic.cz>
Date:   Tue Jan 15 12:52:22 2013 +0100

    [2582] Perform some locking of msgq
    
    It is not obvious exactly what needs to be locked; the locking is mostly
    there to be on the safe side.

commit 973bfad40ba4fe946d53d200c7e27ab32d79d0de
Author: Michal 'vorner' Vaner <michal.vaner at nic.cz>
Date:   Tue Jan 15 12:43:50 2013 +0100

    [2582] Let the msgq connect to itself
    
    Once the config manager is ready, msgq connects to itself to get its
    config. This is not covered by unit tests, since it is part of the
    startup routine and requires interaction with other modules, but the
    lettuce tests succeed, which means msgq itself works.

-----------------------------------------------------------------------

Summary of changes:
 src/bin/msgq/Makefile.am       |    5 +-
 src/bin/msgq/msgq.py.in        |  145 ++++++++++++++++++++++++++++------------
 src/bin/msgq/msgq.spec         |    8 +++
 src/bin/msgq/msgq_messages.mes |   11 +++
 4 files changed, 125 insertions(+), 44 deletions(-)
 create mode 100644 src/bin/msgq/msgq.spec

-----------------------------------------------------------------------
diff --git a/src/bin/msgq/Makefile.am b/src/bin/msgq/Makefile.am
index 5f377b1..a49b125 100644
--- a/src/bin/msgq/Makefile.am
+++ b/src/bin/msgq/Makefile.am
@@ -4,13 +4,16 @@ pkglibexecdir = $(libexecdir)/@PACKAGE@
 
 pkglibexec_SCRIPTS = b10-msgq
 
+b10_msgqdir = $(pkgdatadir)
+b10_msgq_DATA = msgq.spec
+
 CLEANFILES = b10-msgq msgq.pyc
 CLEANFILES += $(PYTHON_LOGMSGPKG_DIR)/work/msgq_messages.py
 CLEANFILES += $(PYTHON_LOGMSGPKG_DIR)/work/msgq_messages.pyc
 
 man_MANS = b10-msgq.8
 DISTCLEANFILES = $(man_MANS)
-EXTRA_DIST = $(man_MANS) msgq.xml msgq_messages.mes
+EXTRA_DIST = $(man_MANS) msgq.xml msgq_messages.mes msgq.spec
 
 nodist_pylogmessage_PYTHON = $(PYTHON_LOGMSGPKG_DIR)/work/msgq_messages.py
 pylogmessagedir = $(pyexecdir)/isc/log_messages/
diff --git a/src/bin/msgq/msgq.py.in b/src/bin/msgq/msgq.py.in
index ef1fe88..84f9b4f 100755
--- a/src/bin/msgq/msgq.py.in
+++ b/src/bin/msgq/msgq.py.in
@@ -30,6 +30,7 @@ import time
 import select
 import random
 import threading
+import isc.config.ccsession
 from optparse import OptionParser, OptionValueError
 import isc.util.process
 import isc.log
@@ -38,6 +39,8 @@ from isc.log_messages.msgq_messages import *
 import isc.cc
 
 isc.util.process.rename()
+
+isc.log.init("b10-msgq")
 # Logger that is used in the actual msgq handling - startup, shutdown and the
 # poller thread.
 logger = isc.log.Logger("msgq")
@@ -54,6 +57,17 @@ TRACE_DETAIL = logger.DBGLVL_TRACE_DETAIL
 # number, and the overall BIND 10 version number (set in configure.ac).
 VERSION = "b10-msgq 20110127 (BIND 10 @PACKAGE_VERSION@)"
 
+# If B10_FROM_BUILD is set in the environment, we use data files
+# from a directory relative to that, otherwise we use the ones
+# installed on the system
+if "B10_FROM_BUILD" in os.environ:
+    SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/msgq"
+else:
+    PREFIX = "@prefix@"
+    DATAROOTDIR = "@datarootdir@"
+    SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
+SPECFILE_LOCATION = SPECFILE_PATH + "/msgq.spec"
+
 class MsgQReceiveError(Exception): pass
 
 class SubscriptionManager:
@@ -156,6 +170,13 @@ class MsgQ:
         self.running = False
         self.__cfgmgr_ready = None
         self.__cfgmgr_ready_cond = threading.Condition()
+        # A lock used when the message queue does anything more complicated.
+        # It is mostly a safety measure, the threads doing so should be mostly
+        # independent, and the one with config session should be read only,
+        # but with threads, one never knows. We use threads for concurrency,
+        # not for performance, so we use wide lock scopes to be on the safe
+        # side.
+        self.__lock = threading.Lock()
 
     def cfgmgr_ready(self, ready=True):
         """Notify that the config manager is either subscribed, or
@@ -563,20 +584,21 @@ class MsgQ:
                 else:
                     logger.fatal(MSGQ_POLL_ERR, err)
                     break
-            for (fd, event) in events:
-                if fd == self.listen_socket.fileno():
-                    self.process_accept()
-                elif fd == self.__poller_sock.fileno():
-                    # If it's the signal socket, we should terminate now.
-                    self.running = False
-                    break
-                else:
-                    if event & select.POLLOUT:
-                        self.__process_write(fd)
-                    elif event & select.POLLIN:
-                        self.process_socket(fd)
+            with self.__lock:
+                for (fd, event) in events:
+                    if fd == self.listen_socket.fileno():
+                        self.process_accept()
+                    elif fd == self.__poller_sock.fileno():
+                        # If it's the signal socket, we should terminate now.
+                        self.running = False
+                        break
                     else:
-                        logger.error(MSGQ_POLL_UNKNOWN_EVENT, fd, event)
+                        if event & select.POLLOUT:
+                            self.__process_write(fd)
+                        elif event & select.POLLIN:
+                            self.process_socket(fd)
+                        else:
+                            logger.error(MSGQ_POLL_UNKNOWN_EVENT, fd, event)
 
     def run_kqueue(self):
         while self.running:
@@ -586,22 +608,23 @@ class MsgQ:
             if not events:
                 raise RuntimeError('serve: kqueue returned no events')
 
-            for event in events:
-                if event.ident == self.listen_socket.fileno():
-                    self.process_accept()
-                elif event.ident == self.__poller_sock.fileno():
-                    # If it's the signal socket, we should terminate now.
-                    self.running = False
-                    break;
-                else:
-                    if event.filter == select.KQ_FILTER_WRITE:
-                        self.__process_write(event.ident)
-                    if event.filter == select.KQ_FILTER_READ and \
-                            event.data > 0:
-                        self.process_socket(event.ident)
-                    elif event.flags & select.KQ_EV_EOF:
-                        self.kill_socket(event.ident,
-                                         self.sockets[event.ident])
+            with self.__lock:
+                for event in events:
+                    if event.ident == self.listen_socket.fileno():
+                        self.process_accept()
+                    elif event.ident == self.__poller_sock.fileno():
+                        # If it's the signal socket, we should terminate now.
+                        self.running = False
+                        break;
+                    else:
+                        if event.filter == select.KQ_FILTER_WRITE:
+                            self.__process_write(event.ident)
+                        if event.filter == select.KQ_FILTER_READ and \
+                                event.data > 0:
+                            self.process_socket(event.ident)
+                        elif event.flags & select.KQ_EV_EOF:
+                            self.kill_socket(event.ident,
+                                             self.sockets[event.ident])
 
     def stop(self):
         # Signal it should terminate.
@@ -630,11 +653,36 @@ class MsgQ:
         if os.path.exists(self.socket_file):
             os.remove(self.socket_file)
 
-# can signal handling and calling a destructor be done without a
-# global variable?
-msgq = None
+    def config_handler(self, new_config):
+        """The configuration handler (run in a separate thread).
+           Not tested, currently effectively empty.
+        """
+        config_logger.debug(TRACE_DETAIL, MSGQ_CONFIG_DATA, new_config)
+
+        with self.__lock:
+            if not self.running:
+                return
+
+            # TODO: Any config handling goes here.
 
-def signal_handler(signal, frame):
+            return isc.config.create_answer(0)
+
+    def command_handler(self, command, args):
+        """The command handler (run in a separate thread).
+           Not tested, currently effectively empty.
+        """
+        config_logger.debug(TRACE_DETAIL, MSGQ_COMMAND, command, args)
+
+        with self.__lock:
+            if not self.running:
+                return
+
+            # TODO: Any commands go here
+
+            config_logger.error(MSGQ_COMMAND_UNKNOWN, command)
+            return isc.config.create_answer(1, 'unknown command: ' + command)
+
+def signal_handler(msgq, signal, frame):
     if msgq:
         msgq.stop()
 
@@ -649,6 +697,7 @@ if __name__ == "__main__":
 
     # Parse any command-line options.
     parser = OptionParser(version=VERSION)
+    # TODO: Should we remove the option?
     parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
                       help="display more about what is going on")
     parser.add_option("-s", "--socket-file", dest="msgq_socket_file",
@@ -656,21 +705,14 @@ if __name__ == "__main__":
                       help="UNIX domain socket file the msgq daemon will use")
     (options, args) = parser.parse_args()
 
-    # Init logging, according to the parameters.
-    # FIXME: Do proper logger configuration, this is just a hack
-    # This is #2582
-    sev = 'INFO'
-    if options.verbose:
-        sev = 'DEBUG'
-    isc.log.init("b10-msgq", buffer=False, severity=sev, debuglevel=99)
-
-    signal.signal(signal.SIGTERM, signal_handler)
-
     # Announce startup.
     logger.debug(TRACE_START, MSGQ_START, VERSION)
 
     msgq = MsgQ(options.msgq_socket_file, options.verbose)
 
+    signal.signal(signal.SIGTERM,
+                  lambda signal, frame: signal_handler(msgq, signal, frame))
+
     try:
         msgq.setup()
     except Exception as e:
@@ -685,6 +727,23 @@ if __name__ == "__main__":
     poller_thread.daemon = True
     try:
         poller_thread.start()
+        if msgq.wait_cfgmgr():
+            # Once we get the config manager, we can read our own config.
+            session = isc.config.ModuleCCSession(SPECFILE_LOCATION,
+                                                 msgq.config_handler,
+                                                 msgq.command_handler,
+                                                 None, True,
+                                                 msgq.socket_file)
+            session.start()
+            # And we create a thread that'll just wait for commands and
+            # handle them. We don't terminate the thread, we set it to
+            # daemon. Once the main thread terminates, it'll just die.
+            def run_session():
+                while True:
+                    session.check_command(False)
+            background_thread = threading.Thread(target=run_session)
+            background_thread.daemon = True
+            background_thread.start()
         poller_thread.join()
     except KeyboardInterrupt:
         pass
diff --git a/src/bin/msgq/msgq.spec b/src/bin/msgq/msgq.spec
new file mode 100644
index 0000000..93204fa
--- /dev/null
+++ b/src/bin/msgq/msgq.spec
@@ -0,0 +1,8 @@
+{
+  "module_spec": {
+    "module_name": "Msgq",
+    "module_description": "The message queue",
+    "config_data": [],
+    "commands": []
+  }
+}
diff --git a/src/bin/msgq/msgq_messages.mes b/src/bin/msgq/msgq_messages.mes
index eb8ba82..b88486b 100644
--- a/src/bin/msgq/msgq_messages.mes
+++ b/src/bin/msgq/msgq_messages.mes
@@ -23,6 +23,17 @@
 This is a debug message. The message queue has little bit of special handling
 for the configuration manager. This special handling is happening now.
 
+% MSGQ_COMMAND Running command %1 with arguments %2
+Debug message. The message queue received a command and it is running it.
+
+% MSGQ_COMMAND_UNKNOWN Unknown command '%1'
+The message queue received a command from another module, but it doesn't
+recognize it. This is probably either a coding error or an inconsistency
+between the message queue version and the version of the module.
+
+% MSGQ_CONFIG_DATA Received configuration for the msgq: %1
+Debug message. The message queue received a configuration, handling it.
+
 % MSGQ_HDR_DECODE_ERR Error decoding header received from socket %1: %2
 The socket with mentioned file descriptor sent a packet. However, it was not
 possible to decode the routing header of the packet. The packet is ignored.



More information about the bind10-changes mailing list