[svn] commit: r2874 - in /branches/trac232/src/lib/datasrc/python: libdata_source_python.cc tests/data_source_python_test.py
BIND 10 source code commits
bind10-changes at lists.isc.org
Wed Sep 8 21:25:44 UTC 2010
Author: jelte
Date: Wed Sep 8 21:25:44 2010
New Revision: 2874
Log:
better pyRRset_Check()
doIXFR wrapper
Modified:
branches/trac232/src/lib/datasrc/python/libdata_source_python.cc
branches/trac232/src/lib/datasrc/python/tests/data_source_python_test.py
Modified: branches/trac232/src/lib/datasrc/python/libdata_source_python.cc
==============================================================================
--- branches/trac232/src/lib/datasrc/python/libdata_source_python.cc (original)
+++ branches/trac232/src/lib/datasrc/python/libdata_source_python.cc Wed Sep 8 21:25:44 2010
@@ -89,6 +89,7 @@
static PyObject* DataSrc_replaceZone(s_DataSrc* self, PyObject* args);
static PyObject* DataSrc_haveRRset(s_DataSrc* self, PyObject* args);
static PyObject* DataSrc_updateCheckPrerequisite(s_DataSrc* self, PyObject* args);
+static PyObject* DataSrc_doIXFR(s_DataSrc* self, PyObject* args);
static PyMethodDef DataSrc_methods[] = {
{ "get_class", reinterpret_cast<PyCFunction>(DataSrc_getClass), METH_NOARGS,
@@ -128,6 +129,8 @@
"Checks if the given RRset exists" },
{ "update_check_prerequisite", reinterpret_cast<PyCFunction>(DataSrc_updateCheckPrerequisite), METH_VARARGS,
"Checks the given DNS Update prerequisite" },
+ { "do_ixfr", reinterpret_cast<PyCFunction>(DataSrc_doIXFR), METH_VARARGS,
+ "Perform an IXFR with the given List of RRsets as IXFR input" },
{ NULL, NULL, 0, NULL }
};
@@ -422,17 +425,12 @@
return isc::dns::rdata::createRdata(rrtype, rrclass, rdata_str);
}
-bool
-pyRRset_Check(const PyObject* rrset_obj)
+int
+pyRRset_Check(PyObject* rrset_obj)
{
PyObject* rrset_class = PyObject_GetAttrString(po_DNS_module, "RRset");
- bool result = false;
-
- if (rrset_obj && rrset_obj->ob_type &&
- rrset_class->ob_type == rrset_obj->ob_type
- ) {
- result = true;
- }
+ int result = PyObject_IsInstance(rrset_obj, rrset_class);
+ Py_DECREF(rrset_class);
return result;
}
@@ -966,6 +964,43 @@
return (NULL);
}
+static PyObject*
+DataSrc_doIXFR(s_DataSrc* self, PyObject* args) {
+ s_DataSrcTransaction *transaction;
+ PyObject *rrset_list_obj;
+
+ if (PyArg_ParseTuple(args, "O!O",
+ &datasrc_transaction_type, &transaction,
+ &rrset_list_obj)) {
+ if (!PyList_Check(rrset_list_obj)) {
+ PyErr_SetString(PyExc_TypeError,
+ "argument 2 of do_ixfr() is not a List");
+ return (NULL);
+ }
+ // ok we should probably change the API for doIXFR there
+ // for now create a fake message (we need the sectioniterator
+ // for the call right now)
+ isc::dns::Message m(isc::dns::Message::RENDER);
+ for (int i = 0; i < PyList_Size(rrset_list_obj); ++i) {
+ PyObject* rrset_obj = PyList_GET_ITEM(rrset_list_obj, i);
+ if (!pyRRset_Check(rrset_obj)) {
+ PyErr_SetString(PyExc_TypeError,
+ "non-RRset object in argument 2 of do_ixfr()");
+ return (NULL);
+ }
+ m.addRRset(isc::dns::Section::ANSWER(), pyRRset_AsRRsetPtr(rrset_obj), false);
+ }
+
+ PyObject* result = Py_BuildValue("I",
+ self->datasrc->doIXFR(
+ *(transaction->datasrc_transaction),
+ m.beginSection(isc::dns::Section::ANSWER()),
+ m.endSection(isc::dns::Section::ANSWER())));
+ return result;
+ }
+ return (NULL);
+}
+
//
Modified: branches/trac232/src/lib/datasrc/python/tests/data_source_python_test.py
==============================================================================
--- branches/trac232/src/lib/datasrc/python/tests/data_source_python_test.py (original)
+++ branches/trac232/src/lib/datasrc/python/tests/data_source_python_test.py Wed Sep 8 21:25:44 2010
@@ -405,8 +405,55 @@
self.assertRaises(TypeError, self.ds.update_check_prerequisite, 1)
self.assertRaises(TypeError, self.ds.update_check_prerequisite, [])
-
-
+ def test_do_ixfr_ok(self):
+ rrsets = rrsets_from_file(testdata_path + "/ixfr_ok.rrs")
+
+ result = []
+ n = self.ds.find_addrs(Name("www.example.com"), RRClass.IN(), result, 0, Name("example.com"))
+ self.assertEqual(DataSrc.SUCCESS, n);
+ self.assertEqual(1, len(result))
+ self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n", str(result[0]))
+
+ transaction = DataSrcTransaction(self.ds, Name("example.com"), RRClass.IN())
+ result = self.ds.start_transaction(transaction)
+ self.assertEqual(DataSrc.T_SUCCESS, result)
+
+ result = self.ds.do_ixfr(transaction, rrsets)
+ self.assertEqual(DataSrc.T_SUCCESS, result)
+
+ result = self.ds.commit_transaction(transaction)
+ self.assertEqual(DataSrc.T_SUCCESS, result)
+
+ result = []
+ n = self.ds.find_addrs(Name("www.example.com"), RRClass.IN(), result, 0, Name("example.com"))
+ self.assertEqual(DataSrc.SUCCESS, n);
+ self.assertEqual(1, len(result))
+ self.assertEqual("www.example.com. 3600 IN A 192.0.2.3\n", str(result[0]))
+
+ def test_do_ixfr_bad(self):
+ rrsets = rrsets_from_file(testdata_path + "/ixfr_bad_remove_nonexisting.rrs")
+
+ result = []
+ n = self.ds.find_addrs(Name("www.example.com"), RRClass.IN(), result, 0, Name("example.com"))
+ self.assertEqual(DataSrc.SUCCESS, n);
+ self.assertEqual(1, len(result))
+ self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n", str(result[0]))
+
+ transaction = DataSrcTransaction(self.ds, Name("example.com"), RRClass.IN())
+ result = self.ds.start_transaction(transaction)
+ self.assertEqual(DataSrc.T_SUCCESS, result)
+
+ result = self.ds.do_ixfr(transaction, rrsets)
+ self.assertEqual(DataSrc.T_ERROR, result)
+
+ result = self.ds.rollback_transaction(transaction)
+ self.assertEqual(DataSrc.T_SUCCESS, result)
+
+ result = []
+ n = self.ds.find_addrs(Name("www.example.com"), RRClass.IN(), result, 0, Name("example.com"))
+ self.assertEqual(DataSrc.SUCCESS, n);
+ self.assertEqual(1, len(result))
+ self.assertEqual("www.example.com. 3600 IN A 192.0.2.1\n", str(result[0]))
# helper functions for tests taken from c++ unittests
@@ -437,53 +484,39 @@
"/test.sqlite3 " + testdata_out_path +
"/write_test.sqlite3" );
-def read_wire_data(filename):
- data = bytes()
- file = open(testdata_path + os.sep + filename, "r")
+def rrsets_from_file(filename):
+ rrsets = []
+ prev_rrset = None
+ file = open(filename)
for line in file:
- line = line.strip()
- if line == "" or line.startswith("#"):
- pass
+ i = 0
+ els = line.split("\t")
+ name = Name(els[0])
+ if (els[1] == ""):
+ i = i + 1
+ ttl = RRTTL(els[i + 1])
+ cls = RRClass(els[i + 2])
+ type = RRType(els[i + 3])
+ rdata_txt = els[i + 4].strip()
+
+ if prev_rrset is not None:
+ if prev_rrset.get_class() == cls and\
+ prev_rrset.get_type() == type and\
+ prev_rrset.get_name() == name and\
+ type != RRType.SOA():
+ prev_rrset.add_rdata(Rdata(type, cls, rdata_txt))
+ else:
+ rrsets.append(prev_rrset)
+ prev_rrset = RRset(name, cls, type, ttl)
+ prev_rrset.add_rdata(Rdata(type, cls, rdata_txt))
else:
- cur_data = bytes.fromhex(line)
- data += cur_data
-
- return data
-
-def factoryFromFile(message, file):
- data = read_wire_data(file)
- message.from_wire(data)
- pass
-
-# we don't have direct comparison for rrsets right now (should we?
-# should go in the cpp version first then), so also no direct list
-# comparison. Created a helper function
-def compare_rrset_list(list1, list2):
- if len(list1) != len(list2):
- return False
- for i in range(0, len(list1)):
- if str(list1[i]) != str(list2[i]):
- return False
- return True
-
-# a complete message taken from cpp tests, for testing towire and totext
-def create_message():
- message_render = Message(Message.RENDER)
- message_render.set_qid(0x1035)
- message_render.set_opcode(Opcode.QUERY())
- message_render.set_rcode(Rcode.NOERROR())
- message_render.set_header_flag(MessageFlag.QR())
- message_render.set_header_flag(MessageFlag.RD())
- message_render.set_header_flag(MessageFlag.AA())
- message_render.add_question(Question(Name("test.example.com"), RRClass("IN"), RRType("A")))
- rrset = RRset(Name("test.example.com"), RRClass("IN"),
- RRType("A"), RRTTL(3600))
- rrset.add_rdata(Rdata(RRType("A"), RRClass("IN"), "192.0.2.1"))
- rrset.add_rdata(Rdata(RRType("A"), RRClass("IN"), "192.0.2.2"))
- message_render.add_rrset(Section.ANSWER(), rrset)
- return message_render
-
-
+ prev_rrset = RRset(name, cls, type, ttl)
+ prev_rrset.add_rdata(Rdata(type, cls, rdata_txt))
+ pass
+ if prev_rrset is not None:
+ rrsets.append(prev_rrset)
+ file.close()
+ return rrsets
if __name__ == '__main__':
unittest.main()
More information about the bind10-changes mailing list