[svn] commit: r2530 - /branches/trac216/src/bin/xfrin/xfrin.py.in
BIND 10 source code commits
bind10-changes at lists.isc.org
Mon Jul 19 03:07:52 UTC 2010
Author: shentingting
Date: Mon Jul 19 03:07:52 2010
New Revision: 2530
Log:
implement new feature items in xfrin TODO list. make new config data applied, deal with communication error in handle_error, and shutdown time delay problem.
Modified:
branches/trac216/src/bin/xfrin/xfrin.py.in
Modified: branches/trac216/src/bin/xfrin/xfrin.py.in
==============================================================================
--- branches/trac216/src/bin/xfrin/xfrin.py.in (original)
+++ branches/trac216/src/bin/xfrin/xfrin.py.in Mon Jul 19 03:07:52 2010
@@ -24,8 +24,9 @@
import asyncore
import struct
import threading
-import socket
+import socket, _socket, select
import random
+import time
from optparse import OptionParser, OptionValueError
from isc.config.ccsession import *
try:
@@ -63,6 +64,54 @@
class XfrinException(Exception):
pass
+def Xfrin_poll(timeout=0.0, map=None):
+ if map is None:
+ map = socket_map
+ if map:
+ abc = None
+ r = []; w = []; e = []
+ for fd, obj in list(map.items()):
+ if isinstance(obj, _socket.socket):
+ r.append(obj)
+ else:
+ is_r = obj.readable()
+ is_w = obj.writable()
+ if is_r:
+ r.append(fd)
+ if is_w:
+ w.append(fd)
+ if is_r or is_w:
+ e.append(fd)
+ if [] == r == w == e:
+ time.sleep(timeout)
+ return
+ try:
+ r, w, e = select.select(r, w, e, timeout)
+ except select.error as err:
+ if err.args[0] != EINTR:
+ raise
+ else:
+ return
+ for el in r:
+ if isinstance(el, _socket.socket):
+ raise XfrinException("shutdown xfrin!")
+ obj = map.get(el)
+ if obj is None:
+ continue
+ asyncore.read(obj)
+ for fd in w:
+ obj = map.get(fd)
+ if obj is None:
+ continue
+ asyncore.write(obj)
+ for fd in e:
+ obj = map.get(fd)
+ if obj is None:
+ continue
+ _exception(obj)
+
+asyncore.poll = Xfrin_poll
+
class XfrinConnection(asyncore.dispatcher):
'''Do xfrin in this class. '''
@@ -73,7 +122,6 @@
db_file: specify the data source file.
check_soa: when it's true, check soa first before sending xfr query
'''
-
asyncore.dispatcher.__init__(self, map=sock_map)
self.create_socket(master_addrinfo[0], master_addrinfo[1])
self._zone_name = zone_name
@@ -176,7 +224,6 @@
def do_xfrin(self, check_soa, ixfr_first = False):
'''Do xfr by sending xfr request and parsing response. '''
-
try:
ret = XFRIN_OK
if check_soa:
@@ -190,7 +237,6 @@
self._send_query(RRType(252))
isc.datasrc.sqlite3_ds.load(self._db_file, self._zone_name,
self._handle_xfrin_response)
-
self.log_msg(logstr + 'succeeded')
ret = XFRIN_OK
@@ -212,8 +258,7 @@
self.log_msg(logstr + 'failed')
ret = XFRIN_FAIL
finally:
- self.close()
-
+ self.close()
return ret
def _check_response_header(self, msg):
@@ -297,72 +342,50 @@
def handle_read(self):
'''Read query's response from socket. '''
-
self._recvd_data = self.recv(self._need_recv_size)
+ if(len(self._recvd_data)==0):
+ raise XfrinException
self._recvd_size = len(self._recvd_data)
self._recv_time_out = False
+ def handle_error(self):
+ raise XfrinException("receive data from socket error!")
+
+
def writable(self):
'''Ignore the writable socket. '''
-
return False
+
def log_info(self, msg, type='info'):
# Overwrite the log function, log nothing
pass
def log_msg(self, msg):
- if self._verbose:
+ if self._verbose:
sys.stdout.write('[b10-xfrin] %s\n' % str(msg))
-def process_xfrin(xfrin_recorder, zone_name, rrclass, db_file,
- shutdown_event, master_addrinfo, check_soa, verbose):
- xfrin_recorder.increment(zone_name)
+def process_xfrin(zone_name, rrclass, db_file,
+ shutdown_event, master_addrinfo, check_soa, socket_pair, verbose):
sock_map = {}
+ sock_map[socket_pair.fileno()] = socket_pair
conn = XfrinConnection(sock_map, zone_name, rrclass, db_file,
shutdown_event, master_addrinfo, verbose)
if conn.connect_to_master():
conn.do_xfrin(check_soa)
- xfrin_recorder.decrement(zone_name)
-
-
-class XfrinRecorder:
- def __init__(self):
- self._lock = threading.Lock()
- self._zones = []
-
- def increment(self, zone_name):
- self._lock.acquire()
- self._zones.append(zone_name)
- self._lock.release()
-
- def decrement(self, zone_name):
- self._lock.acquire()
- if zone_name in self._zones:
- self._zones.remove(zone_name)
- self._lock.release()
-
- def xfrin_in_progress(self, zone_name):
- self._lock.acquire()
- ret = zone_name in self._zones
- self._lock.release()
- return ret
-
- def count(self):
- self._lock.acquire()
- ret = len(self._zones)
- self._lock.release()
- return ret
-
class Xfrin:
def __init__(self, verbose = False):
self._cc_setup()
- self._max_transfers_in = 10
- self.recorder = XfrinRecorder()
self._shutdown_event = threading.Event()
self._verbose = verbose
+
+ #the item in self._zones: zone name and xfr communication thread.
+ #The item in self._socket_pairs: a socket and xfr communication thread, the main thread uses
+ #the socket to communicate with this xfr communication thread.
+ self._zones = {}
+ self._socket_pairs = {}
def _cc_setup(self):
'''
@@ -373,7 +396,10 @@
self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION,
self.config_handler,
self.command_handler)
+
self._cc.start()
+ config_data = self._cc.get_full_config()
+ self._max_transfers_in = config_data.get("transfers_in")
def _cc_check_command(self):
'''
@@ -384,12 +410,17 @@
def config_handler(self, new_config):
# TODO, process new config data
+ self._max_transfers_in = new_config.get("transfers_in") or self._max_transfers_in
return create_answer(0)
def shutdown(self):
''' shutdown the xfrin process. the thread which is doing xfrin should be
terminated.
'''
+ self._filter_hash(self._socket_pairs)
+ for fd in self._socket_pairs.keys():
+ fd.send(b"shutdown")
+
self._shutdown_event.set()
main_thread = threading.currentThread()
for th in threading.enumerate():
@@ -397,12 +428,12 @@
continue
th.join()
-
def command_handler(self, command, args):
answer = create_answer(0)
try:
if command == 'shutdown':
self._shutdown_event.set()
+ #self.shutdown()
elif command == 'retransfer' or command == 'refresh':
# The default RR class is IN. We should fix this so that
# the class is passed in the command arg (where we specify
@@ -456,26 +487,45 @@
while not self._shutdown_event.is_set():
self._cc_check_command()
+ def _filter_hash(self, hash):
+ '''delete zone_name in self._zones or a socket in self._socket_pairs.'''
+ keys = []
+ for key in hash.keys():
+ keys.append(key)
+
+ for key in keys:
+ if not (hash[key]).is_alive():
+ del hash[key]
+
def xfrin_start(self, zone_name, rrclass, db_file, master_addrinfo,
check_soa = True):
if "libdns_python" not in sys.modules:
return (1, "xfrin failed, can't load dns message python library: 'libdns_python'")
# check max_transfer_in, else return quota error
- if self.recorder.count() >= self._max_transfers_in:
- return (1, 'xfrin quota error')
-
- if self.recorder.xfrin_in_progress(zone_name):
- return (1, 'zone xfrin is in progress')
-
+ if len(self._zones) >= self._max_transfers_in:
+ self._filter_hash(self._zones)
+ self._filter_hash(self._socket_pairs)
+ if len(self._zones) >= self._max_transfers_in:
+ return (1, 'xfrin quota error')
+
+ # check whether the zone xfrin is in progress.
+ if zone_name in self._zones.keys():
+ if not (self._zones[zone_name]).is_alive():
+ del self._zones[zone_name]
+ else:
+ return (1, 'zone xfrin is in progress')
+ socket_pair = socket.socketpair()
xfrin_thread = threading.Thread(target = process_xfrin,
- args = (self.recorder,
- zone_name, rrclass,
+ args = (zone_name, rrclass,
db_file,
self._shutdown_event,
- master_addrinfo, check_soa,
+ master_addrinfo, check_soa, socket_pair[1],
self._verbose))
+ # recored the zone name which zone xfrin is in process
+ self._zones[zone_name] = xfrin_thread
+ self._socket_pairs[socket_pair[0]] = xfrin_thread
xfrin_thread.start()
return (0, 'zone xfrin is started')
More information about the bind10-changes
mailing list