[svn] commit: r1875 - in /trunk/src/bin/xfrin: ./ tests/xfrin_test.py xfrin.py.in

BIND 10 source code commits bind10-changes at lists.isc.org
Thu May 20 18:10:45 UTC 2010


Author: jinmei
Date: Thu May 20 18:10:45 2010
New Revision: 1875

Log:
merge branches/trac179 (for trac #179).
make/test okay.

Modified:
    trunk/src/bin/xfrin/   (props changed)
    trunk/src/bin/xfrin/tests/xfrin_test.py
    trunk/src/bin/xfrin/xfrin.py.in

Modified: trunk/src/bin/xfrin/tests/xfrin_test.py
==============================================================================
--- trunk/src/bin/xfrin/tests/xfrin_test.py (original)
+++ trunk/src/bin/xfrin/tests/xfrin_test.py Thu May 20 18:10:45 2010
@@ -19,93 +19,379 @@
 import socket
 from xfrin import *
 
-# An axfr response of the simple zone "example.com(without soa record at the end)."
-axfr_response1 = b'\x84\x00\x00\x01\x00\x06\x00\x00\x00\x00\x07example\x03com\x00\x00\xfc\x00\x01\xc0\x0c\x00\x06\x00\x01\x00\x00\x0e\x10\x00$\x05dns01\xc0\x0c\x05admin\xc0\x0c\x00\x00\x04\xd2\x00\x00\x0e\x10\x00\x00\x07\x08\x00$\xea\x00\x00\x00\x1c \xc0\x0c\x00\x02\x00\x01\x00\x00\x0e\x10\x00\x02\xc0)\xc0)\x00\x01\x00\x01\x00\x00\x0e\x10\x00\x04\xc0\xa8\x02\x02\x04sql1\xc0\x0c\x00\x02\x00\x01\x00\x00\x0e\x10\x00\x02\xc0)\x04sql2\xc0\x0c\x00\x02\x00\x01\x00\x00\x0e\x10\x00\x02\xc0)\x03ns1\x07subzone\xc0\x0c\x00\x01\x00\x01\x00\x00\x0e\x10\x00\x04\xc0\xa8\x03\x01'
-
-# The second axfr response with only the end soa record.
-axfr_response2 = b'\x84\x00\x00\x00\x00\x01\x00\x00\x00\x00\x07example\x03com\x00\x00\x06\x00\x01\x00\x00\x0e\x10\x00$\x05dns01\xc0\x0c\x05admin\xc0\x0c\x00\x00\x04\xd2\x00\x00\x0e\x10\x00\x00\x07\x08\x00$\xea\x00\x00\x00\x1c '
-
-DB_FILE = 'db_file'
+#
+# Commonly used (mostly constant) test parameters
+#
+TEST_ZONE_NAME = "example.com"
+TEST_RRCLASS = rr_class.IN()
+TEST_DB_FILE = 'db_file'
+TEST_MASTER_IPV4_ADDRESS = '127.0.0.1'
+TEST_MASTER_IPV6_ADDRESS = '::1'
+# XXX: This should be a non priviledge port that is unlikely to be used.
+# If some other process uses this port test will fail.
+TEST_MASTER_PORT = '53535'
+
+soa_rdata = create_rdata(rr_type.SOA(), TEST_RRCLASS,
+                         'master.example.com. admin.example.com ' +
+                         '1234 3600 1800 2419200 7200')
+soa_rrset = rrset(name(TEST_ZONE_NAME), TEST_RRCLASS, rr_type.SOA(),
+                  rr_ttl(3600))
+soa_rrset.add_rdata(soa_rdata)
+example_question = question(name(TEST_ZONE_NAME), TEST_RRCLASS, rr_type.AXFR())
+default_questions = [example_question]
+default_answers = [soa_rrset]
+
+class XfrinTestException(Exception):
+    pass
+
 # Rewrite the class for unittest.
-class MyXfrin(Xfrin):
-    def __init__(self):
+class MockXfrin(Xfrin):
+    def _cc_setup(self):
         pass
 
-class MyXfrinConnection(XfrinConnection):
-    query_data = b''
-    eply_data = b''
+class MockXfrinConnection(XfrinConnection):
+    def __init__(self, TEST_ZONE_NAME, db_file, shutdown_event, master_addr):
+        super().__init__(TEST_ZONE_NAME, db_file, shutdown_event, master_addr)
+        self.query_data = b''
+        self.reply_data = b''
+        self.force_time_out = False
+        self.force_close = False
+        self.qid = None
+        self.response_generator = None
 
     def _handle_xfrin_response(self):
         for rr in super()._handle_xfrin_response():
             pass
 
-    def _get_request_response(self, size):
-        ret = self.reply_data[:size]
+    def _asyncore_loop(self):
+        if self.force_close:
+            self.handle_close()
+        elif not self.force_time_out:
+            self.handle_read()
+
+    def connect_to_master(self):
+        return True
+
+    def recv(self, size):
+        data = self.reply_data[:size]
         self.reply_data = self.reply_data[size:]
-        if (len(ret) < size):
-            raise XfrinException('cannot get reply data')
-        return ret
+        if len(data) < size:
+            raise XfrinTestException('cannot get reply data')
+        return data
 
     def send(self, data):
         self.query_data += data
+        # when the outgoing data is sufficiently large to contain the QID field
+        # (4 octets or more - 16-bit length field + 16-bit QID), extract the
+        # value so that we can construct a matching response.
+        if len(self.query_data) >= 4 and self.qid == None:
+            self.qid = socket.htons(struct.unpack('H', self.query_data[2:4])[0])
+            # if the response generator method is specified, invoke it now.
+            if self.response_generator != None:
+                self.response_generator()
         return len(data)
 
-    def create_response_data(self, data):
-        reply_data = self.query_data[2:4] + data
-        size = socket.htons(len(reply_data))
-        reply_data = struct.pack('H', size) + reply_data
+    def create_response_data(self, response = True, bad_qid = False,
+                             rcode = rcode.NOERROR(),
+                             questions = default_questions,
+                             answers = default_answers):
+        resp = message(message_mode.RENDER)
+        qid = self.qid
+        if bad_qid:
+            qid += 1
+        resp.set_qid(qid)
+        resp.set_opcode(op_code.QUERY())
+        resp.set_rcode(rcode)
+        if response:
+            resp.set_header_flag(message_flag.QR())
+        [resp.add_question(q) for q in questions]
+        [resp.add_rrset(section.ANSWER(), a) for a in answers]
+
+        obuf = output_buffer(0)
+        renderer = message_render(obuf)
+        resp.to_wire(renderer)
+        reply_data = struct.pack('H', socket.htons(obuf.get_length()))
+        reply_data += obuf.get_data()
+
         return reply_data
-
 
 class TestXfrinConnection(unittest.TestCase):
     def setUp(self):
-        self.conn = MyXfrinConnection('example.com.', DB_FILE, threading.Event(), '1.1.1.1')
+        if os.path.exists(TEST_DB_FILE):
+            os.remove(TEST_DB_FILE)
+        self.conn = MockXfrinConnection('example.com.', TEST_DB_FILE,
+                                        threading.Event(),
+                                        TEST_MASTER_IPV4_ADDRESS)
+
+    def tearDown(self):
+        self.conn.close()
+        if os.path.exists(TEST_DB_FILE):
+            os.remove(TEST_DB_FILE)
+
+    def test_init_ip6(self):
+        # This test simply creates a new XfrinConnection object with an
+        # IPv6 address, tries to bind it to an IPv6 wildcard address/port
+        # to confirm an AF_INET6 socket has been created.  A naive application
+        # tends to assume it's IPv4 only and hardcode AF_INET.  This test
+        # uncovers such a bug.
+        c = MockXfrinConnection('example.com.', TEST_DB_FILE,
+                                threading.Event(),
+                                TEST_MASTER_IPV6_ADDRESS)
+        #This test currently fails.  Fix the code, then enable it
+        #c.bind(('::', 0))
+        c.close()
 
     def test_response_with_invalid_msg(self):
-        self.conn.data_exchange = b'aaaxxxx'
+        self.conn.reply_data = b'aaaxxxx'
+        self.assertRaises(XfrinTestException, self.conn._handle_xfrin_response)
+
+    def test_response_without_end_soa(self):
+        self.conn._send_query(rr_type.AXFR())
+        self.conn.reply_data = self.conn.create_response_data()
+        self.assertRaises(XfrinTestException, self.conn._handle_xfrin_response)
+
+    def test_response_bad_qid(self):
+        self.conn._send_query(rr_type.AXFR())
+        self.conn.reply_data = self.conn.create_response_data(bad_qid = True)
+        self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
+
+    def test_response_non_response(self):
+        self.conn._send_query(rr_type.AXFR())
+        self.conn.reply_data = self.conn.create_response_data(response = False)
+        self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
+
+    def test_response_error_code(self):
+        self.conn._send_query(rr_type.AXFR())
+        self.conn.reply_data = self.conn.create_response_data(
+            rcode = rcode.SERVFAIL())
+        self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
+
+    def test_response_multi_question(self):
+        self.conn._send_query(rr_type.AXFR())
+        self.conn.reply_data = self.conn.create_response_data(
+            questions=[example_question, example_question])
+        self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
+
+    def test_response_empty_answer(self):
+        self.conn._send_query(rr_type.AXFR())
+        self.conn.reply_data = self.conn.create_response_data(answers=[])
+        # Should an empty answer trigger an exception?  Even though it's very
+        # unusual it's not necessarily invalid.  Need to revisit.
+        self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
+
+    def test_response_shutdown(self):
+        self.conn.response_generator = self._create_normal_response_data
+        self.conn._shutdown_event.set()
+        self.conn._send_query(rr_type.AXFR())
+        self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
+
+    def test_response_timeout(self):
+        self.conn.response_generator = self._create_normal_response_data
+        self.conn.force_time_out = True
+        self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
+
+    def test_response_remote_close(self):
+        self.conn.response_generator = self._create_normal_response_data
+        self.conn.force_close = True
+        self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
+
+    def test_response_bad_message(self):
+        self.conn.response_generator = self._create_broken_response_data
+        self.conn._send_query(rr_type.AXFR())
         self.assertRaises(Exception, self.conn._handle_xfrin_response)
 
-    def test_response_without_end_soa(self):
-        self.conn._send_query(rr_type.AXFR())
-        self.conn.reply_data = self.conn.create_response_data(axfr_response1) 
-        self.assertRaises(XfrinException, self.conn._handle_xfrin_response)
-
     def test_response(self):
-        self.conn._send_query(rr_type.AXFR())
-        self.conn.reply_data = self.conn.create_response_data(axfr_response1) 
-        self.conn.reply_data += self.conn.create_response_data(axfr_response2) 
+        # normal case.  should silently succeed.
+        self.conn.response_generator = self._create_normal_response_data
+        self.conn._send_query(rr_type.AXFR())
         self.conn._handle_xfrin_response()
 
+    def test_do_xfrin(self):
+        self.conn.response_generator = self._create_normal_response_data
+        self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
+
+    def test_do_xfrin_empty_response(self):
+        # skipping the creation of response data, so the transfer will fail.
+        # (but do_xfrin() always return OK.)
+        self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
+
+    def test_do_xfrin_empty_response(self):
+        self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
+
+    def test_do_xfrin_bad_response(self):
+        self.conn.response_generator = self._create_broken_response_data
+        self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
+
+    def test_do_xfrin_dberror(self):
+        # DB file is under a non existent directory, so its creation will fail,
+        # which will make the transfer fail.
+        self.conn._db_file = "not_existent/" + TEST_DB_FILE
+        self.assertEqual(self.conn.do_xfrin(False), XFRIN_OK)
+
+# This test currently doesn't work due to bug.  Fix it and then enable the test.
+#     def test_do_xfrin_with_soacheck(self):
+#         self.conn.response_generator = self._create_normal_response_data
+#         self.assertEqual(self.conn.do_xfrin(True), XFRIN_OK)
+
+#     def test_do_xfrin_with_soacheck_bad_response(self):
+#         self.conn.response_generator = self._create_broken_response_data
+#         self.assertEqual(self.conn.do_xfrin(True), XFRIN_OK)
+
+    def _create_normal_response_data(self):
+        # This helper method creates a simple sequence of DNS messages that
+        # forms a valid XFR transaction.  It consists of two messages, each
+        # containing just a single SOA RR.
+        self.conn.reply_data = self.conn.create_response_data()
+        self.conn.reply_data += self.conn.create_response_data()
+
+    def _create_broken_response_data(self):
+        # This helper method creates a bogus "DNS message" that only contains
+        # 4 octets of data.  The DNS message parser will raise an exception.
+        bogus_data = b'xxxx'
+        self.conn.reply_data = struct.pack('H', socket.htons(len(bogus_data)))
+        self.conn.reply_data += bogus_data
+
+class TestXfrinRecorder(unittest.TestCase):
+    def setUp(self):
+        self.recorder = XfrinRecorder()
+
+    def test_increment(self):
+        self.assertEqual(self.recorder.count(), 0)
+        self.recorder.increment(TEST_ZONE_NAME)
+        self.assertEqual(self.recorder.count(), 1)
+        # duplicate "increment" should probably be rejected.  but it's not
+        # checked at this moment
+        self.recorder.increment(TEST_ZONE_NAME)
+        self.assertEqual(self.recorder.count(), 2)
+
+    def test_decrement(self):
+        self.assertEqual(self.recorder.count(), 0)
+        self.recorder.increment(TEST_ZONE_NAME)
+        self.assertEqual(self.recorder.count(), 1)
+        self.recorder.decrement(TEST_ZONE_NAME)
+        self.assertEqual(self.recorder.count(), 0)
+
+    def test_decrement_from_empty(self):
+        self.assertEqual(self.recorder.count(), 0)
+        self.recorder.decrement(TEST_ZONE_NAME)
+        self.assertEqual(self.recorder.count(), 0)
+
+    def test_inprogress(self):
+        self.assertEqual(self.recorder.count(), 0)
+        self.recorder.increment(TEST_ZONE_NAME)
+        self.assertEqual(self.recorder.xfrin_in_progress(TEST_ZONE_NAME), True)
+        self.recorder.decrement(TEST_ZONE_NAME)
+        self.assertEqual(self.recorder.xfrin_in_progress(TEST_ZONE_NAME), False)
+
 class TestXfrin(unittest.TestCase):
+    args = {}
+
+    def setUp(self):
+        self.xfr = MockXfrin()
+        self.args['zone_name'] = TEST_ZONE_NAME
+        self.args['port'] = TEST_MASTER_PORT
+        self.args['master'] = TEST_MASTER_IPV4_ADDRESS
+        self.args['db_file'] = TEST_DB_FILE
+
+    def tearDown(self):
+        self.xfr.shutdown()
+
+    def _do_parse(self):
+        return self.xfr._parse_cmd_params(self.args)
+
     def test_parse_cmd_params(self):
-        xfr = MyXfrin()
-        args = {}
-        args['zone_name'] = 'sd.cn.'
-        args['port'] = '12345'
-        args['master'] = '218.241.108.122'
-        args['db_file'] = '/home/tt'
-
-        name, master, port, db_file = xfr._parse_cmd_params(args)
-        self.assertEqual(port, 12345)
-        self.assertEqual(name, 'sd.cn.')
-        self.assertEqual(master, '218.241.108.122')
-        self.assertEqual(db_file, '/home/tt')
-
-    def test_parse_cmd_params_1(self):
-        xfr = MyXfrin()
-        args = {}
-        args['port'] = '12345'
-        args['master'] = '218.241.108.122'
-        args['db_file'] = '/home/tt'
-
-        self.assertRaises(XfrinException, xfr._parse_cmd_params, args)
-        self.assertRaises(XfrinException, xfr._parse_cmd_params, {'zone_name':'ds.cn.', 'master':'3.3.3'})
-        self.assertRaises(XfrinException, xfr._parse_cmd_params, {'zone_name':'ds.cn.'})
-        self.assertRaises(XfrinException, xfr._parse_cmd_params, {'master':'ds.cn.'})
+        name, master, port, db_file = self._do_parse()
+        self.assertEqual(port, int(TEST_MASTER_PORT))
+        self.assertEqual(name, TEST_ZONE_NAME)
+        self.assertEqual(master, TEST_MASTER_IPV4_ADDRESS)
+        self.assertEqual(db_file, TEST_DB_FILE)
+
+    def test_parse_cmd_params_default_port(self):
+        del self.args['port']
+        self.assertEqual(self._do_parse()[2], 53)
+
+    def test_parse_cmd_params_ip6master(self):
+        self.args['master'] = TEST_MASTER_IPV6_ADDRESS
+        self.assertEqual(self._do_parse()[1], TEST_MASTER_IPV6_ADDRESS)
+
+    def test_parse_cmd_params_nozone(self):
+        # zone name is mandatory.
+        del self.args['zone_name']
+        self.assertRaises(XfrinException, self._do_parse)
+
+    def test_parse_cmd_params_nomaster(self):
+        # master address is mandatory.
+        del self.args['master']
+        self.assertRaises(XfrinException, self._do_parse)
+
+    def test_parse_cmd_params_bad_ip4(self):
+        self.args['master'] = '3.3.3'
+        self.assertRaises(XfrinException, self._do_parse)
+
+    def test_parse_cmd_params_bad_ip6(self):
+        self.args['master'] = '1::1::1'
+        self.assertRaises(XfrinException, self._do_parse)
+
+    def test_parse_cmd_params_bad_port(self):
+        self.args['port'] = '-1'
+        self.assertRaises(XfrinException, self._do_parse)
+
+        self.args['port'] = '65536'
+        self.assertRaises(XfrinException, self._do_parse)
+
+    def test_command_handler_shutdown(self):
+        self.assertEqual(self.xfr.command_handler("shutdown",
+                                                  None)['result'][0], 0)
+        # shutdown command doesn't expect an argument, but accepts it if any.
+        self.assertEqual(self.xfr.command_handler("shutdown",
+                                                  "unused")['result'][0], 0)
+
+        self.assertEqual(self.xfr.command_handler("Shutdown",
+                                                  "unused")['result'][0], 0)
+
+    def test_command_handler_retransfer(self):
+        self.assertEqual(self.xfr.command_handler("retransfer",
+                                                  self.args)['result'][0], 0)
+
+    def test_command_handler_retransfer_badcommand(self):
+        self.args['master'] = 'invalid'
+        self.assertEqual(self.xfr.command_handler("retransfer",
+                                                  self.args)['result'][0], 1)
+
+    def test_command_handler_retransfer_quota(self):
+        for i in range(self.xfr._max_transfers_in - 1):
+            self.xfr.recorder.increment(str(i) + TEST_ZONE_NAME)
+        # there can be one more outstanding transfer.
+        self.assertEqual(self.xfr.command_handler("retransfer",
+                                                  self.args)['result'][0], 0)
+        # make sure the # xfrs would excceed the quota
+        self.xfr.recorder.increment(str(self.xfr._max_transfers_in) + TEST_ZONE_NAME)
+        # this one should fail
+        self.assertEqual(self.xfr.command_handler("retransfer",
+                                                  self.args)['result'][0], 1)
+
+    def test_command_handler_retransfer_inprogress(self):
+        self.xfr.recorder.increment(TEST_ZONE_NAME)
+        self.assertEqual(self.xfr.command_handler("retransfer",
+                                                  self.args)['result'][0], 1)
+
+    def test_command_handler_retransfer_nomodule(self):
+        dns_module = sys.modules['bind10_dns'] # this must exist
+        del sys.modules['bind10_dns']
+        self.assertEqual(self.xfr.command_handler("retransfer",
+                                                  self.args)['result'][0], 1)
+        # sys.modules is global, so we must recover it
+        sys.modules['bind10_dns'] = dns_module
+
+    def test_command_handler_refresh(self):
+        # at this level, refresh is no different than retransfer.
+        # just confirm the successful case with a different family of address.
+        self.args['master'] = TEST_MASTER_IPV6_ADDRESS
+        self.assertEqual(self.xfr.command_handler("refresh",
+                                                  self.args)['result'][0], 0)
 
 if __name__== "__main__":
     try:
         unittest.main()
-        os.remove(DB_FILE)
     except KeyboardInterrupt as e:
         print(e)

Modified: trunk/src/bin/xfrin/xfrin.py.in
==============================================================================
--- trunk/src/bin/xfrin/xfrin.py.in (original)
+++ trunk/src/bin/xfrin/xfrin.py.in Thu May 20 18:10:45 2010
@@ -123,6 +123,14 @@
 
         self._send_data(header_len)
         self._send_data(obuf.get_data())
+
+    def _asyncore_loop(self):
+        '''
+This method is a trivial wrapper for asyncore.loop().  It's extracted from
+_get_request_response so that we can test the rest of the code without
+involving actual communication with a remote server.
+'''
+        asyncore.loop(self._idle_timeout, count = 1)
     
     def _get_request_response(self, size):
         recv_size = 0
@@ -130,7 +138,7 @@
         while recv_size < size:
             self._recv_time_out = True
             self._need_recv_size = size - recv_size
-            asyncore.loop(self._idle_timeout, count = 1)
+            self._asyncore_loop()
             if self._recv_time_out:
                 raise XfrinException('receive data from socket time out.')
 
@@ -320,13 +328,22 @@
 
 class Xfrin():
     def __init__(self, verbose = False):
-        self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler)
-        self._cc.start()
+        self._cc_setup()
         self._max_transfers_in = 10
         self.recorder = XfrinRecorder()
         self._shutdown_event = threading.Event()
         self._verbose = verbose
 
+    def _cc_setup(self):
+        '''
+This method is used only as part of initialization, but is implemented
+separately for convenience of unit tests; by letting the test code override
+this method we can test most of this class without requiring a command channel.
+'''
+        self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION,
+                                              self.config_handler,
+                                              self.command_handler)
+        self._cc.start()
 
     def config_handler(self, new_config):
         # TODO, process new config data
@@ -415,7 +432,7 @@
                                                 self._shutdown_event,
                                                 master_addr,
                                                 port, check_soa, self._verbose))
-                                                
+
         xfrin_thread.start()
         return (0, 'zone xfrin is started')
 




More information about the bind10-changes mailing list