BIND 10 master, updated. e93ef80d1c7e0f431f37f75c68f45c3aa78b70f7 Merge branch 'master' of git+ssh://git.bind10.isc.org/var/bind10/git/bind10
BIND 10 source code commits
bind10-changes at lists.isc.org
Thu Jun 13 08:52:06 UTC 2013
The branch, master has been updated
via e93ef80d1c7e0f431f37f75c68f45c3aa78b70f7 (commit)
via 696f1e012969f47880ecf8c5be0c5935a99e3501 (commit)
via ea778b2bd22edec0b971791ae59008328163542f (commit)
via 9bbd16649e750e9a4c7ce5fec0265f91b10cc4fb (commit)
via 9d30cc97c91754a94f15ae288220401fb11cfc06 (commit)
via 15810521bfeea89e77334df66512863911a4d03c (commit)
via af91bbf6f4104b9731264270e2f1356b0ec74981 (commit)
via 779d5913cc017664e72d4e6008b1e7dced1a6ae4 (commit)
via 523e679dd68695f1815cfb5e0a31cc7bf4f742f6 (commit)
via e12bb75e5e456ecaaa738936aea36fc255d342f7 (commit)
via 7035e189a7f94e8284c1da4eb538e6db24fafca4 (commit)
via c049cba60db67b2ab0beeaccb690d77bd0c63689 (commit)
via b9fefbedb20b98f3a8a39550a9a1183208eb10ea (commit)
via 61a6cfae29c657daedd334fc2a157d57e85948a3 (commit)
via c188affb416fc02d45a0932fec1e788303425d88 (commit)
via 64dec90474339bab08323f8b76cde47ddd2d8f73 (commit)
via 4bed4a498354aab3faf234bc76b3f0fbc5a94fe3 (commit)
via 6ebe6bc5875d21ad1d822c251462cebe3e769aee (commit)
via 6bcd96718c01bdbe0d563f6c67c46b441147ba60 (commit)
via c1d7e3f9bfa473478a1070467bf0773f02e54b6e (commit)
via a8186631e5cd811b2ebf8df6d837182ea39177ea (commit)
via 6ca9ce763678cd8ce47680d083eeee299889427a (commit)
via 40e0617397c03de943d81e05541c4de17885f6f0 (commit)
via 33352a6b9af110cb06c40c8cfbecc26b5e3e89c2 (commit)
via e567e51a23beed32f03870d9885e462e1bdc276c (commit)
via d65d61499f0f2ca9547d90e3c36fcd83d55d8ba8 (commit)
via 8c00b35f6427ba14e85e7c335367418b28599154 (commit)
via e6a262a1e5a0091b06840a46a20a53eed38eee42 (commit)
from 310c6201416fe8b57c691a9bbb045d781c601fea (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
- Log -----------------------------------------------------------------
commit e93ef80d1c7e0f431f37f75c68f45c3aa78b70f7
Merge: 696f1e0 310c620
Author: Michal 'vorner' Vaner <vorner at vorner.cz>
Date: Thu Jun 13 10:51:54 2013 +0200
Merge branch 'master' of git+ssh://git.bind10.isc.org/var/bind10/git/bind10
commit 696f1e012969f47880ecf8c5be0c5935a99e3501
Merge: 32f964b ea778b2
Author: Michal 'vorner' Vaner <vorner at vorner.cz>
Date: Thu Jun 13 10:15:13 2013 +0200
Merge #2922
Add commands to list MsgQ's users and group subscriptions.
Notify about subscriptions and unsubscriptions.
commit ea778b2bd22edec0b971791ae59008328163542f
Author: Michal 'vorner' Vaner <vorner at vorner.cz>
Date: Thu Jun 13 10:12:48 2013 +0200
[2922] Fix path to spec file
The spec file is not generated, so use B10_FROM_SOURCE, not
B10_FROM_BUILD.
-----------------------------------------------------------------------
Summary of changes:
src/bin/bind10/init.py.in | 99 +++++++++++++-----
src/bin/bind10/tests/init_test.py.in | 55 +++++++++-
src/bin/cmdctl/cmdctl.py.in | 1 -
src/bin/msgq/msgq.py.in | 113 ++++++++++++++++++---
src/bin/msgq/msgq.spec | 14 ++-
src/bin/msgq/tests/msgq_run_test.py | 56 +++++++++++
src/bin/msgq/tests/msgq_test.py | 182 ++++++++++++++++++++++++++++++++--
src/bin/zonemgr/zonemgr.py.in | 1 +
8 files changed, 472 insertions(+), 49 deletions(-)
-----------------------------------------------------------------------
diff --git a/src/bin/bind10/init.py.in b/src/bin/bind10/init.py.in
index efc0b04..3bb7ea7 100755
--- a/src/bin/bind10/init.py.in
+++ b/src/bin/bind10/init.py.in
@@ -89,7 +89,8 @@ logger = isc.log.Logger("init")
DBG_PROCESS = logger.DBGLVL_TRACE_BASIC
DBG_COMMANDS = logger.DBGLVL_TRACE_DETAIL
-# Messages sent over the unix domain socket to indicate if it is followed by a real socket
+# Messages sent over the unix domain socket to indicate if it is followed by a
+# real socket
CREATOR_SOCKET_OK = b"1\n"
CREATOR_SOCKET_UNAVAILABLE = b"0\n"
@@ -200,7 +201,8 @@ class Init:
verbose=False, nokill=False, setuid=None, setgid=None,
username=None, cmdctl_port=None, wait_time=10):
"""
- Initialize the Init of BIND. This is a singleton (only one can run).
+ Initialize the Init of BIND. This is a singleton (only one can
+ run).
The msgq_socket_file specifies the UNIX domain socket file that the
msgq process listens on. If verbose is True, then b10-init reports
@@ -223,12 +225,13 @@ class Init:
self.component_config = {}
# Some time in future, it may happen that a single component has
# multple processes (like a pipeline-like component). If so happens,
- # name "components" may be inappropriate. But as the code isn't probably
- # completely ready for it, we leave it at components for now. We also
- # want to support multiple instances of a single component. If it turns
- # out that we'll have a single component with multiple same processes
- # or if we start multiple components with the same configuration (we do
- # this now, but it might change) is an open question.
+ # name "components" may be inappropriate. But as the code isn't
+ # probably completely ready for it, we leave it at components for
+ # now. We also want to support multiple instances of a single
+ # component. If it turns out that we'll have a single component with
+ # multiple same processes or if we start multiple components with the
+ # same configuration (we do this now, but it might change) is an open
+ # question.
self.components = {}
# Simply list of components that died and need to wait for a
# restart. Components manage their own restart schedule now
@@ -351,7 +354,8 @@ class Init:
def command_handler(self, command, args):
logger.debug(DBG_COMMANDS, BIND10_RECEIVED_COMMAND, command)
- answer = isc.config.ccsession.create_answer(1, "command not implemented")
+ answer = isc.config.ccsession.create_answer(1,
+ "command not implemented")
if type(command) != str:
answer = isc.config.ccsession.create_answer(1, "bad command")
else:
@@ -440,7 +444,8 @@ class Init:
if pid is None:
logger.debug(DBG_PROCESS, BIND10_STARTED_PROCESS, self.curproc)
else:
- logger.debug(DBG_PROCESS, BIND10_STARTED_PROCESS_PID, self.curproc, pid)
+ logger.debug(DBG_PROCESS, BIND10_STARTED_PROCESS_PID, self.curproc,
+ pid)
def process_running(self, msg, who):
"""
@@ -499,7 +504,8 @@ class Init:
if msgq_proc.process:
msgq_proc.process.kill()
logger.error(BIND10_CONNECTING_TO_CC_FAIL)
- raise CChannelConnectError("Unable to connect to c-channel after 5 seconds")
+ raise CChannelConnectError("Unable to connect to c-channel " +
+ "after 5 seconds")
# try to connect, and if we can't wait a short while
try:
@@ -507,13 +513,43 @@ class Init:
except isc.cc.session.SessionError:
time.sleep(0.1)
- # Subscribe to the message queue. The only messages we expect to receive
- # on this channel are once relating to process startup.
+ # Subscribe to the message queue. The only messages we expect to
+ # receive on this channel are ones relating to process startup.
if self.cc_session is not None:
self.cc_session.group_subscribe("Init")
return msgq_proc
+ def wait_msgq(self):
+ """
+ Wait for the message queue to fully start. It does so only after
+ the config manager connects to it. We know it is ready when it
+ starts answering commands.
+
+ We don't add a specific command for it here, an error response is
+ as good as positive one to know it is alive.
+ """
+ # We do 10 times shorter sleep here (since the start should be fast
+ # now), so we have 10 times more attempts.
+ time_remaining = self.wait_time * 10
+ retry = True
+ while time_remaining > 0 and retry:
+ try:
+ self.ccs.rpc_call('AreYouThere?', 'Msgq')
+ # We don't expect this to succeed. If it does, it's programmer
+ # error
+ raise Exception("Non-existing RPC call succeeded")
+ except isc.config.RPCRecipientMissing:
+ retry = True # Not there yet
+ time.sleep(0.1)
+ time_remaining -= 1
+ except isc.config.RPCError:
+ retry = False # It doesn't like the RPC, so it's alive now
+
+ if retry: # Still not started
+ raise ProcessStartError("Msgq didn't complete the second stage " +
+ "of startup")
+
def start_cfgmgr(self):
"""
Starts the configuration manager process
@@ -536,14 +572,16 @@ class Init:
# time to wait can be set on the command line.
time_remaining = self.wait_time
msg, env = self.cc_session.group_recvmsg()
- while time_remaining > 0 and not self.process_running(msg, "ConfigManager"):
+ while time_remaining > 0 and not self.process_running(msg,
+ "ConfigManager"):
logger.debug(DBG_PROCESS, BIND10_WAIT_CFGMGR)
time.sleep(1)
time_remaining = time_remaining - 1
msg, env = self.cc_session.group_recvmsg()
if not self.process_running(msg, "ConfigManager"):
- raise ProcessStartError("Configuration manager process has not started")
+ raise ProcessStartError("Configuration manager process has not " +
+ "started")
return bind_cfgd
@@ -567,7 +605,8 @@ class Init:
# A couple of utility methods for starting processes...
- def start_process(self, name, args, c_channel_env, port=None, address=None):
+ def start_process(self, name, args, c_channel_env, port=None,
+ address=None):
"""
Given a set of command arguments, start the process and output
appropriate log messages. If the start is successful, the process
@@ -612,9 +651,9 @@ class Init:
# The next few methods start up the rest of the BIND-10 processes.
# Although many of these methods are little more than a call to
- # start_simple, they are retained (a) for testing reasons and (b) as a place
- # where modifications can be made if the process start-up sequence changes
- # for a given process.
+ # start_simple, they are retained (a) for testing reasons and (b) as a
+ # place where modifications can be made if the process start-up sequence
+ # changes for a given process.
def start_auth(self):
"""
@@ -666,6 +705,10 @@ class Init:
# inside the configurator.
self.start_ccsession(self.c_channel_env)
+ # Make sure msgq is fully started before proceeding to the rest
+ # of the components.
+ self.wait_msgq()
+
# Extract the parameters associated with Init. This can only be
# done after the CC Session is started. Note that the logging
# configuration may override the "-v" switch set on the command line.
@@ -689,7 +732,8 @@ class Init:
try:
self.cc_session = isc.cc.Session(self.msgq_socket_file)
logger.fatal(BIND10_MSGQ_ALREADY_RUNNING)
- return "b10-msgq already running, or socket file not cleaned , cannot start"
+ return "b10-msgq already running, or socket file not cleaned , " +\
+ "cannot start"
except isc.cc.session.SessionError:
# this is the case we want, where the msgq is not running
pass
@@ -948,8 +992,8 @@ class Init:
def set_creator(self, creator):
"""
- Registeres a socket creator into the b10-init. The socket creator is not
- used directly, but through a cache. The cache is created in this
+ Registers a socket creator into the b10-init. The socket creator is
+ not used directly, but through a cache. The cache is created in this
method.
If called more than once, it raises a ValueError.
@@ -1121,9 +1165,12 @@ def parse_args(args=sys.argv[1:], Parser=OptionParser):
parser = Parser(version=VERSION)
parser.add_option("-m", "--msgq-socket-file", dest="msgq_socket_file",
type="string", default=None,
- help="UNIX domain socket file the b10-msgq daemon will use")
+ help="UNIX domain socket file the b10-msgq daemon " +
+ "will use")
parser.add_option("-i", "--no-kill", action="store_true", dest="nokill",
- default=False, help="do not send SIGTERM and SIGKILL signals to modules during shutdown")
+ default=False,
+ help="do not send SIGTERM and SIGKILL signals to " +
+ "modules during shutdown")
parser.add_option("-u", "--user", dest="user", type="string", default=None,
help="Change user after startup (must run as root)")
parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
@@ -1147,7 +1194,9 @@ def parse_args(args=sys.argv[1:], Parser=OptionParser):
default=None,
help="file to dump the PID of the BIND 10 process")
parser.add_option("-w", "--wait", dest="wait_time", type="int",
- default=10, help="Time (in seconds) to wait for config manager to start up")
+ default=10,
+ help="Time (in seconds) to wait for config manager to "
+ "start up")
(options, args) = parser.parse_args(args)
diff --git a/src/bin/bind10/tests/init_test.py.in b/src/bin/bind10/tests/init_test.py.in
index 8ac6458..913e642 100644
--- a/src/bin/bind10/tests/init_test.py.in
+++ b/src/bin/bind10/tests/init_test.py.in
@@ -16,7 +16,8 @@
# Most of the time, we omit the "init" for brevity. Sometimes,
# we want to be explicit about what we do, like when hijacking a library
# call used by the b10-init.
-from init import Init, ProcessInfo, parse_args, dump_pid, unlink_pid_file, _BASETIME
+from init import Init, ProcessInfo, parse_args, dump_pid, unlink_pid_file, \
+ _BASETIME
import init
# XXX: environment tests are currently disabled, due to the preprocessor
@@ -941,6 +942,7 @@ class TestStartStopProcessesInit(unittest.TestCase):
init.start_ccsession = lambda _: start_ccsession()
# We need to return the original _read_bind10_config
init._read_bind10_config = lambda: Init._read_bind10_config(init)
+ init.wait_msgq = lambda: None
init.start_all_components()
self.check_started(init, True, start_auth, start_resolver)
self.check_environment_unchanged()
@@ -967,6 +969,7 @@ class TestStartStopProcessesInit(unittest.TestCase):
init = MockInit()
self.check_preconditions(init)
+ init.wait_msgq = lambda: None
init.start_all_components()
init.runnable = True
init.config_handler(self.construct_config(False, False))
@@ -1028,6 +1031,7 @@ class TestStartStopProcessesInit(unittest.TestCase):
init = MockInit()
self.check_preconditions(init)
+ init.wait_msgq = lambda: None
init.start_all_components()
init.runnable = True
@@ -1066,6 +1070,7 @@ class TestStartStopProcessesInit(unittest.TestCase):
init = MockInit()
self.check_preconditions(init)
+ init.wait_msgq = lambda: None
init.start_all_components()
init.config_handler(self.construct_config(False, False))
self.check_started_dhcp(init, False, False)
@@ -1075,6 +1080,7 @@ class TestStartStopProcessesInit(unittest.TestCase):
init = MockInit()
self.check_preconditions(init)
# v6 only enabled
+ init.wait_msgq = lambda: None
init.start_all_components()
init.runnable = True
init._Init_started = True
@@ -1347,6 +1353,7 @@ class TestInitComponents(unittest.TestCase):
# Start it
orig = init._component_configurator.startup
init._component_configurator.startup = self.__unary_hook
+ init.wait_msgq = lambda: None
init.start_all_components()
init._component_configurator.startup = orig
self.__check_core(self.__param)
@@ -1499,6 +1506,7 @@ class TestInitComponents(unittest.TestCase):
pass
init.ccs = CC()
init.ccs.get_full_config = lambda: {'components': self.__compconfig}
+ init.wait_msgq = lambda: None
init.start_all_components()
self.__check_extended(self.__param)
@@ -1768,6 +1776,51 @@ class TestInitComponents(unittest.TestCase):
# this is set by ProcessInfo.spawn()
self.assertEqual(42147, pi.pid)
+ def test_wait_msgq(self):
+ """
+ Test we can wait for msgq to provide its own alias.
+
+ It is not available the first time, the second it is.
+ """
+ class RpcSession:
+ def __init__(self):
+ # Not yet called
+ self.called = 0
+
+ def rpc_call(self, command, recipient):
+ self.called += 1
+ if self.called == 1:
+ raise isc.config.RPCRecipientMissing("Not yet")
+ elif self.called == 2:
+ raise isc.config.RPCError(1, "What?")
+ else:
+ raise Exception("Called too many times")
+
+ init = MockInitSimple()
+ init.wait_time = 1
+ init.ccs = RpcSession()
+ init.wait_msgq()
+ self.assertEqual(2, init.ccs.called)
+
+ def test_wait_msgq_fail(self):
+ """
+ Test the wait_msgq fails in case the msgq does not appear
+ after so many attempts.
+ """
+ class RpcSession:
+ def __init__(self):
+ self.called = 0
+
+ def rpc_call(self, command, recipient):
+ self.called += 1
+ raise isc.config.RPCRecipientMissing("Not yet")
+
+ b10init = MockInitSimple()
+ b10init.wait_time = 1
+ b10init.ccs = RpcSession()
+ self.assertRaises(init.ProcessStartError, b10init.wait_msgq)
+ self.assertEqual(10, b10init.ccs.called)
+
def test_start_cfgmgr(self):
'''Test that b10-cfgmgr is started.'''
class DummySession():
diff --git a/src/bin/cmdctl/cmdctl.py.in b/src/bin/cmdctl/cmdctl.py.in
index b1ee903..7a9e8b8 100755
--- a/src/bin/cmdctl/cmdctl.py.in
+++ b/src/bin/cmdctl/cmdctl.py.in
@@ -36,7 +36,6 @@ import re
import ssl, socket
import isc
import pprint
-import select
import csv
import random
import time
diff --git a/src/bin/msgq/msgq.py.in b/src/bin/msgq/msgq.py.in
index efa3cbd..b6b1106 100755
--- a/src/bin/msgq/msgq.py.in
+++ b/src/bin/msgq/msgq.py.in
@@ -61,12 +61,14 @@ VERSION = "b10-msgq 20110127 (BIND 10 @PACKAGE_VERSION@)"
# If B10_FROM_BUILD is set in the environment, we use data files
# from a directory relative to that, otherwise we use the ones
# installed on the system
-if "B10_FROM_BUILD" in os.environ:
- SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/msgq"
+if "B10_FROM_SOURCE" in os.environ:
+ SPECFILE_PATH = os.environ["B10_FROM_SOURCE"] + "/src/bin/msgq"
else:
PREFIX = "@prefix@"
DATAROOTDIR = "@datarootdir@"
- SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
+ SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}",
+ DATAROOTDIR). \
+ replace("${prefix}", PREFIX)
SPECFILE_LOCATION = SPECFILE_PATH + "/msgq.spec"
class MsgQReceiveError(Exception): pass
@@ -122,12 +124,17 @@ class SubscriptionManager:
if target in self.subscriptions:
if socket in self.subscriptions[target]:
self.subscriptions[target].remove(socket)
+ return True
+ return False
def unsubscribe_all(self, socket):
"""Remove the socket from all subscriptions."""
- for socklist in self.subscriptions.values():
+ removed_from = []
+ for subs, socklist in self.subscriptions.items():
if socket in socklist:
socklist.remove(socket)
+ removed_from.append(subs)
+ return removed_from
def find_sub(self, group, instance):
"""Return an array of sockets which want this specific group,
@@ -184,6 +191,7 @@ class MsgQ:
self.hostname = socket.gethostname()
self.subs = SubscriptionManager(self.cfgmgr_ready)
self.lnames = {}
+ self.fd_to_lname = {}
self.sendbuffs = {}
self.running = False
self.__cfgmgr_ready = None
@@ -195,6 +203,33 @@ class MsgQ:
# not for performance, so we use wide lock scopes to be on the safe
# side.
self.__lock = threading.Lock()
+ self._session = None
+
+ def members_notify(self, event, params):
+ """
+ Thin wrapper around ccs's notify. Send a notification about change
+ of some list that can be requested by the members command.
+
+ The event is one of:
+ - connected (client connected to MsgQ)
+ - disconnected (client disconnected from MsgQ)
+ - subscribed (client subscribed to a group)
+ - unsubscribed (client unsubscribed from a group)
+
+ The params is dict containing:
+ - client: The lname of the client in question.
+ - group (for 'subscribed' and 'unsubscribed' events):
+ The group the client subscribed or unsubscribed from.
+
+ The notification occurs after the event, so a client subscribing for
+ notifications will get a notification about its own subscription, but
+ will not get a notification when it unsubscribes.
+ """
+ # Due to the interaction between threads (and fear it might influence
+ # sending stuff), we test this method in msgq_run_test, instead of
+ # mocking the ccs.
+ if self._session: # Don't send before we have started up
+ self._session.notify('cc_members', event, params)
def cfgmgr_ready(self, ready=True):
"""Notify that the config manager is either subscribed, or
@@ -323,11 +358,13 @@ class MsgQ:
def register_socket(self, newsocket):
"""
- Internal function to insert a socket. Used by process_accept and some tests.
+ Internal function to insert a socket. Used by process_accept and some
+ tests.
"""
self.sockets[newsocket.fileno()] = newsocket
lname = self.newlname()
self.lnames[lname] = newsocket
+ self.fd_to_lname[newsocket.fileno()] = lname
logger.debug(TRACE_BASIC, MSGQ_SOCKET_REGISTERED, newsocket.fileno(),
lname)
@@ -337,6 +374,8 @@ class MsgQ:
else:
self.add_kqueue_socket(newsocket)
+ self.members_notify('connected', {'client': lname})
+
def kill_socket(self, fd, sock):
"""Fully close down the socket."""
# Unregister events on the socket. Note that we don't have to do
@@ -345,14 +384,23 @@ class MsgQ:
if self.poller:
self.poller.unregister(sock)
- self.subs.unsubscribe_all(sock)
- lname = [ k for k, v in self.lnames.items() if v == sock ][0]
+ unsubscribed_from = self.subs.unsubscribe_all(sock)
+ lname = self.fd_to_lname[fd]
+ del self.fd_to_lname[fd]
del self.lnames[lname]
sock.close()
del self.sockets[fd]
if fd in self.sendbuffs:
del self.sendbuffs[fd]
logger.debug(TRACE_BASIC, MSGQ_SOCK_CLOSE, fd)
+ # Filter out just the groups.
+ unsubscribed_from_groups = set(map(lambda x: x[0], unsubscribed_from))
+ for group in unsubscribed_from_groups:
+ self.members_notify('unsubscribed', {
+ 'client': lname,
+ 'group': group
+ })
+ self.members_notify('disconnected', {'client': lname})
def __getbytes(self, fd, sock, length, continued):
"""Get exactly the requested bytes, or raise an exception if
@@ -567,7 +615,8 @@ class MsgQ:
This is done by using an increasing counter and the current
time."""
self.connection_counter += 1
- return "%x_%x@%s" % (time.time(), self.connection_counter, self.hostname)
+ return "%x_%x@%s" % (time.time(), self.connection_counter,
+ self.hostname)
def process_command_ping(self, sock, routing, data):
self.sendmsg(sock, { CC_HEADER_TYPE : CC_COMMAND_PONG }, data)
@@ -644,13 +693,25 @@ class MsgQ:
if group == None or instance == None:
return # ignore invalid packets entirely
self.subs.subscribe(group, instance, sock)
+ lname = self.fd_to_lname[sock.fileno()]
+ self.members_notify('subscribed',
+ {
+ 'client': lname,
+ 'group': group
+ })
def process_command_unsubscribe(self, sock, routing, data):
group = routing[CC_HEADER_GROUP]
instance = routing[CC_HEADER_INSTANCE]
if group == None or instance == None:
return # ignore invalid packets entirely
- self.subs.unsubscribe(group, instance, sock)
+ if self.subs.unsubscribe(group, instance, sock):
+ lname = self.fd_to_lname[sock.fileno()]
+ self.members_notify('unsubscribed',
+ {
+ 'client': lname,
+ 'group': group
+ })
def run(self):
"""Process messages. Forever. Mostly."""
@@ -795,16 +856,27 @@ class MsgQ:
return isc.config.create_answer(0)
def command_handler(self, command, args):
- """The command handler (run in a separate thread).
- Not tested, currently effectively empty.
- """
+ """The command handler (run in a separate thread)."""
config_logger.debug(TRACE_DETAIL, MSGQ_COMMAND, command, args)
with self.__lock:
if not self.running:
return
- # TODO: Any commands go here
+ # TODO: Who does validation? The ModuleCCSession or must we?
+
+ if command == 'members':
+ # List all members of MsgQ or of a group.
+ if args is None:
+ args = {}
+ group = args.get('group')
+ if group:
+ return isc.config.create_answer(0,
+ list(map(lambda sock: self.fd_to_lname[sock.fileno()],
+ self.subs.find(group, ''))))
+ else:
+ return isc.config.create_answer(0,
+ list(self.lnames.keys()))
config_logger.error(MSGQ_COMMAND_UNKNOWN, command)
return isc.config.create_answer(1, 'unknown command: ' + command)
@@ -819,7 +891,8 @@ if __name__ == "__main__":
a valid port number. Used by OptionParser() on startup."""
intval = int(value)
if (intval < 0) or (intval > 65535):
- raise OptionValueError("%s requires a port number (0-65535)" % opt_str)
+ raise OptionValueError("%s requires a port number (0-65535)" %
+ opt_str)
parser.values.msgq_port = intval
# Parse any command-line options.
@@ -861,13 +934,23 @@ if __name__ == "__main__":
msgq.command_handler,
None, True,
msgq.socket_file)
+ msgq._session = session
session.start()
# And we create a thread that'll just wait for commands and
# handle them. We don't terminate the thread, we set it to
# daemon. Once the main thread terminates, it'll just die.
def run_session():
while True:
- session.check_command(False)
+ # The check_command has an internal mutex that is shared
+ # with the sending part (which includes notify), so we don't
+ # want to hold it long-term; we block using select instead.
+ fileno = session.get_socket().fileno()
+ try:
+ (reads, _, _) = select.select([fileno], [], [])
+ except select.error as se:
+ if se.args[0] != errno.EINTR:
+ raise
+ session.check_command(True)
background_thread = threading.Thread(target=run_session)
background_thread.daemon = True
background_thread.start()
diff --git a/src/bin/msgq/msgq.spec b/src/bin/msgq/msgq.spec
index 93204fa..4b388c5 100644
--- a/src/bin/msgq/msgq.spec
+++ b/src/bin/msgq/msgq.spec
@@ -3,6 +3,18 @@
"module_name": "Msgq",
"module_description": "The message queue",
"config_data": [],
- "commands": []
+ "commands": [
+ {
+ "command_name": "members",
+ "command_description": "Provide the list of members of a group or of the whole MsgQ if no group is given.",
+ "command_args": [
+ {
+ "item_name": "group",
+ "item_optional": true,
+ "item_type": "string"
+ }
+ ]
+ }
+ ]
}
}
diff --git a/src/bin/msgq/tests/msgq_run_test.py b/src/bin/msgq/tests/msgq_run_test.py
index 95173e0..9cf6da6 100644
--- a/src/bin/msgq/tests/msgq_run_test.py
+++ b/src/bin/msgq/tests/msgq_run_test.py
@@ -272,6 +272,62 @@ class MsgqRunTest(unittest.TestCase):
conn.close()
conn = new
+ def test_notifications(self):
+ """
+ Check that the MsgQ is actually sending notifications about events.
+ We create a socket, subscribe the socket itself and see it receives
+ its own notification.
+
+ Testing all the places where notifications happen is task for the
+ common unit tests in msgq_test.py.
+
+ The test is here, because there might be some trouble with multiple
+ threads in msgq (see the note about locking on the module CC session
+ when sending message from one thread and listening for commands in the
+ other) which would be hard to test using pure unit tests. Testing
+ running the whole msgq tests that implicitly.
+ """
+ conn = self.__get_connection()
+ # Activate the session, pretend to be the config manager.
+ conn.group_subscribe('ConfigManager')
+ # Answer request for logging config
+ (msg, env) = conn.group_recvmsg(nonblock=False)
+ self.assertEqual({'command': ['get_config',
+ {'module_name': 'Logging'}]},
+ msg)
+ conn.group_reply(env, {'result': [0, {}]})
+ # It sends its spec.
+ (msg, env) = conn.group_recvmsg(nonblock=False)
+ self.assertEqual('module_spec', msg['command'][0])
+ conn.group_reply(env, {'result': [0]})
+ # It asks for its own config
+ (msg, env) = conn.group_recvmsg(nonblock=False)
+ self.assertEqual({'command': ['get_config',
+ {'module_name': 'Msgq'}]},
+ msg)
+ conn.group_reply(env, {'result': [0, {}]})
+ # Synchronization - make sure the session is running before
+ # we continue, so we get the notification. Similar synchronisation
+ # as in b10-init, but we don't have full ccsession here, so we
+ # do so manually.
+ synchronised = False
+ attempts = 100
+ while not synchronised and attempts > 0:
+ time.sleep(0.1)
+ seq = conn.group_sendmsg({'command': ['Are you running?']},
+ 'Msgq', want_answer=True)
+ msg = conn.group_recvmsg(nonblock=False, seq=seq)
+ synchronised = msg[0] != -1
+ attempts -= 1
+ self.assertTrue(synchronised)
+ # The actual test
+ conn.group_subscribe('notifications/cc_members')
+ (msg, env) = conn.group_recvmsg(nonblock=False)
+ self.assertEqual({'notification': ['subscribed', {
+ 'client': conn.lname,
+ 'group': 'notifications/cc_members'
+ }]}, msg)
+
if __name__ == '__main__':
isc.log.init("msgq-tests")
isc.log.resetUnitTestRootLogger()
diff --git a/src/bin/msgq/tests/msgq_test.py b/src/bin/msgq/tests/msgq_test.py
index e5a5656..ffe1940 100644
--- a/src/bin/msgq/tests/msgq_test.py
+++ b/src/bin/msgq/tests/msgq_test.py
@@ -63,8 +63,11 @@ class TestSubscriptionManager(unittest.TestCase):
socks = [ 's1', 's2', 's3', 's4', 's5' ]
for s in socks:
self.sm.subscribe("a", "*", s)
- self.sm.unsubscribe("a", "*", 's3')
- self.assertEqual(self.sm.find_sub("a", "*"), [ 's1', 's2', 's4', 's5' ])
+ self.assertTrue(self.sm.unsubscribe("a", "*", 's3'))
+ # Unsubscribe from group it is not in
+ self.assertFalse(self.sm.unsubscribe("a", "*", 's42'))
+ self.assertEqual(self.sm.find_sub("a", "*"),
+ [ 's1', 's2', 's4', 's5' ])
def test_unsubscribe_all(self):
self.sm.subscribe('g1', 'i1', 's1')
@@ -75,7 +78,9 @@ class TestSubscriptionManager(unittest.TestCase):
self.sm.subscribe('g2', 'i1', 's2')
self.sm.subscribe('g2', 'i2', 's1')
self.sm.subscribe('g2', 'i2', 's2')
- self.sm.unsubscribe_all('s1')
+ self.assertEqual(set([('g1', 'i1'), ('g1', 'i2'), ('g2', 'i1'),
+ ('g2', 'i2')]),
+ set(self.sm.unsubscribe_all('s1')))
self.assertEqual(self.sm.find_sub("g1", "i1"), [ 's2' ])
self.assertEqual(self.sm.find_sub("g1", "i2"), [ 's2' ])
self.assertEqual(self.sm.find_sub("g2", "i1"), [ 's2' ])
@@ -178,6 +183,157 @@ class MsgQTest(unittest.TestCase):
data = json.loads(msg[6 + header_len:].decode('utf-8'))
return (header, data)
+ def test_unknown_command(self):
+ """
+ Test the command handler returns error when the command is unknown.
+ """
+ # Fake we are running, to disable test workarounds
+ self.__msgq.running = True
+ self.assertEqual({'result': [1, "unknown command: unknown"]},
+ self.__msgq.command_handler('unknown', {}))
+
+ def test_get_members(self):
+ """
+ Test getting members of a group or of all connected clients.
+ """
+ # Push two dummy "clients" into msgq (the ugly way, by directly
+ # tweaking relevant data structures).
+ class Sock:
+ def __init__(self, fileno):
+ self.fileno = lambda: fileno
+ self.__msgq.lnames['first'] = Sock(1)
+ self.__msgq.lnames['second'] = Sock(2)
+ self.__msgq.fd_to_lname[1] = 'first'
+ self.__msgq.fd_to_lname[2] = 'second'
+ # Subscribe them to some groups
+ self.__msgq.process_command_subscribe(self.__msgq.lnames['first'],
+ {'group': 'G1', 'instance': '*'},
+ None)
+ self.__msgq.process_command_subscribe(self.__msgq.lnames['second'],
+ {'group': 'G1', 'instance': '*'},
+ None)
+ self.__msgq.process_command_subscribe(self.__msgq.lnames['second'],
+ {'group': 'G2', 'instance': '*'},
+ None)
+ # Now query content of some groups through the command handler.
+ self.__msgq.running = True # Enable the command handler
+ def check_both(result):
+ """
+ Check the result is successful one and it contains both lnames (in
+ any order).
+ """
+ array = result['result'][1]
+ self.assertEqual(set(['first', 'second']), set(array))
+ self.assertEqual({'result': [0, array]}, result)
+ # Make sure the result can be encoded as JSON
+ # (there seem to be types that look like a list but JSON chokes
+ # on them)
+ json.dumps(result)
+ # Members of the G1 and G2
+ self.assertEqual({'result': [0, ['second']]},
+ self.__msgq.command_handler('members',
+ {'group': 'G2'}))
+ check_both(self.__msgq.command_handler('members', {'group': 'G1'}))
+ # We pretend that all the possible groups exist, just that most
+ # of them are empty. So requesting for Empty is request for an empty
+ # group and should not fail.
+ self.assertEqual({'result': [0, []]},
+ self.__msgq.command_handler('members',
+ {'group': 'Empty'}))
+ # Without the name of the group, we just get all the clients.
+ check_both(self.__msgq.command_handler('members', {}))
+ # Omitting the parameters completely in such case is OK
+ check_both(self.__msgq.command_handler('members', None))
+
+ def notifications_setup(self):
+ """
+ Common setup of some notifications tests. Mock several things.
+ """
+ # Mock the method to send notifications (we don't really want
+ # to send them now, just see they'd be sent).
+ # Mock the poller, as we don't need it at all (and we don't have
+ # real socket to give it now).
+ notifications = []
+ def send_notification(event, params):
+ notifications.append((event, params))
+ class FakePoller:
+ def register(self, socket, mode):
+ pass
+ def unregister(self, sock):
+ pass
+ self.__msgq.members_notify = send_notification
+ self.__msgq.poller = FakePoller()
+
+ # Create a socket
+ class Sock:
+ def __init__(self, fileno):
+ self.fileno = lambda: fileno
+ def close(self):
+ pass
+ sock = Sock(1)
+ return notifications, sock
+
+ def test_notifies(self):
+ """
+ Test the message queue sends notifications about connecting,
+ disconnecting and subscription changes.
+ """
+ notifications, sock = self.notifications_setup()
+
+ # We should notify about new client when we register it
+ self.__msgq.register_socket(sock)
+ lname = self.__msgq.fd_to_lname[1] # Steal the lname
+ self.assertEqual([('connected', {'client': lname})], notifications)
+ notifications.clear()
+
+ # A notification should happen for a subscription to a group
+ self.__msgq.process_command_subscribe(sock, {'group': 'G',
+ 'instance': '*'},
+ None)
+ self.assertEqual([('subscribed', {'client': lname, 'group': 'G'})],
+ notifications)
+ notifications.clear()
+
+ # As well for unsubscription
+ self.__msgq.process_command_unsubscribe(sock, {'group': 'G',
+ 'instance': '*'},
+ None)
+ self.assertEqual([('unsubscribed', {'client': lname, 'group': 'G'})],
+ notifications)
+ notifications.clear()
+
+ # Unsubscription from a group it isn't subscribed to
+ self.__msgq.process_command_unsubscribe(sock, {'group': 'H',
+ 'instance': '*'},
+ None)
+ self.assertEqual([], notifications)
+
+ # And, finally, for removal of client
+ self.__msgq.kill_socket(sock.fileno(), sock)
+ self.assertEqual([('disconnected', {'client': lname})], notifications)
+
+ def test_notifies_implicit_kill(self):
+ """
+ Test that the unsubscription notifications are sent before the socket
+ is dropped, even in case it does not unsubscribe explicitly.
+ """
+ notifications, sock = self.notifications_setup()
+
+ # Register and subscribe. Notifications for these are in above test.
+ self.__msgq.register_socket(sock)
+ lname = self.__msgq.fd_to_lname[1] # Steal the lname
+ self.__msgq.process_command_subscribe(sock, {'group': 'G',
+ 'instance': '*'},
+ None)
+ notifications.clear()
+
+ self.__msgq.kill_socket(sock.fileno(), sock)
+ # Now, the notification for unsubscribe should be first, second for
+ # the disconnection.
+ self.assertEqual([('unsubscribed', {'client': lname, 'group': 'G'}),
+ ('disconnected', {'client': lname})
+ ], notifications)
+
def test_undeliverable_errors(self):
"""
Send several packets through the MsgQ and check it generates
@@ -412,12 +568,17 @@ class SendNonblock(unittest.TestCase):
The write end is put into the message queue, so we can check it.
It returns (msgq, read_end, write_end). It is expected the sockets
are closed by the caller afterwards.
+
+ Also check the sockets are registered correctly (e.g. internal data
+ structures are there for them).
'''
msgq = MsgQ()
# We do only partial setup, so we don't create the listening socket
msgq.setup_poller()
(read, write) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
msgq.register_socket(write)
+ self.assertEqual(1, len(msgq.lnames))
+ self.assertEqual(write, msgq.lnames[msgq.fd_to_lname[write.fileno()]])
return (msgq, read, write)
def infinite_sender(self, sender):
@@ -437,8 +598,15 @@ class SendNonblock(unittest.TestCase):
# Explicitly close temporary socket pair as the Python
# interpreter expects it. It may not be 100% exception safe,
# but since this is only for tests we prefer brevity.
+ # Actually, the write end is often closed by the sender.
+ if write.fileno() != -1:
+ # Some of the senders passed here kill the socket internally,
+ # so kill it only if that has not happened yet. A closed socket
+ # reports -1 from fileno().
+ msgq.kill_socket(write.fileno(), write)
+ self.assertFalse(msgq.lnames)
+ self.assertFalse(msgq.fd_to_lname)
read.close()
- write.close()
def test_infinite_sendmsg(self):
"""
@@ -640,9 +808,11 @@ class SendNonblock(unittest.TestCase):
send_exception is raised by BadSocket.
"""
(write, read) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
- (control_write, control_read) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
+ (control_write, control_read) = socket.socketpair(socket.AF_UNIX,
+ socket.SOCK_STREAM)
badwrite = BadSocket(write, raise_on_send, send_exception)
- self.do_send(badwrite, read, control_write, control_read, expect_answer, expect_send_exception)
+ self.do_send(badwrite, read, control_write, control_read,
+ expect_answer, expect_send_exception)
write.close()
read.close()
control_write.close()
diff --git a/src/bin/zonemgr/zonemgr.py.in b/src/bin/zonemgr/zonemgr.py.in
index fcb929a..71c7aae 100755
--- a/src/bin/zonemgr/zonemgr.py.in
+++ b/src/bin/zonemgr/zonemgr.py.in
@@ -691,6 +691,7 @@ class Zonemgr:
try:
while not self._shutdown_event.is_set():
fileno = self._module_cc.get_socket().fileno()
+ reads = []
# Wait with select() until there is something to read,
# and then read it using a non-blocking read
# This may or may not be relevant data for this loop,
More information about the bind10-changes
mailing list