[svn] commit: r3301 - in /branches/trac299/src: bin/auth/auth_srv.cc bin/xfrout/xfrout.py.in bin/zonemgr/zonemgr.py.in lib/xfr/fd_share.cc lib/xfr/xfrout_client.cc

BIND 10 source code commits bind10-changes at lists.isc.org
Thu Oct 21 06:22:14 UTC 2010


Author: chenzhengzhang
Date: Thu Oct 21 06:22:14 2010
New Revision: 3301

Log:
support long tcp connection between auth and xfrout

Modified:
    branches/trac299/src/bin/auth/auth_srv.cc
    branches/trac299/src/bin/xfrout/xfrout.py.in
    branches/trac299/src/bin/zonemgr/zonemgr.py.in
    branches/trac299/src/lib/xfr/fd_share.cc
    branches/trac299/src/lib/xfr/xfrout_client.cc

Modified: branches/trac299/src/bin/auth/auth_srv.cc
==============================================================================
--- branches/trac299/src/bin/auth/auth_srv.cc (original)
+++ branches/trac299/src/bin/auth/auth_srv.cc Thu Oct 21 06:22:14 2010
@@ -360,8 +360,10 @@
     }
 
     try {
-        xfrout_client_.connect();
-        xfrout_connected_ = true;
+        if (!xfrout_connected_) {
+            xfrout_client_.connect();
+            xfrout_connected_ = true;
+        }
         xfrout_client_.sendXfroutRequestInfo(
             io_message.getSocket().getNative(),
             io_message.getData(),
@@ -384,9 +386,6 @@
                          verbose_mode_);
         return (true);
     }
-
-    xfrout_client_.disconnect();
-    xfrout_connected_ = false;
 
     return (false);
 }

Modified: branches/trac299/src/bin/xfrout/xfrout.py.in
==============================================================================
--- branches/trac299/src/bin/xfrout/xfrout.py.in (original)
+++ branches/trac299/src/bin/xfrout/xfrout.py.in Thu Oct 21 06:22:14 2010
@@ -61,6 +61,8 @@
 MAX_TRANSFERS_OUT = 10
 VERBOSE_MODE = False
 
+
+SESSION_RUNNABLE = False
 XFROUT_MAX_MESSAGE_SIZE = 65535
 
 def get_rrset_len(rrset):
@@ -71,42 +73,61 @@
 
 
 class XfroutSession(BaseRequestHandler):
-    def __init__(self, request, client_address, server, log):
+    def __init__(self, request, client_address, server, log, sock):
         # The initializer for the superclass may call functions
         # that need _log to be set, so we set it first
         self._log = log
+        self._shutdown_sock = sock
         BaseRequestHandler.__init__(self, request, client_address, server)
 
     def handle(self):
-        fd = recv_fd(self.request.fileno())
-        
-        if fd < 0:
-            # This may happen when one xfrout process try to connect to
-            # xfrout unix socket server, to check whether there is another
-            # xfrout running. 
-            self._log.log_message("error", "Failed to receive the file descriptor for XFR connection")
-            return
-
-        data_len = self.request.recv(2)
-        msg_len = struct.unpack('!H', data_len)[0]
-        msgdata = self.request.recv(msg_len)
-        sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
-        try:
-            self.dns_xfrout_start(sock, msgdata)
-            #TODO, avoid catching all exceptions
-        except Exception as e:
-            self._log.log_message("error", str(e))
-
-        try:
-            sock.shutdown(socket.SHUT_RDWR)
-        except socket.error:
-            # Avoid socket error caused by shutting down 
-            # one non-connected socket.
-            pass
-
-        sock.close()
-        os.close(fd)
-        pass
+        # Handle a request until shutdown or xfrout client is closed.
+        global SESSION_RUNNABLE
+        while SESSION_RUNNABLE:
+            try:
+                rlist, wlist, xlist = select.select([self._shutdown_sock, self.request], [], [])
+            except select.error as e:
+                if e.args[0] == errno.EINTR:
+                    (rlist, wlist, xlist) = ([], [], [])
+                else:
+                    sys.stderr.write("[b10-xfrout] Error with select(): %s\n" %e)
+                    break
+            if self._shutdown_sock in rlist:
+                continue
+
+            fd = recv_fd(self.request.fileno())
+            
+            if fd < 0:
+                # This may happen when one xfrout process try to connect to
+                # xfrout unix socket server, to check whether there is another
+                # xfrout running. 
+                if fd == -2:
+                    self._log.log_message("error", "Failed to receive the file descriptor for XFR connection")
+                break 
+
+            data_len = self.request.recv(2)
+            if not data_len:
+                break
+            msg_len = struct.unpack('!H', data_len)[0]
+            msgdata = self.request.recv(msg_len)
+            if not msgdata:
+                break
+            sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
+            try:
+                self.dns_xfrout_start(sock, msgdata)
+                #TODO, avoid catching all exceptions
+            except Exception as e:
+                self._log.log_message("error", str(e))
+
+            try:
+                sock.shutdown(socket.SHUT_RDWR)
+            except socket.error:
+                # Avoid socket error caused by shutting down 
+                # one non-connected socket.
+                pass
+ 
+            sock.close()
+            os.close(fd)
 
     def _parse_query_message(self, mdata):
         ''' parse query message to [socket,message]'''
@@ -301,13 +322,14 @@
         self._lock = threading.Lock()
         self._transfers_counter = 0
         self._shutdown_event = shutdown_event
+        self._master_sock, self._slave_sock = socket.socketpair()
         self._log = log
         self.update_config_data(config_data)
         self._cc = cc
         
     def finish_request(self, request, client_address):
         '''Finish one request by instantiating RequestHandlerClass.'''
-        self.RequestHandlerClass(request, client_address, self, self._log)
+        self.RequestHandlerClass(request, client_address, self, self._log, self._slave_sock)
 
     def _remove_unused_sock_file(self, sock_file):
         '''Try to remove the socket file. If the file is being used 
@@ -341,6 +363,9 @@
             return True 
 
     def shutdown(self):
+        global SESSION_RUNNABLE
+        SESSION_RUNNABLE = False
+        self._master_sock.send(b"shutdown") #terminate the xfrout session thread
         ThreadingUnixStreamServer.shutdown(self)
         try:
             os.unlink(self._sock_file)
@@ -496,6 +521,8 @@
 
     def run(self):
         '''Get and process all commands sent from cfgmgr or other modules. '''
+        global SESSION_RUNNABLE
+        SESSION_RUNNABLE = True
         while not self._shutdown_event.is_set():
             self._cc.check_command(False)
 

Modified: branches/trac299/src/bin/zonemgr/zonemgr.py.in
==============================================================================
--- branches/trac299/src/bin/zonemgr/zonemgr.py.in (original)
+++ branches/trac299/src/bin/zonemgr/zonemgr.py.in Thu Oct 21 06:22:14 2010
@@ -326,14 +326,14 @@
     def _run_timer(self, start_event):
         start_event.set()
         while self._running:
-            # If zonemgr has no zone, set timer timeout to LOWERBOUND_RETRY.
+            # If zonemgr has no zone, set timer timeout to self._lowerbound_retry.
             if self._zone_mgr_is_empty():
                 timeout = self._lowerbound_retry 
             else:
                 zone_need_refresh = self._find_need_do_refresh_zone()
-                # If don't get zone with minimum next refresh time, set timer timeout to LOWERBOUND_RETRY
+                # If don't get zone with minimum next refresh time, set timer timeout to self._lowerbound_retry.
                 if not zone_need_refresh:
-                    timeout = LOWERBOUND_RETRY
+                    timeout = self._lowerbound_retry 
                 else:
                     timeout = self._get_zone_next_refresh_time(zone_need_refresh) - self._get_current_time()
                     if (timeout < 0):

Modified: branches/trac299/src/lib/xfr/fd_share.cc
==============================================================================
--- branches/trac299/src/lib/xfr/fd_share.cc (original)
+++ branches/trac299/src/lib/xfr/fd_share.cc Thu Oct 21 06:22:14 2010
@@ -88,12 +88,12 @@
     msghdr.msg_controllen = cmsg_space(sizeof(int));
     msghdr.msg_control = malloc(msghdr.msg_controllen);
     if (msghdr.msg_control == NULL) {
-        return (-1);
+        return (-2);
     }
 
     if (recvmsg(sock, &msghdr, 0) < 0) {
         free(msghdr.msg_control);
-        return (-1);
+        return (-2);
     }
     const struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msghdr);
     int fd = -1;

Modified: branches/trac299/src/lib/xfr/xfrout_client.cc
==============================================================================
--- branches/trac299/src/lib/xfr/xfrout_client.cc (original)
+++ branches/trac299/src/lib/xfr/xfrout_client.cc Thu Oct 21 06:22:14 2010
@@ -94,12 +94,6 @@
                   "failed to send XFR request data to xfrout module");
     }
     
-    int databuf = 0;
-    if (recv(impl_->socket_.native(), &databuf, sizeof(int), 0) != 0) {
-        isc_throw(XfroutError,
-                  "xfr query hasn't been processed properly by xfrout module");
-    }
-
     return (0);
 }
 




More information about the bind10-changes mailing list