[svn] commit: r3273 - in /trunk: ./ src/bin/bind10/ src/bin/xfrin/ src/bin/xfrout/ src/bin/zonemgr/ src/bin/zonemgr/tests/ src/lib/python/isc/notify/ src/lib/python/isc/notify/tests/
BIND 10 source code commits
bind10-changes at lists.isc.org
Tue Oct 19 10:50:30 UTC 2010
Author: vorner
Date: Tue Oct 19 10:50:29 2010
New Revision: 3273
Log:
Merge trac #335
Modified:
trunk/ (props changed)
trunk/ChangeLog
trunk/src/bin/bind10/bind10.py.in
trunk/src/bin/xfrin/xfrin.py.in
trunk/src/bin/xfrout/xfrout.py.in
trunk/src/bin/zonemgr/tests/Makefile.am
trunk/src/bin/zonemgr/tests/zonemgr_test.py
trunk/src/bin/zonemgr/zonemgr.py.in
trunk/src/lib/python/isc/notify/notify_out.py
trunk/src/lib/python/isc/notify/tests/notify_out_test.py
Modified: trunk/ChangeLog
==============================================================================
--- trunk/ChangeLog (original)
+++ trunk/ChangeLog Tue Oct 19 10:50:29 2010
@@ -1,3 +1,12 @@
+ 111. [bug]* zhanglikun, Michal Vaner
+ Make sure process xfrin/xfrout/zonemgr/cmdctl can be stoped
+ properly when user enter "ctrl+c" or 'Boss shutdown' command
+ through bindctl.
+
+ The ZonemgrRefresh.run_timer and NotifyOut.dispatcher spawn
+ a thread themself.
+ (Trac #335, svn r3273)
+
110. [func] Michal Vaner
Added isc.net.check module to check ip addresses and ports for correctness
and isc.net.addr to hold IP address. The bind10, xfrin and cmdctl programs
Modified: trunk/src/bin/bind10/bind10.py.in
==============================================================================
--- trunk/src/bin/bind10/bind10.py.in (original)
+++ trunk/src/bin/bind10/bind10.py.in Tue Oct 19 10:50:29 2010
@@ -454,12 +454,12 @@
def stop_all_processes(self):
"""Stop all processes."""
cmd = { "command": ['shutdown']}
- self.cc_session.group_sendmsg(cmd, 'Boss', 'Cmdctl')
- self.cc_session.group_sendmsg(cmd, "Boss", "ConfigManager")
- self.cc_session.group_sendmsg(cmd, "Boss", "Auth")
- self.cc_session.group_sendmsg(cmd, "Boss", "Xfrout")
- self.cc_session.group_sendmsg(cmd, "Boss", "Xfrin")
- self.cc_session.group_sendmsg(cmd, "Boss", "Zonemgr")
+ self.cc_session.group_sendmsg(cmd, 'Cmdctl', 'Cmdctl')
+ self.cc_session.group_sendmsg(cmd, "ConfigManager", "ConfigManager")
+ self.cc_session.group_sendmsg(cmd, "Auth", "Auth")
+ self.cc_session.group_sendmsg(cmd, "Xfrout", "Xfrout")
+ self.cc_session.group_sendmsg(cmd, "Xfrin", "Xfrin")
+ self.cc_session.group_sendmsg(cmd, "Zonemgr", "Zonemgr")
self.cc_session.group_sendmsg(cmd, "Boss", "Stats")
def stop_process(self, process):
@@ -477,7 +477,9 @@
except:
pass
# XXX: some delay probably useful... how much is uncertain
- time.sleep(0.5)
+ # I have changed the delay from 0.5 to 1, but sometime it's
+ # still not enough.
+ time.sleep(1)
self.reap_children()
# next try sending a SIGTERM
processes_to_stop = list(self.processes.values())
Modified: trunk/src/bin/xfrin/xfrin.py.in
==============================================================================
--- trunk/src/bin/xfrin/xfrin.py.in (original)
+++ trunk/src/bin/xfrin/xfrin.py.in Tue Oct 19 10:50:29 2010
@@ -519,11 +519,21 @@
param = {'zone_name': zone_name, 'zone_class': zone_class.to_text()}
if xfr_result == XFRIN_OK:
msg = create_command(notify_out.ZONE_NEW_DATA_READY_CMD, param)
- self._send_cc_session.group_sendmsg(msg, XFROUT_MODULE_NAME)
- self._send_cc_session.group_sendmsg(msg, ZONE_MANAGER_MODULE_NAME)
+ # catch the exception, in case msgq has been killed.
+ try:
+ self._send_cc_session.group_sendmsg(msg, XFROUT_MODULE_NAME)
+ self._send_cc_session.group_sendmsg(msg, ZONE_MANAGER_MODULE_NAME)
+ except socket.error as err:
+ log_error("Fail to send message to %s and %s, msgq may has been killed"
+ % (XFROUT_MODULE_NAME, ZONE_MANAGER_MODULE_NAME))
else:
msg = create_command(ZONE_XFRIN_FAILED, param)
- self._send_cc_session.group_sendmsg(msg, ZONE_MANAGER_MODULE_NAME)
+ # catch the exception, in case msgq has been killed.
+ try:
+ self._send_cc_session.group_sendmsg(msg, ZONE_MANAGER_MODULE_NAME)
+ except socket.error as err:
+ log_error("Fail to send message to %s, msgq may has been killed"
+ % ZONE_MANAGER_MODULE_NAME)
def startup(self):
while not self._shutdown_event.is_set():
Modified: trunk/src/bin/xfrout/xfrout.py.in
==============================================================================
--- trunk/src/bin/xfrout/xfrout.py.in (original)
+++ trunk/src/bin/xfrout/xfrout.py.in Tue Oct 19 10:50:29 2010
@@ -266,7 +266,8 @@
for rr_data in sqlite3_ds.get_zone_datas(zone_name, self.server.get_db_file()):
if self.server._shutdown_event.is_set(): # Check if xfrout is shutdown
- self._log.log_message("error", "shutdown!")
+ self._log.log_message("info", "xfrout process is being shutdown")
+ return
# TODO: RRType.SOA() ?
if RRType(rr_data[5]) == RRType("SOA"): #ignore soa record
@@ -398,8 +399,9 @@
# normal program flow continue by trying serve_forever()
# again.
if err.args[0] != errno.EINTR: raise
-
-
+ else:
+ # serve_forever() loop has been stoped normally.
+ break
class XfroutServer:
def __init__(self):
@@ -428,9 +430,7 @@
def _start_notifier(self):
datasrc = self._unix_socket_server.get_db_file()
self._notifier = notify_out.NotifyOut(datasrc, self._log)
- td = threading.Thread(target = notify_out.dispatcher, args = (self._notifier,))
- td.daemon = True
- td.start()
+ self._notifier.dispatcher()
def send_notify(self, zone_name, zone_class):
self._notifier.send_notify(zone_name, zone_class)
@@ -443,7 +443,7 @@
answer = create_answer(1, "Unknown config data: " + str(key))
continue
self._config_data[key] = new_config[key]
-
+
if self._log:
self._log.update_config(new_config)
@@ -461,9 +461,11 @@
global xfrout_server
xfrout_server = None #Avoid shutdown is called twice
self._shutdown_event.set()
+ self._notifier.shutdown()
if self._unix_socket_server:
self._unix_socket_server.shutdown()
+ # Wait for all threads to terminate
main_thread = threading.currentThread()
for th in threading.enumerate():
if th is main_thread:
@@ -475,7 +477,7 @@
self._log.log_message("info", "Received shutdown command.")
self.shutdown()
answer = create_answer(0)
-
+
elif cmd == notify_out.ZONE_NEW_DATA_READY_CMD:
zone_name = args.get('zone_name')
zone_class = args.get('zone_class')
@@ -490,7 +492,7 @@
else:
answer = create_answer(1, "Unknown command:" + str(cmd))
- return answer
+ return answer
def run(self):
'''Get and process all commands sent from cfgmgr or other modules. '''
Modified: trunk/src/bin/zonemgr/tests/Makefile.am
==============================================================================
--- trunk/src/bin/zonemgr/tests/Makefile.am (original)
+++ trunk/src/bin/zonemgr/tests/Makefile.am Tue Oct 19 10:50:29 2010
@@ -1,5 +1,7 @@
PYTESTS = zonemgr_test.py
EXTRA_DIST = $(PYTESTS)
+
+CLEANFILES = initdb.file
# later will have configure option to choose this, like: coverage run --branch
PYCOVERAGE = $(PYTHON)
Modified: trunk/src/bin/zonemgr/tests/zonemgr_test.py
==============================================================================
--- trunk/src/bin/zonemgr/tests/zonemgr_test.py (original)
+++ trunk/src/bin/zonemgr/tests/zonemgr_test.py Tue Oct 19 10:50:29 2010
@@ -45,13 +45,22 @@
class MyZonemgrRefresh(ZonemgrRefresh):
def __init__(self):
- self._cc = MySession()
- self._db_file = "initdb.file"
- current_time = time.time()
- self._max_transfer_timeout = MAX_TRANSFER_TIMEOUT
- self._lowerbound_refresh = LOWERBOUND_REFRESH
- self._lowerbound_retry = LOWERBOUND_RETRY
- self._jitter_scope = JITTER_SCOPE
+ class FakeConfig:
+ def get(self, name):
+ if name == 'lowerbound_refresh':
+ return LOWERBOUND_REFRESH
+ elif name == 'lowerbound_retry':
+ return LOWERBOUND_RETRY
+ elif name == 'max_transfer_timeout':
+ return MAX_TRANSFER_TIMEOUT
+ elif name == 'jitter_scope':
+ return JITTER_SCOPE
+ else:
+ raise ValueError('Uknown config option')
+ self._master_socket, self._slave_socket = socket.socketpair()
+ ZonemgrRefresh.__init__(self, MySession(), "initdb.file",
+ self._slave_socket, FakeConfig())
+ current_time = time.time()
self._zonemgr_refresh_info = {
('sd.cn.', 'IN'): {
'last_refresh_time': current_time,
@@ -67,8 +76,8 @@
class TestZonemgrRefresh(unittest.TestCase):
def setUp(self):
- self.stdout_backup = sys.stdout
- sys.stdout = open(os.devnull, 'w')
+ self.stderr_backup = sys.stderr
+ sys.stderr = open(os.devnull, 'w')
self.zone_refresh = MyZonemgrRefresh()
def test_random_jitter(self):
@@ -101,7 +110,7 @@
time2 = time.time()
self.assertTrue((time1 + 7200 * 3 / 4) <= zone_timeout)
self.assertTrue(zone_timeout <= time2 + 7200)
-
+
def test_set_zone_retry_timer(self):
time1 = time.time()
self.zone_refresh._set_zone_retry_timer(ZONE_NAME_CLASS1_IN)
@@ -147,6 +156,8 @@
def test_zonemgr_reload_zone(self):
soa_rdata = 'a.dns.cn. root.cnnic.cn. 2009073106 1800 900 2419200 21600'
+ # We need to restore this not to harm other tests
+ old_get_zone_soa = sqlite3_ds.get_zone_soa
def get_zone_soa(zone_name, db_file):
return (1, 2, 'sd.cn.', 'cn.sd.', 21600, 'SOA', None,
'a.dns.cn. root.cnnic.cn. 2009073106 1800 900 2419200 21600')
@@ -154,6 +165,7 @@
self.zone_refresh.zonemgr_reload_zone(ZONE_NAME_CLASS1_IN)
self.assertEqual(soa_rdata, self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["zone_soa_rdata"])
+ sqlite3_ds.get_zone_soa = old_get_zone_soa
def test_get_zone_notifier_master(self):
notify_master = "192.168.1.1"
@@ -231,6 +243,9 @@
def test_zonemgr_add_zone(self):
soa_rdata = 'a.dns.cn. root.cnnic.cn. 2009073106 1800 900 2419200 21600'
+ # This needs to be restored. The following test actually failed if we left
+ # this unclean
+ old_get_zone_soa = sqlite3_ds.get_zone_soa
def get_zone_soa(zone_name, db_file):
return (1, 2, 'sd.cn.', 'cn.sd.', 21600, 'SOA', None,
@@ -251,7 +266,8 @@
return None
sqlite3_ds.get_zone_soa = get_zone_soa2
self.assertRaises(ZonemgrException, self.zone_refresh.zonemgr_add_zone, \
- ZONE_NAME_CLASS1_IN)
+ ZONE_NAME_CLASS1_IN)
+ sqlite3_ds.get_zone_soa = old_get_zone_soa
def test_build_zonemgr_refresh_info(self):
soa_rdata = 'a.dns.cn. root.cnnic.cn. 2009073106 1800 900 2419200 21600'
@@ -382,7 +398,7 @@
"""This case will run timer in daemon thread.
The zone's next_refresh_time is less than now, so zonemgr will do zone refresh
immediately. The zone's state will become "refreshing".
- Then closing the socket ,the timer will stop, and throw a ZonemgrException."""
+ """
time1 = time.time()
self.zone_refresh._zonemgr_refresh_info = {
("sd.cn.", "IN"):{
@@ -391,17 +407,11 @@
'zone_soa_rdata': 'a.dns.cn. root.cnnic.cn. 2009073105 7200 3600 2419200 21600',
'zone_state': ZONE_OK}
}
- master_socket, slave_socket = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
- self.zone_refresh._socket = master_socket
- master_socket.close()
- self.assertRaises(ZonemgrException, self.zone_refresh.run_timer)
-
- self.zone_refresh._socket = slave_socket
- listener = threading.Thread(target = self.zone_refresh.run_timer, args = ())
- listener.setDaemon(True)
- listener.start()
- time.sleep(1)
-
+ self.zone_refresh._check_sock = self.zone_refresh._master_socket
+ listener = self.zone_refresh.run_timer(daemon=True)
+ # Shut down the timer thread
+ self.zone_refresh.shutdown()
+ # After running timer, the zone's state should become "refreshing".
zone_state = self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["zone_state"]
self.assertTrue("refresh_timeout" in self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN].keys())
self.assertTrue(zone_state == ZONE_REFRESHING)
@@ -419,9 +429,16 @@
self.assertEqual(19800, self.zone_refresh._max_transfer_timeout)
self.assertEqual(0.25, self.zone_refresh._jitter_scope)
+ def test_shutdown(self):
+ self.zone_refresh._check_sock = self.zone_refresh._master_socket
+ listener = self.zone_refresh.run_timer()
+ self.assertTrue(listener.is_alive())
+ # Shut down the timer thread
+ self.zone_refresh.shutdown()
+ self.assertFalse(listener.is_alive())
def tearDown(self):
- sys.stdout = self.stdout_backup
+ sys.stderr= self.stderr_backup
class MyCCSession():
Modified: trunk/src/bin/zonemgr/zonemgr.py.in
==============================================================================
--- trunk/src/bin/zonemgr/zonemgr.py.in (original)
+++ trunk/src/bin/zonemgr/zonemgr.py.in Tue Oct 19 10:50:29 2010
@@ -16,7 +16,7 @@
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-"""\
+"""
This file implements the Secondary Manager program.
The secondary manager is one of the co-operating processes
@@ -91,16 +91,20 @@
class ZonemgrRefresh:
"""This class will maintain and manage zone refresh info.
It also provides methods to keep track of zone timers and
- do zone refresh.
+ do zone refresh.
+ Zone timers can be started by calling run_timer(), and it
+ can be stopped by calling shutdown() in another thread.
+
"""
def __init__(self, cc, db_file, slave_socket, config_data):
self._cc = cc
- self._socket = slave_socket
+ self._check_sock = slave_socket
self._db_file = db_file
self.update_config_data(config_data)
self._zonemgr_refresh_info = {}
self._build_zonemgr_refresh_info()
+ self._running = False
def _random_jitter(self, max, jitter):
"""Imposes some random jitters for refresh and
@@ -319,39 +323,81 @@
return False
- def run_timer(self):
- """Keep track of zone timers."""
- while True:
- # Zonemgr has no zone.
+ def _run_timer(self):
+ while self._running:
+ # If zonemgr has no zone, set timer timeout to LOWERBOUND_RETRY.
if self._zone_mgr_is_empty():
- time.sleep(self._lowerbound_retry) # A better time?
- continue
-
- zone_need_refresh = self._find_need_do_refresh_zone()
- # If don't get zone with minimum next refresh time, set timer timeout = lowerbound_retry
- if not zone_need_refresh:
timeout = self._lowerbound_retry
else:
- timeout = self._get_zone_next_refresh_time(zone_need_refresh) - self._get_current_time()
- if (timeout < 0):
- self._do_refresh(zone_need_refresh)
- continue
+ zone_need_refresh = self._find_need_do_refresh_zone()
+ # If don't get zone with minimum next refresh time, set timer timeout to LOWERBOUND_RETRY
+ if not zone_need_refresh:
+ timeout = LOWERBOUND_RETRY
+ else:
+ timeout = self._get_zone_next_refresh_time(zone_need_refresh) - self._get_current_time()
+ if (timeout < 0):
+ self._do_refresh(zone_need_refresh)
+ continue
""" Wait for the socket notification for a maximum time of timeout
in seconds (as float)."""
try:
- (rlist, wlist, xlist) = select.select([self._socket], [], [], timeout)
- if rlist:
- self._socket.recv(32)
- except ValueError as e:
- raise ZonemgrException("[b10-zonemgr] Socket has been closed\n")
- break
+ rlist, wlist, xlist = select.select([self._check_sock, self._read_sock], [], [], timeout)
except select.error as e:
if e.args[0] == errno.EINTR:
(rlist, wlist, xlist) = ([], [], [])
else:
- raise ZonemgrException("[b10-zonemgr] Error with select(): %s\n" % e)
+ sys.stderr.write("[b10-zonemgr] Error with select(); %s\n" % e)
break
+
+ for fd in rlist:
+ if fd == self._read_sock: # awaken by shutdown socket
+ # self._running will be False by now, if it is not a false
+ # alarm
+ continue
+ if fd == self._check_sock: # awaken by check socket
+ self._check_sock.recv(32)
+
+ def run_timer(self, daemon=False):
+ """
+ Keep track of zone timers. Spawns and starts a thread. The thread object is returned.
+
+ You can stop it by calling shutdown().
+ """
+ # Small sanity check
+ if self._running:
+ raise RuntimeError("Trying to run the timers twice at the same time")
+
+ # Prepare the launch
+ self._running = True
+ (self._read_sock, self._write_sock) = socket.socketpair()
+
+ # Start the thread
+ self._thread = threading.Thread(target = self._run_timer, args = ())
+ if daemon:
+ self._thread.setDaemon(True)
+ self._thread.start()
+
+ # Return the thread to anyone interested
+ return self._thread
+
+ def shutdown(self):
+ """
+ Stop the run_timer() thread. Block until it finished. This must be
+ called from a different thread.
+ """
+ if not self._running:
+ raise RuntimeError("Trying to shutdown, but not running")
+
+ # Ask the thread to stop
+ self._running = False
+ self._write_sock.send(b'shutdown') # make self._read_sock readble
+ # Wait for it to actually finnish
+ self._thread.join()
+ # Wipe out what we do not need
+ self._thread = None
+ self._read_sock = None
+ self._write_sock = None
def update_config_data(self, new_config):
""" update ZonemgrRefresh config """
@@ -359,7 +405,6 @@
self._lowerbound_retry = new_config.get('lowerbound_retry')
self._max_transfer_timeout = new_config.get('max_transfer_timeout')
self._jitter_scope = new_config.get('jitter_scope')
-
class Zonemgr:
"""Zone manager class."""
@@ -370,16 +415,11 @@
# Create socket pair for communicating between main thread and zonemgr timer thread
self._master_socket, self._slave_socket = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
self._zone_refresh = ZonemgrRefresh(self._cc, self._db_file, self._slave_socket, self._config_data)
- self._start_zone_refresh_timer()
+ self._zone_refresh.run_timer()
self._lock = threading.Lock()
self._shutdown_event = threading.Event()
-
- def _start_zone_refresh_timer(self):
- """Start a new thread to keep track of zone timers"""
- listener = threading.Thread(target = self._zone_refresh.run_timer, args = ())
- listener.setDaemon(True)
- listener.start()
+ self.running = False
def _setup_session(self):
"""Setup two sessions for zonemgr, one(self._module_cc) is used for receiving
@@ -410,15 +450,12 @@
"""Shutdown the zonemgr process. the thread which is keeping track of zone
timers should be terminated.
"""
+ self._zone_refresh.shutdown()
+
self._slave_socket.close()
self._master_socket.close()
-
self._shutdown_event.set()
- main_thread = threading.currentThread()
- for th in threading.enumerate():
- if th is main_thread:
- continue
- th.join()
+ self.running = False
def config_handler(self, new_config):
""" Update config data. """
@@ -472,21 +509,21 @@
with self._lock:
self._zone_refresh.zone_handle_notify(zone_name_class, master)
# Send notification to zonemgr timer thread
- self._master_socket.send(b" ")
+ self._master_socket.send(b" ")# make self._slave_socket readble
elif command == ZONE_XFRIN_SUCCESS_COMMAND:
""" Handle xfrin success command"""
zone_name_class = self._parse_cmd_params(args, command)
with self._lock:
self._zone_refresh.zone_refresh_success(zone_name_class)
- self._master_socket.send(b" ")
+ self._master_socket.send(b" ")# make self._slave_socket readble
elif command == ZONE_XFRIN_FAILED_COMMAND:
""" Handle xfrin fail command"""
zone_name_class = self._parse_cmd_params(args, command)
with self._lock:
self._zone_refresh.zone_refresh_fail(zone_name_class)
- self._master_socket.send(b" ")
+ self._master_socket.send(b" ")# make self._slave_socket readble
elif command == "shutdown":
self.shutdown()
@@ -497,6 +534,7 @@
return answer
def run(self):
+ self.running = True
while not self._shutdown_event.is_set():
self._module_cc.check_command(False)
@@ -536,6 +574,6 @@
except isc.config.ModuleCCSessionError as e:
sys.stderr.write("[b10-zonemgr] exit zonemgr process: %s\n" % str(e))
- if zonemgrd:
+ if zonemgrd and zonemgrd.running:
zonemgrd.shutdown()
Modified: trunk/src/lib/python/isc/notify/notify_out.py
==============================================================================
--- trunk/src/lib/python/isc/notify/notify_out.py (original)
+++ trunk/src/lib/python/isc/notify/notify_out.py Tue Oct 19 10:50:29 2010
@@ -19,6 +19,7 @@
import socket
import threading
import time
+import errno
from isc.datasrc import sqlite3_ds
import isc
try:
@@ -44,29 +45,11 @@
_BAD_QR = 4
_BAD_REPLY_PACKET = 5
+SOCK_DATA = b's'
def addr_to_str(addr):
return '%s#%s' % (addr[0], addr[1])
-def dispatcher(notifier):
- '''The loop function for handling notify related events.
- If one zone get the notify reply before timeout, call the
- handle to process the reply. If one zone can't get the notify
- before timeout, call the handler to resend notify or notify
- next slave.
- notifier: one object of class NotifyOut. '''
- while True:
- replied_zones, not_replied_zones = notifier._wait_for_notify_reply()
- if len(replied_zones) == 0 and len(not_replied_zones) == 0:
- time.sleep(_IDLE_SLEEP_TIME) #TODO set a better time for idle sleep
- continue
-
- for name_ in replied_zones:
- notifier._zone_notify_handler(replied_zones[name_], _EVENT_READ)
-
- for name_ in not_replied_zones:
- if not_replied_zones[name_].notify_timeout <= time.time():
- notifier._zone_notify_handler(not_replied_zones[name_], _EVENT_TIMEOUT)
-
+
class ZoneNotifyInfo:
'''This class keeps track of notify-out information for one zone.'''
@@ -115,14 +98,17 @@
class NotifyOut:
'''This class is used to handle notify logic for all zones(sending
- notify message to its slaves).The only interface provided to
- the user is send_notify(). the object of this class should be
- used together with function dispatcher(). '''
+ notify message to its slaves). notify service can be started by
+ calling dispatcher(), and it can be stoped by calling shutdown()
+ in another thread. '''
def __init__(self, datasrc_file, log=None, verbose=True):
self._notify_infos = {} # key is (zone_name, zone_class)
self._waiting_zones = []
self._notifying_zones = []
self._log = log
+ self._serving = False
+ self._read_sock, self._write_sock = socket.socketpair()
+ self._read_sock.setblocking(False)
self.notify_num = 0 # the count of in progress notifies
self._verbose = verbose
self._lock = threading.Lock()
@@ -164,6 +150,70 @@
self._notify_infos[zone_id].prepare_notify_out()
self.notify_num += 1
self._notifying_zones.append(zone_id)
+
+ def _dispatcher(self, started_event):
+ started_event.set() # Let the master know we are alive already
+ while self._serving:
+ replied_zones, not_replied_zones = self._wait_for_notify_reply()
+
+ for name_ in replied_zones:
+ self._zone_notify_handler(replied_zones[name_], _EVENT_READ)
+
+ for name_ in not_replied_zones:
+ if not_replied_zones[name_].notify_timeout <= time.time():
+ self._zone_notify_handler(not_replied_zones[name_], _EVENT_TIMEOUT)
+
+ def dispatcher(self, daemon=False):
+ """Spawns a thread that will handle notify related events.
+
+ If one zone get the notify reply before timeout, call the
+ handle to process the reply. If one zone can't get the notify
+ before timeout, call the handler to resend notify or notify
+ next slave.
+
+ The thread can be stopped by calling shutdown().
+
+ Returns the thread object to anyone interested.
+ """
+
+ if self._serving:
+ raise RuntimeError(
+ 'Dispatcher already running, tried to start twice')
+
+ # Prepare for launch
+ self._serving = True
+ started_event = threading.Event()
+
+ # Start
+ self._thread = threading.Thread(target=self._dispatcher,
+ args=[started_event])
+ if daemon:
+ self._thread.daemon = daemon
+ self._thread.start()
+
+ # Wait for it to get started
+ started_event.wait()
+
+ # Return it to anyone listening
+ return self._thread
+
+ def shutdown(self):
+ """Stop the dispatcher() thread. Blocks until the thread stopped."""
+
+ if not self._serving:
+ raise RuntimeError('Tried to stop while not running')
+
+ # Ask it to stop
+ self._serving = False
+ self._write_sock.send(SOCK_DATA) # make self._read_sock be readable.
+
+ # Wait for it
+ self._thread.join()
+
+ # Clean up
+ self._write_sock = None
+ self._read_sock = None
+ self._thread = None
def _get_rdata_data(self, rr):
return rr[7].strip()
@@ -200,49 +250,68 @@
return addr_list
def _prepare_select_info(self):
- '''Prepare the information for select(), returned
- value is one tuple
+ '''
+ Prepare the information for select(), returned
+ value is one tuple
(block_timeout, valid_socks, notifying_zones)
block_timeout: the timeout for select()
valid_socks: sockets list for waiting ready reading.
- notifying_zones: the zones which have been triggered
- for notify. '''
+ notifying_zones: the zones which have been triggered
+ for notify.
+ '''
valid_socks = []
notifying_zones = {}
- min_timeout = None
+ min_timeout = None
for info in self._notify_infos:
sock = self._notify_infos[info].get_socket()
if sock:
valid_socks.append(sock)
notifying_zones[info] = self._notify_infos[info]
tmp_timeout = self._notify_infos[info].notify_timeout
- if min_timeout:
+ if min_timeout is not None:
if tmp_timeout < min_timeout:
min_timeout = tmp_timeout
else:
min_timeout = tmp_timeout
-
- block_timeout = 0
- if min_timeout:
+
+ block_timeout = _IDLE_SLEEP_TIME
+ if min_timeout is not None:
block_timeout = min_timeout - time.time()
if block_timeout < 0:
block_timeout = 0
-
+
return (block_timeout, valid_socks, notifying_zones)
def _wait_for_notify_reply(self):
- '''receive notify replies in specified time. returned value
- is one tuple:(replied_zones, not_replied_zones)
+ '''
+ Receive notify replies in specified time. returned value
+ is one tuple:(replied_zones, not_replied_zones). ({}, {}) is
+ returned if shutdown() was called.
+
replied_zones: the zones which receive notify reply.
not_replied_zones: the zones which haven't got notify reply.
+
'''
- (block_timeout, valid_socks, notifying_zones) = self._prepare_select_info()
+ (block_timeout, valid_socks, notifying_zones) = \
+ self._prepare_select_info()
+ # This is None only during some tests
+ if self._read_sock is not None:
+ valid_socks.append(self._read_sock)
try:
r_fds, w, e = select.select(valid_socks, [], [], block_timeout)
except select.error as err:
if err.args[0] != EINTR:
- return [], []
-
+ return {}, {}
+
+ if self._read_sock in r_fds: # user has called shutdown()
+ try:
+ # Noone should write anything else than shutdown
+ assert self._read_sock.recv(len(SOCK_DATA)) == SOCK_DATA
+ return {}, {}
+ except socket.error as e: # Workaround around rare linux bug
+ if e.errno != errno.EAGAIN and e.errno != errno.EWOULDBLOCK:
+ raise
+
not_replied_zones = {}
replied_zones = {}
for info in notifying_zones:
Modified: trunk/src/lib/python/isc/notify/tests/notify_out_test.py
==============================================================================
--- trunk/src/lib/python/isc/notify/tests/notify_out_test.py (original)
+++ trunk/src/lib/python/isc/notify/tests/notify_out_test.py Tue Oct 19 10:50:29 2010
@@ -20,7 +20,7 @@
import time
import socket
from isc.datasrc import sqlite3_ds
-from isc.notify import notify_out
+from isc.notify import notify_out, SOCK_DATA
class TestZoneNotifyInfo(unittest.TestCase):
def setUp(self):
@@ -53,8 +53,6 @@
class TestNotifyOut(unittest.TestCase):
def setUp(self):
- self.old_stdout = sys.stdout
- sys.stdout = open(os.devnull, 'w')
self._db_file = tempfile.NamedTemporaryFile(delete=False)
sqlite3_ds.load(self._db_file.name, 'cn.', self._cn_data_reader)
sqlite3_ds.load(self._db_file.name, 'com.', self._com_data_reader)
@@ -70,7 +68,6 @@
info.notify_slaves.append(('1.1.1.1', 5353))
def tearDown(self):
- sys.stdout = self.old_stdout
self._db_file.close()
os.unlink(self._db_file.name)
@@ -123,6 +120,20 @@
self.assertTrue(('com.', 'IN') in timeout_zones.keys())
self.assertLess(time.time(), self._notify._notify_infos[('com.', 'IN')].notify_timeout)
+ def test_wait_for_notify_reply_2(self):
+ # Test the returned value when the read_side socket is readable.
+ self._notify.send_notify('cn.')
+ self._notify.send_notify('com.')
+
+ # Now make one socket be readable
+ self._notify._notify_infos[('cn.', 'IN')].notify_timeout = time.time() + 10
+ self._notify._notify_infos[('com.', 'IN')].notify_timeout = time.time() + 10
+ self._notify._read_sock, self._notify._write_sock = socket.socketpair()
+ self._notify._write_sock.send(SOCK_DATA)
+ replied_zones, timeout_zones = self._notify._wait_for_notify_reply()
+ self.assertEqual(0, len(replied_zones))
+ self.assertEqual(0, len(timeout_zones))
+
def test_notify_next_target(self):
self._notify.send_notify('cn.')
self._notify.send_notify('com.')
@@ -258,7 +269,7 @@
def test_prepare_select_info(self):
timeout, valid_fds, notifying_zones = self._notify._prepare_select_info()
- self.assertEqual(0, timeout)
+ self.assertEqual(notify_out._IDLE_SLEEP_TIME, timeout)
self.assertListEqual([], valid_fds)
self._notify._notify_infos[('cn.', 'IN')]._sock = 1
@@ -279,6 +290,12 @@
self.assertEqual(timeout, 0)
self.assertListEqual([2, 1], valid_fds)
+ def test_shutdown(self):
+ thread = self._notify.dispatcher()
+ self.assertTrue(thread.is_alive())
+ self._notify.shutdown()
+ self.assertFalse(thread.is_alive())
+
if __name__== "__main__":
unittest.main()
More information about the bind10-changes
mailing list