[svn] commit: r2908 - in /branches/trac216/src/bin/xfrin: tests/xfrin_test.py xfrin.py.in

BIND 10 source code commits bind10-changes at lists.isc.org
Tue Sep 14 05:56:22 UTC 2010


Author: shentingting
Date: Tue Sep 14 05:56:21 2010
New Revision: 2908

Log:
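Remove the shutdown_flag argument from XfrinConnection and process_xfrin, add config_handler tests, and rename _threads_zones/_conn_sockets to _zones_to_threads/_conn_sockets_to_threads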

Modified:
    branches/trac216/src/bin/xfrin/tests/xfrin_test.py
    branches/trac216/src/bin/xfrin/xfrin.py.in

Modified: branches/trac216/src/bin/xfrin/tests/xfrin_test.py
==============================================================================
--- branches/trac216/src/bin/xfrin/tests/xfrin_test.py (original)
+++ branches/trac216/src/bin/xfrin/tests/xfrin_test.py Tue Sep 14 05:56:21 2010
@@ -77,7 +77,6 @@
     def _cc_setup(self):
         isc.config.ModuleCCSession = MockModuleCCSession
         super()._cc_setup()
-        #self._max_transfers_in = 10
     
     def _cc_check_command(self):
         self._shutdown_flag = 1
@@ -108,9 +107,9 @@
         pass
 
 class MockXfrinConnection(XfrinConnection):
-    def __init__(self, conn_socket, zone_name, rrclass, db_file, shutdown_flag,
+    def __init__(self, conn_socket, zone_name, rrclass, db_file, 
                  master_addr):
-        super().__init__(conn_socket, zone_name, rrclass, db_file, shutdown_flag,
+        super().__init__(conn_socket, zone_name, rrclass, db_file,
                          master_addr)
         self.query_data = b''
         self.reply_data = b''
@@ -199,7 +198,7 @@
             os.remove(TEST_DB_FILE)
         self.conn = MockXfrinConnection(self.conn_sockets[1], 'example.com.',
                                         TEST_RRCLASS, TEST_DB_FILE,
-                                        0, TEST_MASTER_IPV4_ADDRINFO)
+                                        TEST_MASTER_IPV4_ADDRINFO)
         # replace the XFR socket with our local mock
         self.conn._socket = self.mock_xfrsockets[1]
         self.axfr_after_soa = False
@@ -221,7 +220,6 @@
             os.remove(TEST_DB_FILE)
 
     def test_connect(self):
-        #self.assertEqual(, "")
         self.assertRaises(Exception, self.conn.connect,
                           (TEST_MASTER_IPV4_ADDRESS,53))
 
@@ -275,7 +273,6 @@
         self.conn_sockets[0].send(b"shutdown")
         self.assertRaises(XfrinException, super(MockXfrinConnection,
                                                 self.conn)._select)
-
     def test_init_ip6(self):
         # This test simply creates a new XfrinConnection object with an
         # IPv6 address, tries to bind it to an IPv6 wildcard address/port
@@ -283,13 +280,13 @@
         # tends to assume it's IPv4 only and hardcode AF_INET.  This test
         # uncovers such a bug.
         c = MockXfrinConnection({}, 'example.com.', TEST_RRCLASS, TEST_DB_FILE,
-                                0, TEST_MASTER_IPV6_ADDRINFO)
+                                 TEST_MASTER_IPV6_ADDRINFO)
         c._socket.bind(('::', 0))
         c.close()
 
     def test_init_chclass(self):
         c = XfrinConnection({}, 'example.com.', RRClass.CH(), TEST_DB_FILE,
-                            0, TEST_MASTER_IPV4_ADDRINFO)
+                             TEST_MASTER_IPV4_ADDRINFO)
         axfrmsg = c._create_query(RRType.AXFR())
         self.assertEqual(axfrmsg.get_question()[0].get_class(),
                          RRClass.CH())
@@ -362,12 +359,6 @@
         self.soa_response_params['rcode'] = Rcode.SERVFAIL()
         self.conn.response_generator = self._create_soa_response_data
         self.assertRaises(XfrinException, self.conn._check_soa_serial)
-
-    def test_response_shutdown(self):
-        self.conn.response_generator = self._create_normal_response_data
-        self.conn._shutdown_flag = 1
-        self.conn._send_query(RRType.AXFR())
-        self.assertRaises(XfrinException, self._handle_xfrin_response)
 
     def test_response_timeout(self):
         self.conn.response_generator = self._create_normal_response_data
@@ -531,6 +522,15 @@
 
         self.args['port'] = 'http'
         self.assertRaises(XfrinException, self._do_parse)
+     
+    def test_config_handler_noupdate(self):
+        old_value = self.xfr._max_transfers_in
+        self.xfr.config_handler({})
+        self.assertEqual(old_value, self.xfr._max_transfers_in)
+
+    def test_config_handler(self):
+        self.xfr.config_handler({"transfers_in":5})
+        self.assertEqual(5, self.xfr._max_transfers_in)
 
     def test_command_handler_shutdown(self):
         self.assertEqual(self.xfr.command_handler("shutdown",
@@ -550,18 +550,18 @@
 
     def test_command_handler_retransfer_quota(self):
         for i in range(self.xfr._max_transfers_in - 1):
-            self.xfr._threads_zones[str(i) + TEST_ZONE_NAME] = MockThread()
+            self.xfr._zones_to_threads[str(i) + TEST_ZONE_NAME] = MockThread()
         # there can be one more outstanding transfer.
         self.assertEqual(self.xfr.command_handler("retransfer",
                                                   self.args)['result'][0], 0)
         # make sure the # xfrs would excceed the quota
-        self.xfr._threads_zones[str(self.xfr._max_transfers_in) + TEST_ZONE_NAME] = MockThread()
+        self.xfr._zones_to_threads[str(self.xfr._max_transfers_in) + TEST_ZONE_NAME] = MockThread()
         # this one should fail
         self.assertEqual(self.xfr.command_handler("retransfer",
                                                   self.args)['result'][0], 1)
 
     def test_command_handler_retransfer_inprogress(self):
-        self.xfr._threads_zones[TEST_ZONE_NAME] = MockThread()
+        self.xfr._zones_to_threads[TEST_ZONE_NAME] = MockThread()
         self.assertEqual(self.xfr.command_handler("retransfer",
                                                   self.args)['result'][0], 1)
 

Modified: branches/trac216/src/bin/xfrin/xfrin.py.in
==============================================================================
--- branches/trac216/src/bin/xfrin/xfrin.py.in (original)
+++ branches/trac216/src/bin/xfrin/xfrin.py.in Tue Sep 14 05:56:21 2010
@@ -71,30 +71,30 @@
     '''Do xfrin in this class. '''    
 
     def __init__(self,
-                 conn_socket, zone_name, rrclass, db_file, shutdown_flag,
+                 conn_socket, zone_name, rrclass, db_file, 
                  master_addrinfo, verbose = False, idle_timeout = 60): 
         ''' idle_timeout: max idle time for read data from socket.
             db_file: specify the data source file.
             check_soa: when it's true, check soa first before sending xfr query
         '''
         self._socket = socket.socket(master_addrinfo[0], master_addrinfo[1])
-        self._socket.setblocking(1)
         self._conn_socket = conn_socket
         self._zone_name = zone_name
         self._rrclass = rrclass
         self._db_file = db_file
         self._soa_rr_count = 0
         self._idle_timeout = idle_timeout
-        self._shutdown_flag = shutdown_flag
         self._verbose = verbose
         self._master_address = master_addrinfo[4]
     
     def connect(self, address):
-        err = self._socket.connect_ex(address)
-        if err in (EINPROGRESS, EALREADY, EWOULDBLOCK):
-            return 
-        if err not in (0, EISCONN):
-            raise socket.error(err, errorcode[err])
+        try:
+            self._socket.connect(address)
+        except socket.error as why:
+            if why.args[0] in (EINPROGRESS, EALREADY, EWOULDBLOCK):
+                return 
+            if why.args[0] not in (0, EISCONN):
+                raise
 
     def send(self, data):
         try:
@@ -192,7 +192,11 @@
                 else:
                     continue
             if self._conn_socket in rlist:
-                raise XfrinException("shutdown xfrin!")
+                data = self._conn_socket.recv(len(b'shutdown'))
+                if data == b'shutdown':
+                    raise XfrinException("shutdown xfrin!")
+                else:
+                    return
             return len(rlist)
     
     def _get_request_response(self, size):
@@ -349,9 +353,6 @@
 
             if self._soa_rr_count == 2:
                 break
-            
-            if self._shutdown_flag:
-                raise XfrinException('xfrin is forced to stop')
 
     def log_info(self, msg, type='info'):
         # Overwrite the log function, log nothing
@@ -363,9 +364,9 @@
 
 
 def process_xfrin(zone_name, rrclass, db_file, 
-                  shutdown_flag, master_addrinfo, check_soa, conn_socket, verbose):
+                   master_addrinfo, check_soa, conn_socket, verbose):
     conn = XfrinConnection(conn_socket, zone_name, rrclass, db_file,
-                           shutdown_flag, master_addrinfo, verbose)
+                            master_addrinfo, verbose)
     if conn.connect_to_master():
         conn.do_xfrin(check_soa)
 
@@ -375,12 +376,8 @@
         self._cc_setup()
         self._shutdown_flag = 0
         self._verbose = verbose
-
-        #the item in self._threads_zones: zone name and xfr communication thread. 
-        #The item in self._conn_sockets: a socket and xfr communication thread, the main thread uses 
-        #the socket to communicate with this xfr communication thread.
-        self._threads_zones = {}
-        self._conn_sockets = {}
+        self._zones_to_threads = {}
+        self._conn_sockets_to_threads = {}
 
     def _cc_setup(self):
         '''
@@ -411,9 +408,9 @@
         ''' shutdown the xfrin process. the thread which is doing xfrin will be 
         terminated.
         '''
-        self._filter_hash(self._conn_sockets)
-        for fd in self._conn_sockets.keys():
-            fd.send(b"shutdown")
+        self._filter_hash(self._conn_sockets_to_threads)
+        for socket in self._conn_sockets_to_threads.keys():
+            socket.send(b"shutdown")
 
         self._shutdown_flag = 1
         main_thread = threading.currentThread()
@@ -481,12 +478,8 @@
             self._cc_check_command()
 
     def _filter_hash(self, hash):
-        '''delete zone_name in self._threads_zones or a socket in self._conn_sockets.'''
-        keys = []
-        for key in hash.keys():
-            keys.append(key)
-
-        for key in keys:
+        '''delete zone_name in self._zones_to_threads or a socket in self._conn_sockets_to_threads.'''
+        for key in [k for k in hash.keys()]:
             if not (hash[key]).is_alive():
                 del hash[key]
 
@@ -496,29 +489,28 @@
             return (1, "xfrin failed, can't load dns message python library: 'libdns_python'")
 
         # check max_transfer_in, else return quota error
-        if len(self._threads_zones) >= self._max_transfers_in:
-            self._filter_hash(self._threads_zones)
-            self._filter_hash(self._conn_sockets)
-            if len(self._threads_zones) >= self._max_transfers_in:
+        if len(self._zones_to_threads) >= self._max_transfers_in:
+            self._filter_hash(self._zones_to_threads)
+            self._filter_hash(self._conn_sockets_to_threads)
+            if len(self._zones_to_threads) >= self._max_transfers_in:
                 return (1, 'xfrin quota error')
 
         # check whether the zone xfrin is in progress.
-        if zone_name in self._threads_zones.keys():
-            if not (self._threads_zones[zone_name]).is_alive():
-                del self._threads_zones[zone_name]
+        if zone_name in self._zones_to_threads.keys():
+            if not (self._zones_to_threads[zone_name]).is_alive():
+                del self._zones_to_threads[zone_name]
             else:
                 return (1, 'zone xfrin is in progress')
         conn_socket = socket.socketpair()
         xfrin_thread = threading.Thread(target = process_xfrin,
                                         args = (zone_name, rrclass,
                                                 db_file,
-                                                self._shutdown_flag,
                                                 master_addrinfo, check_soa, conn_socket[1],
                                                 self._verbose))
 
         # recored the zone name which zone xfrin is in process
-        self._threads_zones[zone_name] = xfrin_thread
-        self._conn_sockets[conn_socket[0]] = xfrin_thread
+        self._zones_to_threads[zone_name] = xfrin_thread
+        self._conn_sockets_to_threads[conn_socket[0]] = xfrin_thread
         xfrin_thread.start()
         return (0, 'zone xfrin is started')
 




More information about the bind10-changes mailing list