[svn] commit: r1875 - in /trunk/src/bin/xfrin: ./ tests/xfrin_test.py xfrin.py.in
BIND 10 source code commits
bind10-changes at lists.isc.org
Thu May 20 18:10:45 UTC 2010
Author: jinmei
Date: Thu May 20 18:10:45 2010
New Revision: 1875
Log:
merge branches/trac179 (for trac #179).
make/test okay.
Modified:
trunk/src/bin/xfrin/ (props changed)
trunk/src/bin/xfrin/tests/xfrin_test.py
trunk/src/bin/xfrin/xfrin.py.in
Modified: trunk/src/bin/xfrin/tests/xfrin_test.py
==============================================================================
--- trunk/src/bin/xfrin/tests/xfrin_test.py (original)
+++ trunk/src/bin/xfrin/tests/xfrin_test.py Thu May 20 18:10:45 2010
@@ -19,93 +19,379 @@
import socket
from xfrin import *
-# An axfr response of the simple zone "example.com(without soa record at the end)."
-axfr_response1 = b'\x84\x00\x00\x01\x00\x06\x00\x00\x00\x00\x07example\x03com\x00\x00\xfc\x00\x01\xc0\x0c\x00\x06\x00\x01\x00\x00\x0e\x10\x00$\x05dns01\xc0\x0c\x05admin\xc0\x0c\x00\x00\x04\xd2\x00\x00\x0e\x10\x00\x00\x07\x08\x00$\xea\x00\x00\x00\x1c \xc0\x0c\x00\x02\x00\x01\x00\x00\x0e\x10\x00\x02\xc0)\xc0)\x00\x01\x00\x01\x00\x00\x0e\x10\x00\x04\xc0\xa8\x02\x02\x04sql1\xc0\x0c\x00\x02\x00\x01\x00\x00\x0e\x10\x00\x02\xc0)\x04sql2\xc0\x0c\x00\x02\x00\x01\x00\x00\x0e\x10\x00\x02\xc0)\x03ns1\x07subzone\xc0\x0c\x00\x01\x00\x01\x00\x00\x0e\x10\x00\x04\xc0\xa8\x03\x01'
-
-# The second axfr response with only the end soa record.
-axfr_response2 = b'\x84\x00\x00\x00\x00\x01\x00\x00\x00\x00\x07example\x03com\x00\x00\x06\x00\x01\x00\x00\x0e\x10\x00$\x05dns01\xc0\x0c\x05admin\xc0\x0c\x00\x00\x04\xd2\x00\x00\x0e\x10\x00\x00\x07\x08\x00$\xea\x00\x00\x00\x1c '
-
-DB_FILE = 'db_file'
+#
+# Commonly used (mostly constant) test parameters
+#
+TEST_ZONE_NAME = "example.com"
+TEST_RRCLASS = rr_class.IN()
+TEST_DB_FILE = 'db_file'
+TEST_MASTER_IPV4_ADDRESS = '127.0.0.1'
+TEST_MASTER_IPV6_ADDRESS = '::1'
+# XXX: This should be a non-privileged port that is unlikely to be used.
+# If some other process uses this port, the test will fail.
+TEST_MASTER_PORT = '53535'
+
+soa_rdata = create_rdata(rr_type.SOA(), TEST_RRCLASS,
+ 'master.example.com. admin.example.com ' +
+ '1234 3600 1800 2419200 7200')
+soa_rrset = rrset(name(TEST_ZONE_NAME), TEST_RRCLASS, rr_type.SOA(),
+ rr_ttl(3600))
+soa_rrset.add_rdata(soa_rdata)
+example_question = question(name(TEST_ZONE_NAME), TEST_RRCLASS, rr_type.AXFR())
+default_questions = [example_question]
+default_answers = [soa_rrset]
+
+class XfrinTestException(Exception):
+ pass
+
# Rewrite the class for unittest.
-class MyXfrin(Xfrin):
- def __init__(self):
+class MockXfrin(Xfrin):
+ def _cc_setup(self):
pass
-class MyXfrinConnection(XfrinConnection):
- query_data = b''
- eply_data = b''
+class MockXfrinConnection(XfrinConnection):
+ def __init__(self, TEST_ZONE_NAME, db_file, shutdown_event, master_addr):
+ super().__init__(TEST_ZONE_NAME, db_file, shutdown_event, master_addr)
+ self.query_data = b''
+ self.reply_data = b''
+ self.force_time_out = False
+ self.force_close = False
+ self.qid = None
+ self.response_generator = None
def _handle_xfrin_response(self):
for rr in super()._handle_xfrin_response():
pass
- def _get_request_response(self, size):
- ret = self.reply_data[:size]
+ def _asyncore_loop(self):
+ if self.force_close:
+ self.handle_close()
+ elif not self.force_time_out:
+ self.handle_read()
+
+ def connect_to_master(self):
+ return True
+
+ def recv(self, size):
+ data = self.reply_data[:size]
self.reply_data = self.reply_data[size:]
- if (len(ret) < size):
- raise XfrinException('cannot get reply data')
- return ret
+ if len(data) < size:
+ raise XfrinTestException('cannot get reply data')
+ return data
def send(self, data):
self.query_data += data
+ # when the outgoing data is sufficiently large to contain the QID field
+ # (4 octets or more - 16-bit length field + 16-bit QID), extract the
+ # value so that we can construct a matching response.
+ if len(self.query_data) >= 4 and self.qid == None:
+ self.qid = socket.htons(struct.unpack('H', self.query_data[2:4])[0])
+ # if the response generator method is specified, invoke it now.
+ if self.response_generator != None:
+ self.response_generator()
return len(data)
- def create_response_data(self, data):
- reply_data = self.query_data[2:4] + data
- size = socket.htons(len(reply_data))
- reply_data = struct.pack('H', size) + reply_data
+ def create_response_data(self, response = True, bad_qid = False,
+ rcode = rcode.NOERROR(),
+ questions = default_questions,
+ answers = default_answers):
+ resp = message(message_mode.RENDER)
+ qid = self.qid
+ if bad_qid:
+ qid += 1
+ resp.set_qid(qid)
+ resp.set_opcode(op_code.QUERY())
+ resp.set_rcode(rcode)
+ if response:
+ resp.set_header_flag(message_flag.QR())
+ [resp.add_question(q) for q in questions]
+ [resp.add_rrset(section.ANSWER(), a) for a in answers]
+
+ obuf = output_buffer(0)
+ renderer = message_render(obuf)
+ resp.to_wire(renderer)
+ reply_data = struct.pack('H', socket.htons(obuf.get_length()))
+ reply_data += obuf.get_data()
+
return reply_data
-
class TestXfrinConnection(unittest.TestCase):
def setUp(self):
- self.conn = MyXfrinConnection('example.com.', DB_FILE, threading.Event(), '1.1.1.1')
+ if os.path.exists(TEST_DB_FILE):
+ os.remove(TEST_DB_FILE)
+ self.conn = MockXfrinConnection('example.com.', TEST_DB_FILE,
+ threading.Event(),
+ TEST_MASTER_IPV4_ADDRESS)
+
+ def tearDown(self):
+ self.conn.close()
+ if os.path.exists(TEST_DB_FILE):
+ os.remove(TEST_DB_FILE)
+
+ def test_init_ip6(self):
+ # This test simply creates a new XfrinConnection object with an
+ # IPv6 address, tries to bind it to an IPv6 wildcard address/port
+ # to confirm an AF_INET6 socket has been created. A naive application
+ # tends to assume it's IPv4 only and hardcode AF_INET. This test
+ # uncovers such a bug.
+ c = MockXfrinConnection('example.com.', TEST_DB_FILE,
+ threading.Event(),
+ TEST_MASTER_IPV6_ADDRESS)
+ #This test currently fails. Fix the code, then enable it
+ #c.bind(('::', 0))
+ c.close()
def test_response_with_invalid_msg(self):
- self.conn.data_exchange = b'aaaxxxx'
+ self.conn.reply_data = b'aaaxxxx'
+ self.assertRaises(XfrinTestException, self.conn._handle_xfrin_response)
+
+ def test_response_without_end_soa(self):
+ self.conn._send_query(rr_type.AXFR())
+ self.conn.reply_data = self.conn.create_response_data()
+ self.assertRaises(XfrinTestException, self.conn._handle_xfrin_response)
+
+ def test_response_bad_qid(self):
+ self.conn._send_query(rr_type.AXFR())
+ self.conn.reply_data = self.conn.create_response_data(bad_qid = True)
+ self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
+
+ def test_response_non_response(self):
+ self.conn._send_query(rr_type.AXFR())
+ self.conn.reply_data = self.conn.create_response_data(response = False)
+ self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
+
+ def test_response_error_code(self):
+ self.conn._send_query(rr_type.AXFR())
+ self.conn.reply_data = self.conn.create_response_data(
+ rcode = rcode.SERVFAIL())
+ self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
+
+ def test_response_multi_question(self):
+ self.conn._send_query(rr_type.AXFR())
+ self.conn.reply_data = self.conn.create_response_data(
+ questions=[example_question, example_question])
+ self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
+
+ def test_response_empty_answer(self):
+ self.conn._send_query(rr_type.AXFR())
+ self.conn.reply_data = self.conn.create_response_data(answers=[])
+ # Should an empty answer trigger an exception? Even though it's very
+ # unusual it's not necessarily invalid. Need to revisit.
+ self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
+
+ def test_response_shutdown(self):
+ self.conn.response_generator = self._create_normal_response_data
+ self.conn._shutdown_event.set()
+ self.conn._send_query(rr_type.AXFR())
+ self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
+
+ def test_response_timeout(self):
+ self.conn.response_generator = self._create_normal_response_data
+ self.conn.force_time_out = True
+ self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
+
+ def test_response_remote_close(self):
+ self.conn.response_generator = self._create_normal_response_data
+ self.conn.force_close = True
+ self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
+
+ def test_response_bad_message(self):
+ self.conn.response_generator = self._create_broken_response_data
+ self.conn._send_query(rr_type.AXFR())
self.assertRaises(Exception, self.conn._handle_xfrin_response)
- def test_response_without_end_soa(self):
- self.conn._send_query(rr_type.AXFR())
- self.conn.reply_data = self.conn.create_response_data(axfr_response1)
- self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
-
def test_response(self):
- self.conn._send_query(rr_type.AXFR())
- self.conn.reply_data = self.conn.create_response_data(axfr_response1)
- self.conn.reply_data += self.conn.create_response_data(axfr_response2)
+ # normal case. should silently succeed.
+ self.conn.response_generator = self._create_normal_response_data
+ self.conn._send_query(rr_type.AXFR())
self.conn._handle_xfrin_response()
+ def test_do_xfrin(self):
+ self.conn.response_generator = self._create_normal_response_data
+ self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
+
+ def test_do_xfrin_empty_response(self):
+ # skipping the creation of response data, so the transfer will fail.
+ # (but do_xfrin() always returns OK.)
+ self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
+
+ def test_do_xfrin_empty_response(self):
+ self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
+
+ def test_do_xfrin_bad_response(self):
+ self.conn.response_generator = self._create_broken_response_data
+ self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
+
+ def test_do_xfrin_dberror(self):
+ # DB file is under a non existent directory, so its creation will fail,
+ # which will make the transfer fail.
+ self.conn._db_file = "not_existent/" + TEST_DB_FILE
+ self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
+
+# This test currently doesn't work due to a bug. Fix it and then enable the test.
+# def test_do_xfrin_with_soacheck(self):
+# self.conn.response_generator = self._create_normal_response_data
+# self.assertEqual(self.conn.do_xfrin(True), XFRIN_OK)
+
+# def test_do_xfrin_with_soacheck_bad_response(self):
+# self.conn.response_generator = self._create_broken_response_data
+# self.assertEqual(self.conn.do_xfrin(True), XFRIN_OK)
+
+ def _create_normal_response_data(self):
+ # This helper method creates a simple sequence of DNS messages that
+ # forms a valid XFR transaction. It consists of two messages, each
+ # containing just a single SOA RR.
+ self.conn.reply_data = self.conn.create_response_data()
+ self.conn.reply_data += self.conn.create_response_data()
+
+ def _create_broken_response_data(self):
+ # This helper method creates a bogus "DNS message" that only contains
+ # 4 octets of data. The DNS message parser will raise an exception.
+ bogus_data = b'xxxx'
+ self.conn.reply_data = struct.pack('H', socket.htons(len(bogus_data)))
+ self.conn.reply_data += bogus_data
+
+class TestXfrinRecorder(unittest.TestCase):
+ def setUp(self):
+ self.recorder = XfrinRecorder()
+
+ def test_increment(self):
+ self.assertEqual(self.recorder.count(), 0)
+ self.recorder.increment(TEST_ZONE_NAME)
+ self.assertEqual(self.recorder.count(), 1)
+ # a duplicate "increment" should probably be rejected, but it's not
+ # checked at the moment.
+ self.recorder.increment(TEST_ZONE_NAME)
+ self.assertEqual(self.recorder.count(), 2)
+
+ def test_decrement(self):
+ self.assertEqual(self.recorder.count(), 0)
+ self.recorder.increment(TEST_ZONE_NAME)
+ self.assertEqual(self.recorder.count(), 1)
+ self.recorder.decrement(TEST_ZONE_NAME)
+ self.assertEqual(self.recorder.count(), 0)
+
+ def test_decrement_from_empty(self):
+ self.assertEqual(self.recorder.count(), 0)
+ self.recorder.decrement(TEST_ZONE_NAME)
+ self.assertEqual(self.recorder.count(), 0)
+
+ def test_inprogress(self):
+ self.assertEqual(self.recorder.count(), 0)
+ self.recorder.increment(TEST_ZONE_NAME)
+ self.assertEqual(self.recorder.xfrin_in_progress(TEST_ZONE_NAME), True)
+ self.recorder.decrement(TEST_ZONE_NAME)
+ self.assertEqual(self.recorder.xfrin_in_progress(TEST_ZONE_NAME), False)
+
class TestXfrin(unittest.TestCase):
+ args = {}
+
+ def setUp(self):
+ self.xfr = MockXfrin()
+ self.args['zone_name'] = TEST_ZONE_NAME
+ self.args['port'] = TEST_MASTER_PORT
+ self.args['master'] = TEST_MASTER_IPV4_ADDRESS
+ self.args['db_file'] = TEST_DB_FILE
+
+ def tearDown(self):
+ self.xfr.shutdown()
+
+ def _do_parse(self):
+ return self.xfr._parse_cmd_params(self.args)
+
def test_parse_cmd_params(self):
- xfr = MyXfrin()
- args = {}
- args['zone_name'] = 'sd.cn.'
- args['port'] = '12345'
- args['master'] = '218.241.108.122'
- args['db_file'] = '/home/tt'
-
- name, master, port, db_file = xfr._parse_cmd_params(args)
- self.assertEqual(port, 12345)
- self.assertEqual(name, 'sd.cn.')
- self.assertEqual(master, '218.241.108.122')
- self.assertEqual(db_file, '/home/tt')
-
- def test_parse_cmd_params_1(self):
- xfr = MyXfrin()
- args = {}
- args['port'] = '12345'
- args['master'] = '218.241.108.122'
- args['db_file'] = '/home/tt'
-
- self.assertRaises(XfrinException, xfr._parse_cmd_params, args)
- self.assertRaises(XfrinException, xfr._parse_cmd_params, {'zone_name':'ds.cn.', 'master':'3.3.3'})
- self.assertRaises(XfrinException, xfr._parse_cmd_params, {'zone_name':'ds.cn.'})
- self.assertRaises(XfrinException, xfr._parse_cmd_params, {'master':'ds.cn.'})
+ name, master, port, db_file = self._do_parse()
+ self.assertEqual(port, int(TEST_MASTER_PORT))
+ self.assertEqual(name, TEST_ZONE_NAME)
+ self.assertEqual(master, TEST_MASTER_IPV4_ADDRESS)
+ self.assertEqual(db_file, TEST_DB_FILE)
+
+ def test_parse_cmd_params_default_port(self):
+ del self.args['port']
+ self.assertEqual(self._do_parse()[2], 53)
+
+ def test_parse_cmd_params_ip6master(self):
+ self.args['master'] = TEST_MASTER_IPV6_ADDRESS
+ self.assertEqual(self._do_parse()[1], TEST_MASTER_IPV6_ADDRESS)
+
+ def test_parse_cmd_params_nozone(self):
+ # zone name is mandatory.
+ del self.args['zone_name']
+ self.assertRaises(XfrinException, self._do_parse)
+
+ def test_parse_cmd_params_nomaster(self):
+ # master address is mandatory.
+ del self.args['master']
+ self.assertRaises(XfrinException, self._do_parse)
+
+ def test_parse_cmd_params_bad_ip4(self):
+ self.args['master'] = '3.3.3'
+ self.assertRaises(XfrinException, self._do_parse)
+
+ def test_parse_cmd_params_bad_ip6(self):
+ self.args['master'] = '1::1::1'
+ self.assertRaises(XfrinException, self._do_parse)
+
+ def test_parse_cmd_params_bad_port(self):
+ self.args['port'] = '-1'
+ self.assertRaises(XfrinException, self._do_parse)
+
+ self.args['port'] = '65536'
+ self.assertRaises(XfrinException, self._do_parse)
+
+ def test_command_handler_shutdown(self):
+ self.assertEqual(self.xfr.command_handler("shutdown",
+ None)['result'][0], 0)
+ # shutdown command doesn't expect an argument, but accepts it if any.
+ self.assertEqual(self.xfr.command_handler("shutdown",
+ "unused")['result'][0], 0)
+
+ self.assertEqual(self.xfr.command_handler("Shutdown",
+ "unused")['result'][0], 0)
+
+ def test_command_handler_retransfer(self):
+ self.assertEqual(self.xfr.command_handler("retransfer",
+ self.args)['result'][0], 0)
+
+ def test_command_handler_retransfer_badcommand(self):
+ self.args['master'] = 'invalid'
+ self.assertEqual(self.xfr.command_handler("retransfer",
+ self.args)['result'][0], 1)
+
+ def test_command_handler_retransfer_quota(self):
+ for i in range(self.xfr._max_transfers_in - 1):
+ self.xfr.recorder.increment(str(i) + TEST_ZONE_NAME)
+ # there can be one more outstanding transfer.
+ self.assertEqual(self.xfr.command_handler("retransfer",
+ self.args)['result'][0], 0)
+ # make sure the number of xfrs would exceed the quota
+ self.xfr.recorder.increment(str(self.xfr._max_transfers_in) + TEST_ZONE_NAME)
+ # this one should fail
+ self.assertEqual(self.xfr.command_handler("retransfer",
+ self.args)['result'][0], 1)
+
+ def test_command_handler_retransfer_inprogress(self):
+ self.xfr.recorder.increment(TEST_ZONE_NAME)
+ self.assertEqual(self.xfr.command_handler("retransfer",
+ self.args)['result'][0], 1)
+
+ def test_command_handler_retransfer_nomodule(self):
+ dns_module = sys.modules['bind10_dns'] # this must exist
+ del sys.modules['bind10_dns']
+ self.assertEqual(self.xfr.command_handler("retransfer",
+ self.args)['result'][0], 1)
+ # sys.modules is global, so we must recover it
+ sys.modules['bind10_dns'] = dns_module
+
+ def test_command_handler_refresh(self):
+ # at this level, refresh is no different than retransfer.
+ # just confirm the successful case with a different family of address.
+ self.args['master'] = TEST_MASTER_IPV6_ADDRESS
+ self.assertEqual(self.xfr.command_handler("refresh",
+ self.args)['result'][0], 0)
if __name__== "__main__":
try:
unittest.main()
- os.remove(DB_FILE)
except KeyboardInterrupt as e:
print(e)
Modified: trunk/src/bin/xfrin/xfrin.py.in
==============================================================================
--- trunk/src/bin/xfrin/xfrin.py.in (original)
+++ trunk/src/bin/xfrin/xfrin.py.in Thu May 20 18:10:45 2010
@@ -123,6 +123,14 @@
self._send_data(header_len)
self._send_data(obuf.get_data())
+
+ def _asyncore_loop(self):
+ '''
+This method is a trivial wrapper for asyncore.loop(). It's extracted from
+_get_request_response so that we can test the rest of the code without
+involving actual communication with a remote server.
+'''
+ asyncore.loop(self._idle_timeout, count = 1)
def _get_request_response(self, size):
recv_size = 0
@@ -130,7 +138,7 @@
while recv_size < size:
self._recv_time_out = True
self._need_recv_size = size - recv_size
- asyncore.loop(self._idle_timeout, count = 1)
+ self._asyncore_loop()
if self._recv_time_out:
raise XfrinException('receive data from socket time out.')
@@ -320,13 +328,22 @@
class Xfrin():
def __init__(self, verbose = False):
- self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler)
- self._cc.start()
+ self._cc_setup()
self._max_transfers_in = 10
self.recorder = XfrinRecorder()
self._shutdown_event = threading.Event()
self._verbose = verbose
+ def _cc_setup(self):
+ '''
+This method is used only as part of initialization, but is implemented
+separately for convenience of unit tests; by letting the test code override
+this method we can test most of this class without requiring a command channel.
+'''
+ self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION,
+ self.config_handler,
+ self.command_handler)
+ self._cc.start()
def config_handler(self, new_config):
# TODO, process new config data
@@ -415,7 +432,7 @@
self._shutdown_event,
master_addr,
port, check_soa, self._verbose))
-
+
xfrin_thread.start()
return (0, 'zone xfrin is started')
More information about the bind10-changes
mailing list