[svn] commit: r2530 - /branches/trac216/src/bin/xfrin/xfrin.py.in

BIND 10 source code commits bind10-changes at lists.isc.org
Mon Jul 19 03:07:52 UTC 2010


Author: shentingting
Date: Mon Jul 19 03:07:52 2010
New Revision: 2530

Log:
Implement new feature items from the xfrin TODO list: apply new config data when it arrives, handle communication errors in handle_error, and fix the shutdown time-delay problem.

Modified:
    branches/trac216/src/bin/xfrin/xfrin.py.in

Modified: branches/trac216/src/bin/xfrin/xfrin.py.in
==============================================================================
--- branches/trac216/src/bin/xfrin/xfrin.py.in (original)
+++ branches/trac216/src/bin/xfrin/xfrin.py.in Mon Jul 19 03:07:52 2010
@@ -24,8 +24,9 @@
 import asyncore
 import struct
 import threading
-import socket
+import socket, _socket, select
 import random
+import time
 from optparse import OptionParser, OptionValueError
 from isc.config.ccsession import *
 try:
@@ -63,6 +64,54 @@
 class XfrinException(Exception): 
     pass
 
+def Xfrin_poll(timeout=0.0, map=None):
+    if map is None:
+        map = socket_map
+    if map:
+        abc = None
+        r = []; w = []; e = []
+        for fd, obj in list(map.items()):
+            if isinstance(obj, _socket.socket):
+                r.append(obj)
+            else:
+                is_r = obj.readable()
+                is_w = obj.writable()
+                if is_r:
+                    r.append(fd)
+                if is_w:
+                    w.append(fd)
+                if is_r or is_w:
+                    e.append(fd)
+        if [] == r == w == e:
+            time.sleep(timeout)
+            return
+        try:
+            r, w, e = select.select(r, w, e, timeout)
+        except select.error as err:
+            if err.args[0] != EINTR:
+                raise
+            else:
+                return
+        for el in r:
+            if isinstance(el, _socket.socket):
+                raise XfrinException("shutdown xfrin!")
+            obj = map.get(el)
+            if obj is None:
+                continue
+            asyncore.read(obj)
+        for fd in w:
+            obj = map.get(fd)
+            if obj is None:
+                continue
+            asyncore.write(obj)
+        for fd in e:
+            obj = map.get(fd)
+            if obj is None:
+                continue
+            _exception(obj)
+            
+asyncore.poll = Xfrin_poll
+
 class XfrinConnection(asyncore.dispatcher):
     '''Do xfrin in this class. '''    
 
@@ -73,7 +122,6 @@
             db_file: specify the data source file.
             check_soa: when it's true, check soa first before sending xfr query
         '''
-
         asyncore.dispatcher.__init__(self, map=sock_map)
         self.create_socket(master_addrinfo[0], master_addrinfo[1])
         self._zone_name = zone_name
@@ -176,7 +224,6 @@
 
     def do_xfrin(self, check_soa, ixfr_first = False):
         '''Do xfr by sending xfr request and parsing response. '''
-
         try:
             ret = XFRIN_OK
             if check_soa:
@@ -190,7 +237,6 @@
                 self._send_query(RRType(252))
                 isc.datasrc.sqlite3_ds.load(self._db_file, self._zone_name,
                                             self._handle_xfrin_response)
-
                 self.log_msg(logstr + 'succeeded')
                 ret = XFRIN_OK
 
@@ -212,8 +258,7 @@
             self.log_msg(logstr + 'failed')
             ret = XFRIN_FAIL
         finally:
-           self.close()
-
+            self.close()
         return ret
 
     def _check_response_header(self, msg):
@@ -297,72 +342,50 @@
 
     def handle_read(self):
         '''Read query's response from socket. '''
-
         self._recvd_data = self.recv(self._need_recv_size)
+        if(len(self._recvd_data)==0):
+            raise XfrinException
         self._recvd_size = len(self._recvd_data)
         self._recv_time_out = False
 
+    def handle_error(self):
+        raise XfrinException("receive data from socket error!")
+
+
     def writable(self):
         '''Ignore the writable socket. '''
-
         return False
+
 
     def log_info(self, msg, type='info'):
         # Overwrite the log function, log nothing
         pass
 
     def log_msg(self, msg):
-        if self._verbose:
+       if self._verbose:
             sys.stdout.write('[b10-xfrin] %s\n' % str(msg))
 
 
-def process_xfrin(xfrin_recorder, zone_name, rrclass, db_file, 
-                  shutdown_event, master_addrinfo, check_soa, verbose):
-    xfrin_recorder.increment(zone_name)
+def process_xfrin(zone_name, rrclass, db_file, 
+                  shutdown_event, master_addrinfo, check_soa, socket_pair, verbose):
     sock_map = {}
+    sock_map[socket_pair.fileno()] = socket_pair
     conn = XfrinConnection(sock_map, zone_name, rrclass, db_file,
                            shutdown_event, master_addrinfo, verbose)
     if conn.connect_to_master():
         conn.do_xfrin(check_soa)
 
-    xfrin_recorder.decrement(zone_name)
-
-
-class XfrinRecorder:
-    def __init__(self):
-        self._lock = threading.Lock()
-        self._zones = []
-
-    def increment(self, zone_name):
-        self._lock.acquire()
-        self._zones.append(zone_name)
-        self._lock.release()
-
-    def decrement(self, zone_name):
-        self._lock.acquire()
-        if zone_name in self._zones:
-            self._zones.remove(zone_name)
-        self._lock.release()
-
-    def xfrin_in_progress(self, zone_name):
-        self._lock.acquire()
-        ret = zone_name in self._zones
-        self._lock.release()
-        return ret
-
-    def count(self):
-        self._lock.acquire()
-        ret = len(self._zones)
-        self._lock.release()
-        return ret
-
 class Xfrin:
     def __init__(self, verbose = False):
         self._cc_setup()
-        self._max_transfers_in = 10
-        self.recorder = XfrinRecorder()
         self._shutdown_event = threading.Event()
         self._verbose = verbose
+
+        # self._zones maps each zone name to its xfr communication thread.
+        # self._socket_pairs maps a socket to its xfr communication thread; the main
+        # thread uses that socket to communicate with the xfr communication thread.
+        self._zones = {}
+        self._socket_pairs = {}
 
     def _cc_setup(self):
         '''
@@ -373,7 +396,10 @@
         self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION,
                                               self.config_handler,
                                               self.command_handler)
+        
         self._cc.start()
+        config_data = self._cc.get_full_config()
+        self._max_transfers_in = config_data.get("transfers_in")
 
     def _cc_check_command(self):
         '''
@@ -384,12 +410,17 @@
 
     def config_handler(self, new_config):
         # TODO, process new config data
+        self._max_transfers_in = new_config.get("transfers_in") or self._max_transfers_in
         return create_answer(0)
 
     def shutdown(self):
         ''' shutdown the xfrin process. the thread which is doing xfrin should be 
         terminated.
         '''
+        self._filter_hash(self._socket_pairs)
+        for fd in self._socket_pairs.keys():
+            fd.send(b"shutdown")
+
         self._shutdown_event.set()
         main_thread = threading.currentThread()
         for th in threading.enumerate():
@@ -397,12 +428,12 @@
                 continue
             th.join()
 
-
     def command_handler(self, command, args):
         answer = create_answer(0)
         try:
             if command == 'shutdown':
                 self._shutdown_event.set()
+                #self.shutdown()
             elif command == 'retransfer' or command == 'refresh':
                 # The default RR class is IN.  We should fix this so that
                 # the class is passed in the command arg (where we specify
@@ -456,26 +487,45 @@
         while not self._shutdown_event.is_set():
             self._cc_check_command()
 
+    def _filter_hash(self, hash):
+        '''Delete every entry of hash (self._zones or self._socket_pairs) whose thread is no longer alive.'''
+        keys = []
+        for key in hash.keys():
+            keys.append(key)
+
+        for key in keys:
+            if not (hash[key]).is_alive():
+                del hash[key]
+
     def xfrin_start(self, zone_name, rrclass, db_file, master_addrinfo,
                     check_soa = True):
         if "libdns_python" not in sys.modules:
             return (1, "xfrin failed, can't load dns message python library: 'libdns_python'")
 
         # check max_transfer_in, else return quota error
-        if self.recorder.count() >= self._max_transfers_in:
-            return (1, 'xfrin quota error')
-
-        if self.recorder.xfrin_in_progress(zone_name):
-            return (1, 'zone xfrin is in progress')
-
+        if len(self._zones) >= self._max_transfers_in:
+            self._filter_hash(self._zones)
+            self._filter_hash(self._socket_pairs)
+            if len(self._zones) >= self._max_transfers_in:
+                return (1, 'xfrin quota error')
+
+        # check whether the zone xfrin is in progress.
+        if zone_name in self._zones.keys():
+            if not (self._zones[zone_name]).is_alive():
+                del self._zones[zone_name]
+            else:
+                return (1, 'zone xfrin is in progress')
+        socket_pair = socket.socketpair()
         xfrin_thread = threading.Thread(target = process_xfrin,
-                                        args = (self.recorder,
-                                                zone_name, rrclass,
+                                        args = (zone_name, rrclass,
                                                 db_file,
                                                 self._shutdown_event,
-                                                master_addrinfo, check_soa,
+                                                master_addrinfo, check_soa, socket_pair[1],
                                                 self._verbose))
 
+        # record the name of the zone whose xfrin is in progress
+        self._zones[zone_name] = xfrin_thread
+        self._socket_pairs[socket_pair[0]] = xfrin_thread
         xfrin_thread.start()
         return (0, 'zone xfrin is started')
 




More information about the bind10-changes mailing list