[svn] commit: r3200 - in /branches/trac335/src: bin/xfrout/xfrout.py.in lib/python/isc/notify/notify_out.py lib/python/isc/notify/tests/notify_out_test.py
BIND 10 source code commits
bind10-changes at lists.isc.org
Wed Oct 13 11:19:16 UTC 2010
Author: vorner
Date: Wed Oct 13 11:19:16 2010
New Revision: 3200
Log:
Race conditions in xfrout
Similar changes as in previous commit, just in the notify_out module.
Modified:
branches/trac335/src/bin/xfrout/xfrout.py.in
branches/trac335/src/lib/python/isc/notify/notify_out.py
branches/trac335/src/lib/python/isc/notify/tests/notify_out_test.py
Modified: branches/trac335/src/bin/xfrout/xfrout.py.in
==============================================================================
--- branches/trac335/src/bin/xfrout/xfrout.py.in (original)
+++ branches/trac335/src/bin/xfrout/xfrout.py.in Wed Oct 13 11:19:16 2010
@@ -426,8 +426,7 @@
def _start_notifier(self):
datasrc = self._unix_socket_server.get_db_file()
self._notifier = notify_out.NotifyOut(datasrc, self._log)
- td = threading.Thread(target=self._notifier.dispatcher)
- td.start()
+ self._notifier.dispatcher()
def send_notify(self, zone_name, zone_class):
self._notifier.send_notify(zone_name, zone_class)
@@ -440,7 +439,7 @@
answer = create_answer(1, "Unknown config data: " + str(key))
continue
self._config_data[key] = new_config[key]
-
+
if self._log:
self._log.update_config(new_config)
@@ -462,19 +461,12 @@
if self._unix_socket_server:
self._unix_socket_server.shutdown()
- main_thread = threading.currentThread()
- # close the thread which's doing zone transfer.
- for th in threading.enumerate():
- if th is main_thread:
- continue
- th.join()
-
def command_handler(self, cmd, args):
if cmd == "shutdown":
self._log.log_message("info", "Received shutdown command.")
self.shutdown()
answer = create_answer(0)
-
+
elif cmd == notify_out.ZONE_NEW_DATA_READY_CMD:
zone_name = args.get('zone_name')
zone_class = args.get('zone_class')
@@ -489,7 +481,7 @@
else:
answer = create_answer(1, "Unknown command:" + str(cmd))
- return answer
+ return answer
def run(self):
'''Get and process all commands sent from cfgmgr or other modules. '''
Modified: branches/trac335/src/lib/python/isc/notify/notify_out.py
==============================================================================
--- branches/trac335/src/lib/python/isc/notify/notify_out.py (original)
+++ branches/trac335/src/lib/python/isc/notify/notify_out.py Wed Oct 13 11:19:16 2010
@@ -105,8 +105,7 @@
self._notifying_zones = []
self._log = log
self._serving = False
- self._is_shut_down = threading.Event()
- self._read_sock, self._write_sock = socket.socketpair()
+ self._read_sock = None
self.notify_num = 0 # the count of in progress notifies
self._verbose = verbose
self._lock = threading.Lock()
@@ -149,41 +148,76 @@
self.notify_num += 1
self._notifying_zones.append(zone_id)
- def dispatcher(self):
- '''The loop function for handling notify related events.
+ def _dispatcher(self):
+ while self._serving:
+ # Let the master know we are alive already
+ if self._started_event:
+ self._started_event.set()
+
+ replied_zones, not_replied_zones = self._wait_for_notify_reply()
+
+ for name_ in replied_zones:
+ self._zone_notify_handler(replied_zones[name_], _EVENT_READ)
+
+ for name_ in not_replied_zones:
+ if not_replied_zones[name_].notify_timeout <= time.time():
+ self._zone_notify_handler(not_replied_zones[name_], _EVENT_TIMEOUT)
+
+ def dispatcher(self, daemon=False):
+ """
+ Spawns a thread that will handle notify related events.
+
If one zone get the notify reply before timeout, call the
handle to process the reply. If one zone can't get the notify
before timeout, call the handler to resend notify or notify
next slave.
- The loop can be stoped by calling shutdown() in another
- thread. '''
+
+ The thread can be stopped by calling shutdown().
+
+ Returns the thread object to anyone interested.
+ """
+
+ if self._serving:
+ raise RuntimeError(
+ 'Dispatcher already running, tried to start twice')
+
+ # Prepare for launch
self._serving = True
- self._is_shut_down.clear()
- while self._serving:
- replied_zones, not_replied_zones = self._wait_for_notify_reply()
- if replied_zones is None:
- break
-
- if len(replied_zones) == 0 and len(not_replied_zones) == 0:
- time.sleep(_IDLE_SLEEP_TIME) #TODO set a better time for idle sleep
- continue
-
- for name_ in replied_zones:
- self._zone_notify_handler(replied_zones[name_], _EVENT_READ)
-
- for name_ in not_replied_zones:
- if not_replied_zones[name_].notify_timeout <= time.time():
- self._zone_notify_handler(not_replied_zones[name_], _EVENT_TIMEOUT)
-
- self._is_shut_down.set()
+ self._started_event = threading.Event()
+ self._read_sock, self._write_sock = socket.socketpair()
+
+ # Start
+ self._thread = threading.Thread(target=self._dispatcher, args=())
+ if daemon:
+ self._thread.daemon = daemon
+ self._thread.start()
+
+ # Wait for it to get started
+ self._started_event.wait()
+ self._started_event = None
+
+ # Return it to anyone listening
+ return self._thread
def shutdown(self):
- '''Stop the dispatcher() loop. Blocks until the loop has finished. This
- must be called when dispatcher() is running in anther thread, or it
- will deadlock. '''
+ """
+ Stop the dispatcher() thread. Blocks until the thread stopped.
+ """
+
+ if not self._serving:
+ raise RuntimeError('Tried to stop while not running')
+
+ # Ask it to stop
self._serving = False
self._write_sock.send(b'shutdown') # make self._read_sock be readable.
- self._is_shut_down.wait()
+
+ # Wait for it
+ self._thread.join()
+
+ # Clean up
+ self._write_sock = None
+ self._read_sock = None
+ self._thread = None
def _get_rdata_data(self, rr):
return rr[7].strip()
@@ -220,56 +254,62 @@
return addr_list
def _prepare_select_info(self):
- '''Prepare the information for select(), returned
- value is one tuple
+ '''
+ Prepare the information for select(), returned
+ value is one tuple
(block_timeout, valid_socks, notifying_zones)
block_timeout: the timeout for select()
valid_socks: sockets list for waiting ready reading.
- notifying_zones: the zones which have been triggered
- for notify. '''
+ notifying_zones: the zones which have been triggered
+ for notify.
+ '''
valid_socks = []
notifying_zones = {}
- min_timeout = None
+ min_timeout = None
for info in self._notify_infos:
sock = self._notify_infos[info].get_socket()
if sock:
valid_socks.append(sock)
notifying_zones[info] = self._notify_infos[info]
tmp_timeout = self._notify_infos[info].notify_timeout
- if min_timeout:
+ if min_timeout is not None:
if tmp_timeout < min_timeout:
min_timeout = tmp_timeout
else:
min_timeout = tmp_timeout
-
- block_timeout = 0
- if min_timeout:
+
+ block_timeout = _IDLE_SLEEP_TIME
+ if min_timeout is not None:
block_timeout = min_timeout - time.time()
if block_timeout < 0:
block_timeout = 0
-
+
return (block_timeout, valid_socks, notifying_zones)
def _wait_for_notify_reply(self):
- '''receive notify replies in specified time. returned value
- is one tuple:(replied_zones, not_replied_zones). (None, None)
- will be returned when self._read_sock is readable, since user
- has called shutdown().
+ '''
+ Receive notify replies in specified time. returned value
+ is one tuple:(replied_zones, not_replied_zones). ({}, {}) is
+ returned if shutdown() was called.
+
replied_zones: the zones which receive notify reply.
not_replied_zones: the zones which haven't got notify reply.
'''
- (block_timeout, valid_socks, notifying_zones) = self._prepare_select_info()
- valid_socks.append(self._read_sock)
+ (block_timeout, valid_socks, notifying_zones) = \
+ self._prepare_select_info()
+ # This is None only during some tests
+ if self._read_sock is not None:
+ valid_socks.append(self._read_sock)
try:
r_fds, w, e = select.select(valid_socks, [], [], block_timeout)
except select.error as err:
if err.args[0] != EINTR:
return {}, {}
-
+
if self._read_sock in r_fds:
- return None, None # user has called shutdown()
-
+ return {}, {} # user has called shutdown()
+
not_replied_zones = {}
replied_zones = {}
for info in notifying_zones:
Modified: branches/trac335/src/lib/python/isc/notify/tests/notify_out_test.py
==============================================================================
--- branches/trac335/src/lib/python/isc/notify/tests/notify_out_test.py (original)
+++ branches/trac335/src/lib/python/isc/notify/tests/notify_out_test.py Wed Oct 13 11:19:16 2010
@@ -128,10 +128,11 @@
# Now make one socket be readable
self._notify._notify_infos[('cn.', 'IN')].notify_timeout = time.time() + 10
self._notify._notify_infos[('com.', 'IN')].notify_timeout = time.time() + 10
- self._notify._write_sock.send(b'shutdown')
+ self._notify._read_sock, self._notify._write_sock = socket.socketpair()
+ self._notify._write_sock.send(b'shutdown')
replied_zones, timeout_zones = self._notify._wait_for_notify_reply()
- self.assertIsNone(replied_zones)
- self.assertIsNone(timeout_zones)
+ self.assertEqual(0, len(replied_zones))
+ self.assertEqual(0, len(timeout_zones))
def test_notify_next_target(self):
self._notify.send_notify('cn.')
@@ -268,7 +269,7 @@
def test_prepare_select_info(self):
timeout, valid_fds, notifying_zones = self._notify._prepare_select_info()
- self.assertEqual(0, timeout)
+ self.assertEqual(notify_out._IDLE_SLEEP_TIME, timeout)
self.assertListEqual([], valid_fds)
self._notify._notify_infos[('cn.', 'IN')]._sock = 1
@@ -290,12 +291,10 @@
self.assertListEqual([2, 1], valid_fds)
def test_shutdown(self):
- import threading
- td = threading.Thread(target=self._notify.dispatcher)
- td.start()
- self.assertTrue(td.is_alive())
+ thread = self._notify.dispatcher()
+ self.assertTrue(thread.is_alive())
self._notify.shutdown()
- self.assertFalse(td.is_alive())
+ self.assertFalse(thread.is_alive())
if __name__== "__main__":
unittest.main()
More information about the bind10-changes mailing list