[svn] commit: r2874 - in /branches/trac232/src/lib/datasrc/python: libdata_source_python.cc tests/data_source_python_test.py

BIND 10 source code commits bind10-changes at lists.isc.org
Wed Sep 8 21:25:44 UTC 2010


Author: jelte
Date: Wed Sep  8 21:25:44 2010
New Revision: 2874

Log:
better pyRRset_Check()
doIXFR wrapper

Modified:
    branches/trac232/src/lib/datasrc/python/libdata_source_python.cc
    branches/trac232/src/lib/datasrc/python/tests/data_source_python_test.py

Modified: branches/trac232/src/lib/datasrc/python/libdata_source_python.cc
==============================================================================
--- branches/trac232/src/lib/datasrc/python/libdata_source_python.cc (original)
+++ branches/trac232/src/lib/datasrc/python/libdata_source_python.cc Wed Sep  8 21:25:44 2010
@@ -89,6 +89,7 @@
 static PyObject* DataSrc_replaceZone(s_DataSrc* self, PyObject* args);
 static PyObject* DataSrc_haveRRset(s_DataSrc* self, PyObject* args);
 static PyObject* DataSrc_updateCheckPrerequisite(s_DataSrc* self, PyObject* args);
+static PyObject* DataSrc_doIXFR(s_DataSrc* self, PyObject* args);
 
 static PyMethodDef DataSrc_methods[] = {
     { "get_class", reinterpret_cast<PyCFunction>(DataSrc_getClass), METH_NOARGS,
@@ -128,6 +129,8 @@
       "Checks if the given RRset exists" },
     { "update_check_prerequisite", reinterpret_cast<PyCFunction>(DataSrc_updateCheckPrerequisite), METH_VARARGS,
       "Checks the given DNS Update prerequisite" },
+    { "do_ixfr", reinterpret_cast<PyCFunction>(DataSrc_doIXFR), METH_VARARGS,
+      "Perform an IXFR with the given List of RRsets as IXFR input" },
     { NULL, NULL, 0, NULL }
 };
 
@@ -422,17 +425,12 @@
     return isc::dns::rdata::createRdata(rrtype, rrclass, rdata_str);
 }
 
-bool
-pyRRset_Check(const PyObject* rrset_obj)
+int
+pyRRset_Check(PyObject* rrset_obj)
 {
     PyObject* rrset_class = PyObject_GetAttrString(po_DNS_module, "RRset");
-    bool result = false;
-    
-    if (rrset_obj && rrset_obj->ob_type &&
-        rrset_class->ob_type == rrset_obj->ob_type
-    ) {
-        result = true;
-    }
+    int result = PyObject_IsInstance(rrset_obj, rrset_class);
+    Py_DECREF(rrset_class);
     return result;
 }
 
@@ -966,6 +964,43 @@
     return (NULL);
 }
 
+static PyObject*
+DataSrc_doIXFR(s_DataSrc* self, PyObject* args) {
+    s_DataSrcTransaction *transaction;
+    PyObject *rrset_list_obj;
+    
+    if (PyArg_ParseTuple(args, "O!O",
+                         &datasrc_transaction_type, &transaction,
+                         &rrset_list_obj)) {
+        if (!PyList_Check(rrset_list_obj)) {
+            PyErr_SetString(PyExc_TypeError,
+                            "argument 2 of do_ixfr() is not a List");
+            return (NULL);
+        }
+        // ok we should probably change the API for doIXFR there
+        // for now create a fake message (we need the sectioniterator
+        // for the call right now)
+        isc::dns::Message m(isc::dns::Message::RENDER);
+        for (int i = 0; i < PyList_Size(rrset_list_obj); ++i) {
+            PyObject* rrset_obj = PyList_GET_ITEM(rrset_list_obj, i);
+            if (!pyRRset_Check(rrset_obj)) {
+                PyErr_SetString(PyExc_TypeError,
+                    "non-RRset object in argument 2 of do_ixfr()");
+                return (NULL);
+            }
+            m.addRRset(isc::dns::Section::ANSWER(), pyRRset_AsRRsetPtr(rrset_obj), false);
+        }
+        
+        PyObject* result = Py_BuildValue("I",
+                              self->datasrc->doIXFR(
+                              *(transaction->datasrc_transaction),
+                              m.beginSection(isc::dns::Section::ANSWER()),
+                              m.endSection(isc::dns::Section::ANSWER())));
+        return result;
+    }
+    return (NULL);
+}
+
 
 
 //

Modified: branches/trac232/src/lib/datasrc/python/tests/data_source_python_test.py
==============================================================================
--- branches/trac232/src/lib/datasrc/python/tests/data_source_python_test.py (original)
+++ branches/trac232/src/lib/datasrc/python/tests/data_source_python_test.py Wed Sep  8 21:25:44 2010
@@ -405,8 +405,55 @@
         self.assertRaises(TypeError, self.ds.update_check_prerequisite, 1)
         self.assertRaises(TypeError, self.ds.update_check_prerequisite, [])
     
-    
-        
+    def test_do_ixfr_ok(self):
+        rrsets = rrsets_from_file(testdata_path + "/ixfr_ok.rrs")
+
+        result = []
+        n = self.ds.find_addrs(Name("www.example.com"), RRClass.IN(), result, 0, Name("example.com"))
+        self.assertEqual(DataSrc.SUCCESS, n);
+        self.assertEqual(1, len(result))
+        self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n", str(result[0]))
+
+        transaction = DataSrcTransaction(self.ds, Name("example.com"), RRClass.IN())
+        result = self.ds.start_transaction(transaction)
+        self.assertEqual(DataSrc.T_SUCCESS, result)
+
+        result = self.ds.do_ixfr(transaction, rrsets)
+        self.assertEqual(DataSrc.T_SUCCESS, result)
+
+        result = self.ds.commit_transaction(transaction)
+        self.assertEqual(DataSrc.T_SUCCESS, result)
+        
+        result = []
+        n = self.ds.find_addrs(Name("www.example.com"), RRClass.IN(), result, 0, Name("example.com"))
+        self.assertEqual(DataSrc.SUCCESS, n);
+        self.assertEqual(1, len(result))
+        self.assertEqual("www.example.com. 3600 IN A 192.0.2.3\n", str(result[0]))
+        
+    def test_do_ixfr_bad(self):
+        rrsets = rrsets_from_file(testdata_path + "/ixfr_bad_remove_nonexisting.rrs")
+
+        result = []
+        n = self.ds.find_addrs(Name("www.example.com"), RRClass.IN(), result, 0, Name("example.com"))
+        self.assertEqual(DataSrc.SUCCESS, n);
+        self.assertEqual(1, len(result))
+        self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n", str(result[0]))
+
+        transaction = DataSrcTransaction(self.ds, Name("example.com"), RRClass.IN())
+        result = self.ds.start_transaction(transaction)
+        self.assertEqual(DataSrc.T_SUCCESS, result)
+
+        result = self.ds.do_ixfr(transaction, rrsets)
+        self.assertEqual(DataSrc.T_ERROR, result)
+
+        result = self.ds.rollback_transaction(transaction)
+        self.assertEqual(DataSrc.T_SUCCESS, result)
+        
+        result = []
+        n = self.ds.find_addrs(Name("www.example.com"), RRClass.IN(), result, 0, Name("example.com"))
+        self.assertEqual(DataSrc.SUCCESS, n);
+        self.assertEqual(1, len(result))
+        self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n", str(result[0]))
         
 
 # helper functions for tests taken from c++ unittests
@@ -437,53 +484,39 @@
                      "/test.sqlite3 " + testdata_out_path +
                      "/write_test.sqlite3" );
 
-def read_wire_data(filename):
-    data = bytes()
-    file = open(testdata_path + os.sep + filename, "r")
+def rrsets_from_file(filename):
+    rrsets = []
+    prev_rrset = None
+    file = open(filename)
     for line in file:
-        line = line.strip()
-        if line == "" or line.startswith("#"):
-            pass
+        i = 0
+        els = line.split("\t")
+        name = Name(els[0])
+        if (els[1] == ""):
+            i = i + 1
+        ttl = RRTTL(els[i + 1])
+        cls = RRClass(els[i + 2])
+        type = RRType(els[i + 3])
+        rdata_txt = els[i + 4].strip()
+
+        if prev_rrset is not None:
+            if prev_rrset.get_class() == cls and\
+               prev_rrset.get_type() == type and\
+               prev_rrset.get_name() == name and\
+               type != RRType.SOA():
+                prev_rrset.add_rdata(Rdata(type, cls, rdata_txt))
+            else:
+                rrsets.append(prev_rrset)
+                prev_rrset = RRset(name, cls, type, ttl)
+                prev_rrset.add_rdata(Rdata(type, cls, rdata_txt))
         else:
-            cur_data = bytes.fromhex(line)
-            data += cur_data
-
-    return data
-
-def factoryFromFile(message, file):
-    data = read_wire_data(file)
-    message.from_wire(data)
-    pass
-
-# we don't have direct comparison for rrsets right now (should we?
-# should go in the cpp version first then), so also no direct list
-# comparison. Created a helper function
-def compare_rrset_list(list1, list2):
-    if len(list1) != len(list2):
-        return False
-    for i in range(0, len(list1)):
-        if str(list1[i]) != str(list2[i]):
-            return False
-    return True
-
-# a complete message taken from cpp tests, for testing towire and totext
-def create_message():
-    message_render = Message(Message.RENDER)
-    message_render.set_qid(0x1035)
-    message_render.set_opcode(Opcode.QUERY())
-    message_render.set_rcode(Rcode.NOERROR())
-    message_render.set_header_flag(MessageFlag.QR())
-    message_render.set_header_flag(MessageFlag.RD())
-    message_render.set_header_flag(MessageFlag.AA())
-    message_render.add_question(Question(Name("test.example.com"), RRClass("IN"), RRType("A")))
-    rrset = RRset(Name("test.example.com"), RRClass("IN"),
-                                        RRType("A"), RRTTL(3600))
-    rrset.add_rdata(Rdata(RRType("A"), RRClass("IN"), "192.0.2.1"))
-    rrset.add_rdata(Rdata(RRType("A"), RRClass("IN"), "192.0.2.2"))
-    message_render.add_rrset(Section.ANSWER(), rrset)
-    return message_render
-
-    
+            prev_rrset = RRset(name, cls, type, ttl)
+            prev_rrset.add_rdata(Rdata(type, cls, rdata_txt))
+        pass
+    if prev_rrset is not None:
+        rrsets.append(prev_rrset)
+    file.close()
+    return rrsets
 
 if __name__ == '__main__':
     unittest.main()




More information about the bind10-changes mailing list