[svn] commit: r1227 - /branches/xfrin/src/bin/xfrin/xfrin.py.in

BIND 10 source code commits bind10-changes at lists.isc.org
Tue Mar 9 08:51:17 UTC 2010


Author: zhanglikun
Date: Tue Mar  9 08:51:17 2010
New Revision: 1227

Log:
Refine code for xfrin. 

Modified:
    branches/xfrin/src/bin/xfrin/xfrin.py.in

Modified: branches/xfrin/src/bin/xfrin/xfrin.py.in
==============================================================================
--- branches/xfrin/src/bin/xfrin/xfrin.py.in (original)
+++ branches/xfrin/src/bin/xfrin/xfrin.py.in Tue Mar  9 08:51:17 2010
@@ -56,19 +56,10 @@
 class XfrinException(Exception): 
     pass
 
-def hextodec(s):
-    return int(chr(s), 16)
 
 class XfrinConnection(asyncore.dispatcher):
-    ''' conn = XfrinConnection()  # start a connection
-        conn.prepare_xfrin(master_addr, port)
-        while True:
-            ret = conn.recv_xfrin()        # start to do xfrin  
-            if ret != XFRIN_OK:
-                break            
-        conn.finish_xfrin()              # finsih the xfrin
-    '''
-        
+    '''A connection to the master server that carries out one zone transfer (xfrin).'''
+
     def __init__(self, zone_name, db_file, master_addr, 
                  port = 53, 
                  check_soa = True, 
@@ -84,51 +75,34 @@
         self.setblocking(1)
         self.connect((master_addr, port))
 
-    '''
-    def _send_query_over_tcp(self, query):
-        msg_len = 
-        send_len = self.send()
-    '''
-
-    def _send_soa_query(self):
-        #TODO
-        query_data = b'072e0100000100000000000002747702636e0000060001'
-        send_len = self.socket.send(query_data)
-        if send_len != len(query_data):
-            raise XfrinException('send query failed')
-
-    def _send_axfr_query(self):
+
+    def _create_query(self, query_type):
         msg = message(message_mode.RENDER)
         msg.set_qid(random.randint(1, 0xFFFF))
         msg.set_opcode(op_code.QUERY())
         msg.set_rcode(rcode.NOERROR())
-        query_question = question(name(self._zone_name), rr_class.IN(), rr_type.AXFR())
+        query_question = question(name(self._zone_name), rr_class.IN(), query_type)
         msg.add_question(query_question)
-
+        return msg
+
+    def _send_data(self, data):
+        size = len(data)
+        total_count = 0
+        while total_count < size:
+            count = self.send(data[total_count:])
+            total_count += count
+
+
+    def _send_query(self, query_type):
+        msg = self._create_query(query_type)
         obuf = output_buffer(0)
         render = message_render(obuf)
         msg.to_wire(render)
         header_len = struct.pack('H', socket.htons(obuf.get_length()))
-        self.send(header_len)
-        self.send(obuf.get_data())
-        #TODO, check error
-
-        '''
-        #TODO
-        value_ =  b'018b0000000100000000000002736402636e0000fc0001'
-        i = 0 
-        byte_data = b'' 
-        while True:
-            byte_data += struct.pack('B', hextodec(value_[i]) * 16 + hextodec(value_[i+1]))
-            i += 2
-            if i >= len(value_):
-                break
-
-        header_len = struct.pack('H', socket.htons(len(byte_data)))
-        self.send(header_len)
-        self.send(byte_data)
-        #TODO, check error
-        '''
+
+        self._send_data(header_len)
+        self._send_data(obuf.get_data())
+
 
     def _get_request_response(self, size):
         recv_size = 0
@@ -160,8 +134,7 @@
         False: soa serial in master is less or equal to the local one.
         True: soa serial in master is bigger
         '''
-        #TODO
-        self._send_soa_query()
+        self._send_query(rr_type.SOA())
         data_size = self._get_request_response(2)
         soa_reply = self._get_request_response(int(data_size))
         return soa_reply 
@@ -170,8 +143,7 @@
         return True
 
     def _send_xfrin_request(self, ixfr_first = False):
-        # TODO, support ixfr
-        self._send_axfr_query()
+        self._send_query(rr_type.AXFR())
         return XFRIN_OK
 
 
@@ -181,18 +153,19 @@
             if check_soa:
                 ret =  self._check_soa_serial()
 
-            print('[xfrin] AXFR is started!')
+            print('[xfrin] transfer of \'%s\': AXFR started' % self._zone_name)
             if ret == XFRIN_OK:    
                 ret = self._send_xfrin_request(ixfr_first)
                 if ret == XFRIN_OK:
                     ret = self._handle_xfrin_response()
 
             self._insert_record_to_sqlite3(self._records)
-            print('[xfrin] AXFR finished!')
+            print('[xfrin] transfer of \'%s\' AXFR ended' % self._zone_name)
  
         except XfrinException as e:
             print(e)
             print('[xfrin] Error happened during xfrin!')
+            #TODO, recover data source. 
         finally:
            self.close()
 
@@ -257,8 +230,6 @@
         at the end of AXFR'''
         isc.auth.sqlite3_ds.load(self._db_file, self._zone_name, rrs)
 
-    def handle_timeout(self):
-        pass
 
     def writable(self):
         '''Ignore the writable socket. '''
@@ -270,7 +241,6 @@
 
 def process_xfrin(xfrin_recorder, zone_name, db_file, master_addr, port, check_soa):
     xfrin_recorder.increment(name)
-    print(zone_name, master_addr, port, check_soa)
     conn = XfrinConnection(zone_name, db_file, master_addr, int(port), check_soa)
     conn.do_xfrin(False)
     xfrin_recorder.decrement(zone_name)
@@ -310,9 +280,11 @@
         self._cc.start()
         self._max_transfers_in = 10
         self.recorder = XfrinRecorder()
+        self._serving = True
 
     def config_handler(self, new_config):
-        print(new_config)
+        answer = create_answer(0, 'ok')
+
 
     def _print_settings(self):
         full_config = self._cc.get_full_config()
@@ -321,21 +293,29 @@
 
 
     def shutdown(self):
-        pass
+        ''' Shut down the xfrin process: stop the command loop and wait for
+        every thread that is still doing a transfer to finish.
+        '''
+        self._serving = False
+        main_thread = threading.currentThread()
+        for th in threading.enumerate():
+            if th is main_thread:
+                continue
+            th.join()
 
 
     def command_handler(self, command, args):
-        answer = create_answer(0, 'ok')
+        answer = create_answer(0)
         cmd = command
         try:
             if cmd == 'print_message':
-                answer = create_answer(0, args)
+                print(args)
 
             elif cmd == 'print_settings':
                 self._print_settings()  
 
             elif cmd == 'shutdown':
-                answer = create_answer(1, 'not support now')
+                self._serving = False
 
             elif cmd == 'retransfer':
                 zone_name, master, port, db_file = self._parse_cmd_params(args)
@@ -349,7 +329,7 @@
 
         except XfrinException as err:
             answer = create_answer(1, str(err))
-        print(answer)
+
         return answer
 
     def _parse_cmd_params(self, args):
@@ -371,12 +351,14 @@
             
 
     def startup(self):
-        self._cc.check_command()
+        while self._serving:
+            self._cc.check_command()
+
 
     def xfrin_start(self, zone_name, db_file, master_addr, 
                     port = 53, 
                     check_soa = True):
-            # check max_transfer_in, else return quota error
+        # check max_transfer_in, else return quota error
         if self.recorder.count() >= self._max_transfers_in:
             return (1, 'xfrin quota error')
 
@@ -384,7 +366,9 @@
             return (1, 'zone xfrin is in progress')
 
         xfrin_thread = threading.Thread(target = process_xfrin, 
-                         args = (self.recorder, zone_name, db_file, master_addr, port, check_soa))
+                                        args = (self.recorder, 
+                                                zone_name, db_file, master_addr, 
+                                                port, check_soa))
         xfrin_thread.start()
         return (0, 'zone xfrin is started')
 




More information about the bind10-changes mailing list