[svn] commit: r3200 - in /branches/trac335/src: bin/xfrout/xfrout.py.in lib/python/isc/notify/notify_out.py lib/python/isc/notify/tests/notify_out_test.py

BIND 10 source code commits bind10-changes at lists.isc.org
Wed Oct 13 11:19:16 UTC 2010


Author: vorner
Date: Wed Oct 13 11:19:16 2010
New Revision: 3200

Log:
Race conditions in xfrout

Similar changes to those in the previous commit, this time in the notify_out module.

Modified:
    branches/trac335/src/bin/xfrout/xfrout.py.in
    branches/trac335/src/lib/python/isc/notify/notify_out.py
    branches/trac335/src/lib/python/isc/notify/tests/notify_out_test.py

Modified: branches/trac335/src/bin/xfrout/xfrout.py.in
==============================================================================
--- branches/trac335/src/bin/xfrout/xfrout.py.in (original)
+++ branches/trac335/src/bin/xfrout/xfrout.py.in Wed Oct 13 11:19:16 2010
@@ -426,8 +426,7 @@
     def _start_notifier(self):
         datasrc = self._unix_socket_server.get_db_file()
         self._notifier = notify_out.NotifyOut(datasrc, self._log)
-        td = threading.Thread(target=self._notifier.dispatcher)
-        td.start()
+        self._notifier.dispatcher()
 
     def send_notify(self, zone_name, zone_class):
         self._notifier.send_notify(zone_name, zone_class)
@@ -440,7 +439,7 @@
                 answer = create_answer(1, "Unknown config data: " + str(key))
                 continue
             self._config_data[key] = new_config[key]
-        
+
         if self._log:
             self._log.update_config(new_config)
 
@@ -462,19 +461,12 @@
         if self._unix_socket_server:
             self._unix_socket_server.shutdown()
 
-        main_thread = threading.currentThread()
-        # close the thread which's doing zone transfer.
-        for th in threading.enumerate():
-            if th is main_thread:
-                continue
-            th.join()
-
     def command_handler(self, cmd, args):
         if cmd == "shutdown":
             self._log.log_message("info", "Received shutdown command.")
             self.shutdown()
             answer = create_answer(0)
-        
+
         elif cmd == notify_out.ZONE_NEW_DATA_READY_CMD:
             zone_name = args.get('zone_name')
             zone_class = args.get('zone_class')
@@ -489,7 +481,7 @@
         else: 
             answer = create_answer(1, "Unknown command:" + str(cmd))
 
-        return answer    
+        return answer
 
     def run(self):
         '''Get and process all commands sent from cfgmgr or other modules. '''

Modified: branches/trac335/src/lib/python/isc/notify/notify_out.py
==============================================================================
--- branches/trac335/src/lib/python/isc/notify/notify_out.py (original)
+++ branches/trac335/src/lib/python/isc/notify/notify_out.py Wed Oct 13 11:19:16 2010
@@ -105,8 +105,7 @@
         self._notifying_zones = []
         self._log = log
         self._serving = False
-        self._is_shut_down = threading.Event()
-        self._read_sock, self._write_sock = socket.socketpair()
+        self._read_sock = None
         self.notify_num = 0  # the count of in progress notifies
         self._verbose = verbose
         self._lock = threading.Lock()
@@ -149,41 +148,76 @@
                 self.notify_num += 1 
                 self._notifying_zones.append(zone_id)
 
-    def dispatcher(self):
-        '''The loop function for handling notify related events.
+    def _dispatcher(self):
+        while self._serving:
+            # Let the master know we are alive already
+            if self._started_event:
+                self._started_event.set()
+
+            replied_zones, not_replied_zones = self._wait_for_notify_reply()
+
+            for name_ in replied_zones:
+                self._zone_notify_handler(replied_zones[name_], _EVENT_READ)
+
+            for name_ in not_replied_zones:
+                if not_replied_zones[name_].notify_timeout <= time.time():
+                    self._zone_notify_handler(not_replied_zones[name_], _EVENT_TIMEOUT)
+
+    def dispatcher(self, daemon=False):
+        """
+        Spawns a thread that will handle notify related events.
+
         If one zone get the notify reply before timeout, call the
         handle to process the reply. If one zone can't get the notify
         before timeout, call the handler to resend notify or notify 
         next slave.  
-           The loop can be stoped by calling shutdown() in another 
-        thread. '''
+
+        The thread can be stopped by calling shutdown().
+
+        Returns the thread object to anyone interested.
+        """
+
+        if self._serving:
+            raise RuntimeError(
+                'Dispatcher already running, tried to start twice')
+
+        # Prepare for launch
         self._serving = True
-        self._is_shut_down.clear()
-        while self._serving:
-            replied_zones, not_replied_zones = self._wait_for_notify_reply()
-            if replied_zones is None:
-                break
-
-            if len(replied_zones) == 0 and len(not_replied_zones) == 0:
-                time.sleep(_IDLE_SLEEP_TIME) #TODO set a better time for idle sleep
-                continue
-
-            for name_ in replied_zones:
-                self._zone_notify_handler(replied_zones[name_], _EVENT_READ)
-
-            for name_ in not_replied_zones:
-                if not_replied_zones[name_].notify_timeout <= time.time():
-                    self._zone_notify_handler(not_replied_zones[name_], _EVENT_TIMEOUT)
-
-        self._is_shut_down.set()
+        self._started_event = threading.Event()
+        self._read_sock, self._write_sock = socket.socketpair()
+
+        # Start
+        self._thread = threading.Thread(target=self._dispatcher, args=())
+        if daemon:
+            self._thread.daemon = daemon
+        self._thread.start()
+
+        # Wait for it to get started
+        self._started_event.wait()
+        self._started_event = None
+
+        # Return it to anyone listening
+        return self._thread
 
     def shutdown(self):
-        '''Stop the dispatcher() loop. Blocks until the loop has finished. This
-        must be called when dispatcher() is running in anther thread, or it
-        will deadlock.  '''
+        """
+        Stop the dispatcher() thread. Blocks until the thread stopped.
+        """
+
+        if not self._serving:
+            raise RuntimeError('Tried to stop while not running')
+
+        # Ask it to stop
         self._serving = False
         self._write_sock.send(b'shutdown') # make self._read_sock be readable.
-        self._is_shut_down.wait()
+
+        # Wait for it
+        self._thread.join()
+
+        # Clean up
+        self._write_sock = None
+        self._read_sock = None
+        self._thread = None
 
     def _get_rdata_data(self, rr):
         return rr[7].strip()
@@ -220,56 +254,62 @@
         return addr_list
 
     def _prepare_select_info(self):
-        '''Prepare the information for select(), returned 
-        value is one tuple 
+        '''
+        Prepare the information for select(), returned
+        value is one tuple
         (block_timeout, valid_socks, notifying_zones)
         block_timeout: the timeout for select()
         valid_socks: sockets list for waiting ready reading.
-        notifying_zones: the zones which have been triggered 
-                        for notify. '''
+        notifying_zones: the zones which have been triggered
+                        for notify.
+        '''
         valid_socks = []
         notifying_zones = {}
-        min_timeout = None 
+        min_timeout = None
         for info in self._notify_infos:
             sock = self._notify_infos[info].get_socket()
             if sock:
                 valid_socks.append(sock)
                 notifying_zones[info] = self._notify_infos[info]
                 tmp_timeout = self._notify_infos[info].notify_timeout
-                if min_timeout:
+                if min_timeout is not None:
                     if tmp_timeout < min_timeout:
                         min_timeout = tmp_timeout
                 else:
                     min_timeout = tmp_timeout
-       
-        block_timeout = 0
-        if min_timeout:
+
+        block_timeout = _IDLE_SLEEP_TIME
+        if min_timeout is not None:
             block_timeout = min_timeout - time.time()
             if block_timeout < 0:
                 block_timeout = 0
-        
+
         return (block_timeout, valid_socks, notifying_zones)
 
     def _wait_for_notify_reply(self):
-        '''receive notify replies in specified time. returned value 
-        is one tuple:(replied_zones, not_replied_zones). (None, None)
-        will be returned when self._read_sock is readable, since user
-        has called shutdown().
+        '''
+        Receive notify replies in specified time. returned value
+        is one tuple:(replied_zones, not_replied_zones). ({}, {}) is
+        returned if shutdown() was called.
+
         replied_zones: the zones which receive notify reply.
         not_replied_zones: the zones which haven't got notify reply.
 
         '''
-        (block_timeout, valid_socks, notifying_zones) = self._prepare_select_info()
-        valid_socks.append(self._read_sock)
+        (block_timeout, valid_socks, notifying_zones) = \
+            self._prepare_select_info()
+        # This is None only during some tests
+        if self._read_sock is not None:
+            valid_socks.append(self._read_sock)
         try:
             r_fds, w, e = select.select(valid_socks, [], [], block_timeout)
         except select.error as err:
             if err.args[0] != EINTR:
                 return {}, {}
-        
+
         if self._read_sock in r_fds:
-            return None, None # user has called shutdown()
-        
+            return {}, {} # user has called shutdown()
+
         not_replied_zones = {}
         replied_zones = {}
         for info in notifying_zones:

Modified: branches/trac335/src/lib/python/isc/notify/tests/notify_out_test.py
==============================================================================
--- branches/trac335/src/lib/python/isc/notify/tests/notify_out_test.py (original)
+++ branches/trac335/src/lib/python/isc/notify/tests/notify_out_test.py Wed Oct 13 11:19:16 2010
@@ -128,10 +128,11 @@
         # Now make one socket be readable
         self._notify._notify_infos[('cn.', 'IN')].notify_timeout = time.time() + 10
         self._notify._notify_infos[('com.', 'IN')].notify_timeout = time.time() + 10
-        self._notify._write_sock.send(b'shutdown')    
+        self._notify._read_sock, self._notify._write_sock = socket.socketpair()
+        self._notify._write_sock.send(b'shutdown')
         replied_zones, timeout_zones = self._notify._wait_for_notify_reply()
-        self.assertIsNone(replied_zones) 
-        self.assertIsNone(timeout_zones) 
+        self.assertEqual(0, len(replied_zones))
+        self.assertEqual(0, len(timeout_zones))
 
     def test_notify_next_target(self):
         self._notify.send_notify('cn.')
@@ -268,7 +269,7 @@
         
     def test_prepare_select_info(self):
         timeout, valid_fds, notifying_zones = self._notify._prepare_select_info()
-        self.assertEqual(0, timeout)
+        self.assertEqual(notify_out._IDLE_SLEEP_TIME, timeout)
         self.assertListEqual([], valid_fds)
 
         self._notify._notify_infos[('cn.', 'IN')]._sock = 1
@@ -290,12 +291,10 @@
         self.assertListEqual([2, 1], valid_fds)
 
     def test_shutdown(self):
-        import threading
-        td = threading.Thread(target=self._notify.dispatcher)
-        td.start()
-        self.assertTrue(td.is_alive())
+        thread = self._notify.dispatcher()
+        self.assertTrue(thread.is_alive())
         self._notify.shutdown()
-        self.assertFalse(td.is_alive())
+        self.assertFalse(thread.is_alive())
 
 if __name__== "__main__":
     unittest.main()




More information about the bind10-changes mailing list