[svn] commit: r2908 - in /branches/trac216/src/bin/xfrin: tests/xfrin_test.py xfrin.py.in
BIND 10 source code commits
bind10-changes at lists.isc.org
Tue Sep 14 05:56:22 UTC 2010
Author: shentingting
Date: Tue Sep 14 05:56:21 2010
New Revision: 2908
Log:
delete shutdown_flag, add handle_config test, change variable name
Modified:
branches/trac216/src/bin/xfrin/tests/xfrin_test.py
branches/trac216/src/bin/xfrin/xfrin.py.in
Modified: branches/trac216/src/bin/xfrin/tests/xfrin_test.py
==============================================================================
--- branches/trac216/src/bin/xfrin/tests/xfrin_test.py (original)
+++ branches/trac216/src/bin/xfrin/tests/xfrin_test.py Tue Sep 14 05:56:21 2010
@@ -77,7 +77,6 @@
def _cc_setup(self):
isc.config.ModuleCCSession = MockModuleCCSession
super()._cc_setup()
- #self._max_transfers_in = 10
def _cc_check_command(self):
self._shutdown_flag = 1
@@ -108,9 +107,9 @@
pass
class MockXfrinConnection(XfrinConnection):
- def __init__(self, conn_socket, zone_name, rrclass, db_file, shutdown_flag,
+ def __init__(self, conn_socket, zone_name, rrclass, db_file,
master_addr):
- super().__init__(conn_socket, zone_name, rrclass, db_file, shutdown_flag,
+ super().__init__(conn_socket, zone_name, rrclass, db_file,
master_addr)
self.query_data = b''
self.reply_data = b''
@@ -199,7 +198,7 @@
os.remove(TEST_DB_FILE)
self.conn = MockXfrinConnection(self.conn_sockets[1], 'example.com.',
TEST_RRCLASS, TEST_DB_FILE,
- 0, TEST_MASTER_IPV4_ADDRINFO)
+ TEST_MASTER_IPV4_ADDRINFO)
# replace the XFR socket with our local mock
self.conn._socket = self.mock_xfrsockets[1]
self.axfr_after_soa = False
@@ -221,7 +220,6 @@
os.remove(TEST_DB_FILE)
def test_connect(self):
- #self.assertEqual(, "")
self.assertRaises(Exception, self.conn.connect,
(TEST_MASTER_IPV4_ADDRESS,53))
@@ -275,7 +273,6 @@
self.conn_sockets[0].send(b"shutdown")
self.assertRaises(XfrinException, super(MockXfrinConnection,
self.conn)._select)
-
def test_init_ip6(self):
# This test simply creates a new XfrinConnection object with an
# IPv6 address, tries to bind it to an IPv6 wildcard address/port
@@ -283,13 +280,13 @@
# tends to assume it's IPv4 only and hardcode AF_INET. This test
# uncovers such a bug.
c = MockXfrinConnection({}, 'example.com.', TEST_RRCLASS, TEST_DB_FILE,
- 0, TEST_MASTER_IPV6_ADDRINFO)
+ TEST_MASTER_IPV6_ADDRINFO)
c._socket.bind(('::', 0))
c.close()
def test_init_chclass(self):
c = XfrinConnection({}, 'example.com.', RRClass.CH(), TEST_DB_FILE,
- 0, TEST_MASTER_IPV4_ADDRINFO)
+ TEST_MASTER_IPV4_ADDRINFO)
axfrmsg = c._create_query(RRType.AXFR())
self.assertEqual(axfrmsg.get_question()[0].get_class(),
RRClass.CH())
@@ -362,12 +359,6 @@
self.soa_response_params['rcode'] = Rcode.SERVFAIL()
self.conn.response_generator = self._create_soa_response_data
self.assertRaises(XfrinException, self.conn._check_soa_serial)
-
- def test_response_shutdown(self):
- self.conn.response_generator = self._create_normal_response_data
- self.conn._shutdown_flag = 1
- self.conn._send_query(RRType.AXFR())
- self.assertRaises(XfrinException, self._handle_xfrin_response)
def test_response_timeout(self):
self.conn.response_generator = self._create_normal_response_data
@@ -531,6 +522,15 @@
self.args['port'] = 'http'
self.assertRaises(XfrinException, self._do_parse)
+
+ def test_config_handler_noupdate(self):
+ old_value = self.xfr._max_transfers_in
+ self.xfr.config_handler({})
+ self.assertEqual(old_value, self.xfr._max_transfers_in)
+
+ def test_config_handler(self):
+ self.xfr.config_handler({"transfers_in":5})
+ self.assertEqual(5, self.xfr._max_transfers_in)
def test_command_handler_shutdown(self):
self.assertEqual(self.xfr.command_handler("shutdown",
@@ -550,18 +550,18 @@
def test_command_handler_retransfer_quota(self):
for i in range(self.xfr._max_transfers_in - 1):
- self.xfr._threads_zones[str(i) + TEST_ZONE_NAME] = MockThread()
+ self.xfr._zones_to_threads[str(i) + TEST_ZONE_NAME] = MockThread()
# there can be one more outstanding transfer.
self.assertEqual(self.xfr.command_handler("retransfer",
self.args)['result'][0], 0)
# make sure the # xfrs would excceed the quota
- self.xfr._threads_zones[str(self.xfr._max_transfers_in) + TEST_ZONE_NAME] = MockThread()
+ self.xfr._zones_to_threads[str(self.xfr._max_transfers_in) + TEST_ZONE_NAME] = MockThread()
# this one should fail
self.assertEqual(self.xfr.command_handler("retransfer",
self.args)['result'][0], 1)
def test_command_handler_retransfer_inprogress(self):
- self.xfr._threads_zones[TEST_ZONE_NAME] = MockThread()
+ self.xfr._zones_to_threads[TEST_ZONE_NAME] = MockThread()
self.assertEqual(self.xfr.command_handler("retransfer",
self.args)['result'][0], 1)
Modified: branches/trac216/src/bin/xfrin/xfrin.py.in
==============================================================================
--- branches/trac216/src/bin/xfrin/xfrin.py.in (original)
+++ branches/trac216/src/bin/xfrin/xfrin.py.in Tue Sep 14 05:56:21 2010
@@ -71,30 +71,30 @@
'''Do xfrin in this class. '''
def __init__(self,
- conn_socket, zone_name, rrclass, db_file, shutdown_flag,
+ conn_socket, zone_name, rrclass, db_file,
master_addrinfo, verbose = False, idle_timeout = 60):
''' idle_timeout: max idle time for read data from socket.
db_file: specify the data source file.
check_soa: when it's true, check soa first before sending xfr query
'''
self._socket = socket.socket(master_addrinfo[0], master_addrinfo[1])
- self._socket.setblocking(1)
self._conn_socket = conn_socket
self._zone_name = zone_name
self._rrclass = rrclass
self._db_file = db_file
self._soa_rr_count = 0
self._idle_timeout = idle_timeout
- self._shutdown_flag = shutdown_flag
self._verbose = verbose
self._master_address = master_addrinfo[4]
def connect(self, address):
- err = self._socket.connect_ex(address)
- if err in (EINPROGRESS, EALREADY, EWOULDBLOCK):
- return
- if err not in (0, EISCONN):
- raise socket.error(err, errorcode[err])
+ try:
+ self._socket.connect(address)
+ except socket.error as why:
+ if why.args[0] in (EINPROGRESS, EALREADY, EWOULDBLOCK):
+ return
+ if why.args[0] not in (0, EISCONN):
+ raise
def send(self, data):
try:
@@ -192,7 +192,11 @@
else:
continue
if self._conn_socket in rlist:
- raise XfrinException("shutdown xfrin!")
+ data = self._conn_socket.recv(len(b'shutdown'))
+ if data == b'shutdown':
+ raise XfrinException("shutdown xfrin!")
+ else:
+ return
return len(rlist)
def _get_request_response(self, size):
@@ -349,9 +353,6 @@
if self._soa_rr_count == 2:
break
-
- if self._shutdown_flag:
- raise XfrinException('xfrin is forced to stop')
def log_info(self, msg, type='info'):
# Overwrite the log function, log nothing
@@ -363,9 +364,9 @@
def process_xfrin(zone_name, rrclass, db_file,
- shutdown_flag, master_addrinfo, check_soa, conn_socket, verbose):
+ master_addrinfo, check_soa, conn_socket, verbose):
conn = XfrinConnection(conn_socket, zone_name, rrclass, db_file,
- shutdown_flag, master_addrinfo, verbose)
+ master_addrinfo, verbose)
if conn.connect_to_master():
conn.do_xfrin(check_soa)
@@ -375,12 +376,8 @@
self._cc_setup()
self._shutdown_flag = 0
self._verbose = verbose
-
- #the item in self._threads_zones: zone name and xfr communication thread.
- #The item in self._conn_sockets: a socket and xfr communication thread, the main thread uses
- #the socket to communicate with this xfr communication thread.
- self._threads_zones = {}
- self._conn_sockets = {}
+ self._zones_to_threads = {}
+ self._conn_sockets_to_threads = {}
def _cc_setup(self):
'''
@@ -411,9 +408,9 @@
''' shutdown the xfrin process. the thread which is doing xfrin will be
terminated.
'''
- self._filter_hash(self._conn_sockets)
- for fd in self._conn_sockets.keys():
- fd.send(b"shutdown")
+ self._filter_hash(self._conn_sockets_to_threads)
+ for socket in self._conn_sockets_to_threads.keys():
+ socket.send(b"shutdown")
self._shutdown_flag = 1
main_thread = threading.currentThread()
@@ -481,12 +478,8 @@
self._cc_check_command()
def _filter_hash(self, hash):
- '''delete zone_name in self._threads_zones or a socket in self._conn_sockets.'''
- keys = []
- for key in hash.keys():
- keys.append(key)
-
- for key in keys:
+ '''delete zone_name in self._zones_to_threads or a socket in self._conn_sockets_to_threads.'''
+ for key in [k for k in hash.keys()]:
if not (hash[key]).is_alive():
del hash[key]
@@ -496,29 +489,28 @@
return (1, "xfrin failed, can't load dns message python library: 'libdns_python'")
# check max_transfer_in, else return quota error
- if len(self._threads_zones) >= self._max_transfers_in:
- self._filter_hash(self._threads_zones)
- self._filter_hash(self._conn_sockets)
- if len(self._threads_zones) >= self._max_transfers_in:
+ if len(self._zones_to_threads) >= self._max_transfers_in:
+ self._filter_hash(self._zones_to_threads)
+ self._filter_hash(self._conn_sockets_to_threads)
+ if len(self._zones_to_threads) >= self._max_transfers_in:
return (1, 'xfrin quota error')
# check whether the zone xfrin is in progress.
- if zone_name in self._threads_zones.keys():
- if not (self._threads_zones[zone_name]).is_alive():
- del self._threads_zones[zone_name]
+ if zone_name in self._zones_to_threads.keys():
+ if not (self._zones_to_threads[zone_name]).is_alive():
+ del self._zones_to_threads[zone_name]
else:
return (1, 'zone xfrin is in progress')
conn_socket = socket.socketpair()
xfrin_thread = threading.Thread(target = process_xfrin,
args = (zone_name, rrclass,
db_file,
- self._shutdown_flag,
master_addrinfo, check_soa, conn_socket[1],
self._verbose))
# recored the zone name which zone xfrin is in process
- self._threads_zones[zone_name] = xfrin_thread
- self._conn_sockets[conn_socket[0]] = xfrin_thread
+ self._zones_to_threads[zone_name] = xfrin_thread
+ self._conn_sockets_to_threads[conn_socket[0]] = xfrin_thread
xfrin_thread.start()
return (0, 'zone xfrin is started')
More information about the bind10-changes
mailing list