[svn] commit: r3455 - in /branches/trac299/src: bin/xfrout/tests/xfrout_test.py bin/xfrout/xfrout.py.in lib/xfr/fd_share.cc lib/xfr/fd_share.h lib/xfr/fdshare_python.cc

BIND 10 source code commits bind10-changes at lists.isc.org
Fri Nov 5 04:59:32 UTC 2010


Author: chenzhengzhang
Date: Fri Nov  5 04:59:31 2010
New Revision: 3455

Log:
update xfrout according to review comments

Modified:
    branches/trac299/src/bin/xfrout/tests/xfrout_test.py
    branches/trac299/src/bin/xfrout/xfrout.py.in
    branches/trac299/src/lib/xfr/fd_share.cc
    branches/trac299/src/lib/xfr/fd_share.h
    branches/trac299/src/lib/xfr/fdshare_python.cc

Modified: branches/trac299/src/bin/xfrout/tests/xfrout_test.py
==============================================================================
--- branches/trac299/src/bin/xfrout/tests/xfrout_test.py (original)
+++ branches/trac299/src/bin/xfrout/tests/xfrout_test.py Fri Nov  5 04:59:31 2010
@@ -63,6 +63,13 @@
     def handle(self):
         pass
 
+    def _send_data(self, sock, data):
+        size = len(data)
+        total_count = 0
+        while total_count < size:
+            count = sock.send(data[total_count:])
+            total_count += count
+
 class Dbserver:
     def __init__(self):
         self._shutdown_event = threading.Event()
@@ -80,12 +87,20 @@
     def setUp(self):
         request = MySocket(socket.AF_INET,socket.SOCK_STREAM)
         self.log = isc.log.NSLogger('xfrout', '',  severity = 'critical', log_to_console = False )
-        (self.mastsock, self.slavesock) = socket.socketpair()
-        self.xfrsess = MyXfroutSession(request, None, None, self.log, self.slavesock)
+        (self.write_sock, self.read_sock) = socket.socketpair()
+        self.xfrsess = MyXfroutSession(request, None, None, self.log, self.read_sock)
         self.xfrsess.server = Dbserver()
         self.mdata = bytes(b'\xd6=\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07example\x03com\x00\x00\xfc\x00\x01')
         self.sock = MySocket(socket.AF_INET,socket.SOCK_STREAM)
         self.soa_record = (4, 3, 'example.com.', 'com.example.', 3600, 'SOA', None, 'master.example.com. admin.example.com. 1234 3600 1800 2419200 7200')
+
+    def test_receive_query_message(self):
+        send_msg = b"\xd6=\x00\x00\x00\x01\x00"
+        msg_len = struct.pack('H', socket.htons(len(send_msg)))
+        self.write_sock.send(msg_len)
+        self.write_sock.send(send_msg)
+        recv_msg = self.xfrsess._receive_query_message(self.read_sock)
+        self.assertEqual(recv_msg, send_msg)
 
     def test_parse_query_message(self):
         [get_rcode, get_msg] = self.xfrsess._parse_query_message(self.mdata)

Modified: branches/trac299/src/bin/xfrout/xfrout.py.in
==============================================================================
--- branches/trac299/src/bin/xfrout/xfrout.py.in (original)
+++ branches/trac299/src/bin/xfrout/xfrout.py.in Fri Nov  5 04:59:31 2010
@@ -85,47 +85,53 @@
             try:
                 rlist, wlist, xlist = select.select([self._shutdown_sock, self.request], [], [])
             except select.error as e:
-                if e.args[0] == errno.EINTR:
-                    (rlist, wlist, xlist) = ([], [], [])
-                else:
-                    sys.stderr.write("[b10-xfrout] Error with select(): %s\n" %e)
-                    break
+                if not e.args[0] == errno.EINTR:
+                    self._log.log_message("error", "Error with select(): %s" %e)
+                break
             if self._shutdown_sock in rlist:
                 continue
 
-            fd = recv_fd(self.request.fileno())
-
-            if fd < 0:
+            sock_fd = recv_fd(self.request.fileno())
+
+            if sock_fd < 0:
                 # This may happen when one xfrout process try to connect to
                 # xfrout unix socket server, to check whether there is another
                 # xfrout running.
-                if fd == XFR_FD_REV_FAIL:
+                if sock_fd == XFR_FD_RECEIVE_FAIL:
                     self._log.log_message("error", "Failed to receive the file descriptor for XFR connection")
                 break
 
-            data_len = self.request.recv(2)
-            if not data_len:
-                break
-            msg_len = struct.unpack('!H', data_len)[0]
-            msgdata = self.request.recv(msg_len)
+            # receive query msg
+            msgdata = self._receive_query_message(self.request)
             if not msgdata:
                 break
-            sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
+
             try:
-                self.dns_xfrout_start(sock, msgdata)
+                self.dns_xfrout_start(sock_fd, msgdata)
                 #TODO, avoid catching all exceptions
             except Exception as e:
                 self._log.log_message("error", str(e))
 
-            try:
-                sock.shutdown(socket.SHUT_RDWR)
-            except socket.error:
-                # Avoid socket error caused by shutting down
-                # one non-connected socket.
-                pass
-
-            sock.close()
-            os.close(fd)
+            os.close(sock_fd)
+
+    def _receive_query_message(self, sock):
+        ''' receive query message from sock'''
+        # receive data length
+        data_len = sock.recv(2)
+        if not data_len:
+            return None
+        msg_len = struct.unpack('!H', data_len)[0]
+        # receive data
+        recv_size = 0
+        msgdata = b''
+        while recv_size < msg_len:
+            data = sock.recv(msg_len - recv_size)
+            if not data:
+                return None
+            recv_size += len(data)
+            msgdata += data
+
+        return msgdata
 
     def _parse_query_message(self, mdata):
         ''' parse query message to [socket,message]'''
@@ -144,37 +150,37 @@
         return question.get_name().to_text()
 
 
-    def _send_data(self, sock, data):
+    def _send_data(self, sock_fd, data):
         size = len(data)
         total_count = 0
         while total_count < size:
-            count = sock.send(data[total_count:])
+            count = os.write(sock_fd, data[total_count:])
             total_count += count
 
 
-    def _send_message(self, sock, msg):
+    def _send_message(self, sock_fd, msg):
         render = MessageRenderer()
         render.set_length_limit(XFROUT_MAX_MESSAGE_SIZE)
         msg.to_wire(render)
         header_len = struct.pack('H', socket.htons(render.get_length()))
-        self._send_data(sock, header_len)
-        self._send_data(sock, render.get_data())
-
-
-    def _reply_query_with_error_rcode(self, msg, sock, rcode_):
+        self._send_data(sock_fd, header_len)
+        self._send_data(sock_fd, render.get_data())
+
+
+    def _reply_query_with_error_rcode(self, msg, sock_fd, rcode_):
         msg.make_response()
         msg.set_rcode(rcode_)
-        self._send_message(sock, msg)
-
-
-    def _reply_query_with_format_error(self, msg, sock):
+        self._send_message(sock_fd, msg)
+
+
+    def _reply_query_with_format_error(self, msg, sock_fd):
         '''query message format isn't legal.'''
         if not msg:
             return # query message is invalid. send nothing back.
 
         msg.make_response()
         msg.set_rcode(Rcode.FORMERR())
-        self._send_message(sock, msg)
+        self._send_message(sock_fd, msg)
 
 
     def _zone_is_empty(self, zone):
@@ -210,22 +216,22 @@
         return Rcode.NOERROR()
 
 
-    def dns_xfrout_start(self, sock, msg_query):
+    def dns_xfrout_start(self, sock_fd, msg_query):
         rcode_, msg = self._parse_query_message(msg_query)
         #TODO. create query message and parse header
         if rcode_ != Rcode.NOERROR():
-            return self._reply_query_with_format_error(msg, sock)
+            return self._reply_query_with_format_error(msg, sock_fd)
 
         zone_name = self._get_query_zone_name(msg)
         rcode_ = self._check_xfrout_available(zone_name)
         if rcode_ != Rcode.NOERROR():
             self._log.log_message("info", "transfer of '%s/IN' failed: %s",
                                   zone_name, rcode_.to_text())
-            return self. _reply_query_with_error_rcode(msg, sock, rcode_)
+            return self. _reply_query_with_error_rcode(msg, sock_fd, rcode_)
 
         try:
             self._log.log_message("info", "transfer of '%s/IN': AXFR started" % zone_name)
-            self._reply_xfrout_query(msg, sock, zone_name)
+            self._reply_xfrout_query(msg, sock_fd, zone_name)
             self._log.log_message("info", "transfer of '%s/IN': AXFR end" % zone_name)
         except Exception as err:
             self._log.log_message("error", str(err))
@@ -257,7 +263,7 @@
         rrset_.add_rdata(rdata_)
         return rrset_
 
-    def _send_message_with_last_soa(self, msg, sock, rrset_soa, message_upper_len):
+    def _send_message_with_last_soa(self, msg, sock_fd, rrset_soa, message_upper_len):
         '''Add the SOA record to the end of message. If it can't be
         added, a new message should be created to send out the last soa .
         '''
@@ -266,14 +272,14 @@
         if message_upper_len + rrset_len < XFROUT_MAX_MESSAGE_SIZE:
             msg.add_rrset(Section.ANSWER(), rrset_soa)
         else:
-            self._send_message(sock, msg)
+            self._send_message(sock_fd, msg)
             msg = self._clear_message(msg)
             msg.add_rrset(Section.ANSWER(), rrset_soa)
 
-        self._send_message(sock, msg)
-
-
-    def _reply_xfrout_query(self, msg, sock, zone_name):
+        self._send_message(sock_fd, msg)
+
+
+    def _reply_xfrout_query(self, msg, sock_fd, zone_name):
         #TODO, there should be a better way to insert rrset.
         msg.make_response()
         msg.set_header_flag(MessageFlag.AA())
@@ -303,12 +309,12 @@
                 message_upper_len += rrset_len
                 continue
 
-            self._send_message(sock, msg)
+            self._send_message(sock_fd, msg)
             msg = self._clear_message(msg)
             msg.add_rrset(Section.ANSWER(), rrset_) # Add the rrset to the new message
             message_upper_len = rrset_len
 
-        self._send_message_with_last_soa(msg, sock, rrset_soa, message_upper_len)
+        self._send_message_with_last_soa(msg, sock_fd, rrset_soa, message_upper_len)
 
 class UnixSockServer(ThreadingUnixStreamServer):
     '''The unix domain socket server which accept xfr query sent from auth server.'''
@@ -320,14 +326,14 @@
         self._lock = threading.Lock()
         self._transfers_counter = 0
         self._shutdown_event = shutdown_event
-        self._master_sock, self._slave_sock = socket.socketpair()
+        self._write_sock, self._read_sock = socket.socketpair()
         self._log = log
         self.update_config_data(config_data)
         self._cc = cc
 
     def finish_request(self, request, client_address):
         '''Finish one request by instantiating RequestHandlerClass.'''
-        self.RequestHandlerClass(request, client_address, self, self._log, self._slave_sock)
+        self.RequestHandlerClass(request, client_address, self, self._log, self._read_sock)
 
     def _remove_unused_sock_file(self, sock_file):
         '''Try to remove the socket file. If the file is being used
@@ -335,8 +341,8 @@
         If it's not a socket file or nobody is listening
         , it will be removed. If it can't be removed, exit from python. '''
         if self._sock_file_in_use(sock_file):
-            sys.stderr.write("[b10-xfrout] Fail to start xfrout process, unix socket"
-                  " file '%s' is being used by another xfrout process\n" % sock_file)
+            self._log.log_message("error", "Fail to start xfrout process, unix socket file '%s'"
+                                 " is being used by another xfrout process\n" % sock_file)
             sys.exit(0)
         else:
             if not os.path.exists(sock_file):
@@ -345,7 +351,7 @@
             try:
                 os.unlink(sock_file)
             except OSError as err:
-                sys.stderr.write('[b10-xfrout] Fail to remove file %s: %s\n' % (sock_file, err))
+                self._log.log_message("error", '[b10-xfrout] Fail to remove file %s: %s\n' % (sock_file, err))
                 sys.exit(0)
 
     def _sock_file_in_use(self, sock_file):
@@ -361,7 +367,7 @@
             return True
 
     def shutdown(self):
-        self._master_sock.send(b"shutdown") #terminate the xfrout session thread
+        self._write_sock.send(b"shutdown") #terminate the xfrout session thread
         ThreadingUnixStreamServer.shutdown(self)
         try:
             os.unlink(self._sock_file)

Modified: branches/trac299/src/lib/xfr/fd_share.cc
==============================================================================
--- branches/trac299/src/lib/xfr/fd_share.cc (original)
+++ branches/trac299/src/lib/xfr/fd_share.cc Fri Nov  5 04:59:31 2010
@@ -88,12 +88,12 @@
     msghdr.msg_controllen = cmsg_space(sizeof(int));
     msghdr.msg_control = malloc(msghdr.msg_controllen);
     if (msghdr.msg_control == NULL) {
-        return (XFR_FD_REV_FAIL);
+        return (-1);
     }
 
     if (recvmsg(sock, &msghdr, 0) < 0) {
         free(msghdr.msg_control);
-        return (XFR_FD_REV_FAIL);
+        return (XFR_FD_RECEIVE_FAIL);
     }
     const struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msghdr);
     int fd = -1;

Modified: branches/trac299/src/lib/xfr/fd_share.h
==============================================================================
--- branches/trac299/src/lib/xfr/fd_share.h (original)
+++ branches/trac299/src/lib/xfr/fd_share.h Fri Nov  5 04:59:31 2010
@@ -20,10 +20,12 @@
 namespace isc {
 namespace xfr {
 
-const int XFR_FD_REV_FAIL = -2;
+// Returned by recv_fd() when the xfr socket descriptor cannot be received on 'sock'.
+const int XFR_FD_RECEIVE_FAIL = -2;
 
 // Receive socket descriptor on unix domain socket 'sock'.
 // Returned value is the socket descriptor received.
+// Returns XFR_FD_RECEIVE_FAIL if the xfr socket descriptor cannot be received.
 // Errors are indicated by a return value of -1.
 int recv_fd(const int sock);
 
@@ -37,6 +39,6 @@
 
 #endif
 
-// Local Variables: 
+// Local Variables:
 // mode: c++
-// End: 
+// End:

Modified: branches/trac299/src/lib/xfr/fdshare_python.cc
==============================================================================
--- branches/trac299/src/lib/xfr/fdshare_python.cc (original)
+++ branches/trac299/src/lib/xfr/fdshare_python.cc Fri Nov  5 04:59:31 2010
@@ -22,7 +22,6 @@
 
 #include <xfr/fd_share.h>
 
-static PyObject *XFR_FD_REV_FAIL;
 
 static PyObject*
 fdshare_recv_fd(PyObject *self UNUSED_PARAM, PyObject *args) {
@@ -70,8 +69,11 @@
         return (NULL);
     }
 
-    XFR_FD_REV_FAIL = Py_BuildValue("i", isc::xfr::XFR_FD_REV_FAIL);
-    PyModule_AddObject(mod, "XFR_FD_REV_FAIL", XFR_FD_REV_FAIL);
+    PyObject *XFR_FD_RECEIVE_FAIL = Py_BuildValue("i", isc::xfr::XFR_FD_RECEIVE_FAIL);
+    int ret = PyModule_AddObject(mod, "XFR_FD_RECEIVE_FAIL", XFR_FD_RECEIVE_FAIL);
+    if (-1 == ret) {
+        return (NULL);
+    }
 
     return (mod);
 }




More information about the bind10-changes mailing list