BIND 10 master, updated. 0047c74393959975ffd9f75a687a60f1f1c42a9c [master] update ChangeLog entry for #419
BIND 10 source code commits
bind10-changes at lists.isc.org
Wed Mar 2 12:31:00 UTC 2011
The branch, master has been updated
via 0047c74393959975ffd9f75a687a60f1f1c42a9c (commit)
via 1d60afb59e9606f312caef352ecb2fe488c4e751 (commit)
via 9c22af762d0cb6cdcb0bcbcea2b302b1165a0f66 (commit)
via 543f406610c4fc2c0236f8f9c7fdc37014937fb6 (commit)
via 2f0de2e7a8d594fd40c4fb2449232bb5cd64efa9 (commit)
via dadeedb633df1f3793c32fa283c82f22fcdb7ff4 (commit)
via 91193e42d664fbb494a15cdf5b01a0c5da19d0b7 (commit)
via 40f74edaaf73a8a5a7798fd79646e2279b82b5cc (commit)
via 9abd2de988dbd33bac4149e0d2cb1e4fec55413e (commit)
via 4548257a1d70b64890433443d156d62a27fcc32a (commit)
via 06fdc8c5bff48e8cd0fa093dce018af40bdaa668 (commit)
from 0a9ce967a515894bd7c46cf86a6ff4bc3d856b3a (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
- Log -----------------------------------------------------------------
commit 0047c74393959975ffd9f75a687a60f1f1c42a9c
Author: chenzhengzhang <jerry.zzpku at gmail.com>
Date: Wed Mar 2 20:30:30 2011 +0800
[master] update ChangeLog entry for #419
commit 1d60afb59e9606f312caef352ecb2fe488c4e751
Author: chenzhengzhang <jerry.zzpku at gmail.com>
Date: Wed Mar 2 20:27:28 2011 +0800
[master] merge #419 : Parallel xfrout session should be allowed
-----------------------------------------------------------------------
Summary of changes:
ChangeLog | 6 +
src/bin/xfrout/tests/xfrout_test.py | 29 ++--
src/bin/xfrout/xfrout.py.in | 183 ++++++++++++--------
src/lib/python/isc/util/socketserver_mixin.py | 6 +-
.../isc/util/tests/socketserver_mixin_test.py | 2 +-
5 files changed, 131 insertions(+), 95 deletions(-)
-----------------------------------------------------------------------
diff --git a/ChangeLog b/ChangeLog
index 76a852b..726a578 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,9 @@
+ 183. [bug] jerry
+ src/bin/xfrout: Enable parallel sessions between xfrout server and
+ muti-Auth. The session needs to be created only on the first time
+ or if an error occur.
+ (Trac #419, git 1d60afb59e9606f312caef352ecb2fe488c4e751)
+
182. [func] jinmei
Support cppcheck for static code check on C++ code. If cppcheck
is available, 'make cppcheck' on the top source directory will run
diff --git a/src/bin/xfrout/tests/xfrout_test.py b/src/bin/xfrout/tests/xfrout_test.py
index 55a2e52..5aec072 100644
--- a/src/bin/xfrout/tests/xfrout_test.py
+++ b/src/bin/xfrout/tests/xfrout_test.py
@@ -85,23 +85,12 @@ class TestXfroutSession(unittest.TestCase):
return msg
def setUp(self):
- request = MySocket(socket.AF_INET,socket.SOCK_STREAM)
+ self.sock = MySocket(socket.AF_INET,socket.SOCK_STREAM)
self.log = isc.log.NSLogger('xfrout', '', severity = 'critical', log_to_console = False )
- (self.write_sock, self.read_sock) = socket.socketpair()
- self.xfrsess = MyXfroutSession(request, None, None, self.log, self.read_sock)
- self.xfrsess.server = Dbserver()
+ self.xfrsess = MyXfroutSession(self.sock, None, Dbserver(), self.log)
self.mdata = bytes(b'\xd6=\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07example\x03com\x00\x00\xfc\x00\x01')
- self.sock = MySocket(socket.AF_INET,socket.SOCK_STREAM)
self.soa_record = (4, 3, 'example.com.', 'com.example.', 3600, 'SOA', None, 'master.example.com. admin.example.com. 1234 3600 1800 2419200 7200')
- def test_receive_query_message(self):
- send_msg = b"\xd6=\x00\x00\x00\x01\x00"
- msg_len = struct.pack('H', socket.htons(len(send_msg)))
- self.write_sock.send(msg_len)
- self.write_sock.send(send_msg)
- recv_msg = self.xfrsess._receive_query_message(self.read_sock)
- self.assertEqual(recv_msg, send_msg)
-
def test_parse_query_message(self):
[get_rcode, get_msg] = self.xfrsess._parse_query_message(self.mdata)
self.assertEqual(get_rcode.to_text(), "NOERROR")
@@ -157,7 +146,6 @@ class TestXfroutSession(unittest.TestCase):
self.assertTrue(msg.get_header_flag(Message.HEADERFLAG_AA))
def test_reply_query_with_format_error(self):
-
msg = self.getmsg()
self.xfrsess._reply_query_with_format_error(msg, self.sock)
get_msg = self.sock.read_msg()
@@ -272,11 +260,11 @@ class TestXfroutSession(unittest.TestCase):
self.xfrsess._zone_has_soa = zone_empty
def false_func():
return False
- self.xfrsess.server.increase_transfers_counter = false_func
+ self.xfrsess._server.increase_transfers_counter = false_func
self.assertEqual(self.xfrsess._check_xfrout_available(True).to_text(), "REFUSED")
def true_func():
return True
- self.xfrsess.server.increase_transfers_counter = true_func
+ self.xfrsess._server.increase_transfers_counter = true_func
self.assertEqual(self.xfrsess._check_xfrout_available(True).to_text(), "NOERROR")
def test_dns_xfrout_start_formerror(self):
@@ -346,8 +334,17 @@ class MyUnixSockServer(UnixSockServer):
class TestUnixSockServer(unittest.TestCase):
def setUp(self):
+ self.write_sock, self.read_sock = socket.socketpair()
self.unix = MyUnixSockServer()
+ def test_receive_query_message(self):
+ send_msg = b"\xd6=\x00\x00\x00\x01\x00"
+ msg_len = struct.pack('H', socket.htons(len(send_msg)))
+ self.write_sock.send(msg_len)
+ self.write_sock.send(send_msg)
+ recv_msg = self.unix._receive_query_message(self.read_sock)
+ self.assertEqual(recv_msg, send_msg)
+
def test_updata_config_data(self):
self.unix.update_config_data({'transfers_out':10 })
self.assertEqual(self.unix._max_transfers_out, 10)
diff --git a/src/bin/xfrout/xfrout.py.in b/src/bin/xfrout/xfrout.py.in
index a819640..fd1288d 100755
--- a/src/bin/xfrout/xfrout.py.in
+++ b/src/bin/xfrout/xfrout.py.in
@@ -73,75 +73,25 @@ def get_rrset_len(rrset):
return len(bytes)
-class XfroutSession(BaseRequestHandler):
- def __init__(self, request, client_address, server, log, sock):
+class XfroutSession():
+ def __init__(self, sock_fd, request_data, server, log):
# The initializer for the superclass may call functions
# that need _log to be set, so we set it first
+ self._sock_fd = sock_fd
+ self._request_data = request_data
+ self._server = server
self._log = log
- self._shutdown_sock = sock
- BaseRequestHandler.__init__(self, request, client_address, server)
+ self.handle()
def handle(self):
- '''Handle a request until shutdown or xfrout client is closed.'''
- # check self.server._shutdown_event to ensure the real shutdown comes.
- # Linux could trigger a spurious readable event on the _shutdown_sock
- # due to a bug, so we need perform a double check.
- while not self.server._shutdown_event.is_set(): # Check if xfrout is shutdown
- try:
- (rlist, wlist, xlist) = select.select([self._shutdown_sock, self.request], [], [])
- except select.error as e:
- if e.args[0] == errno.EINTR:
- (rlist, wlist, xlist) = ([], [], [])
- continue
- else:
- self._log.log_message("error", "Error with select(): %s" %e)
- break
- # self.server._shutdown_evnet will be set by now, if it is not a false
- # alarm
- if self._shutdown_sock in rlist:
- continue
-
- sock_fd = recv_fd(self.request.fileno())
-
- if sock_fd < 0:
- # This may happen when one xfrout process try to connect to
- # xfrout unix socket server, to check whether there is another
- # xfrout running.
- if sock_fd == XFR_FD_RECEIVE_FAIL:
- self._log.log_message("error", "Failed to receive the file descriptor for XFR connection")
- break
-
- # receive query msg
- msgdata = self._receive_query_message(self.request)
- if not msgdata:
- break
-
- try:
- self.dns_xfrout_start(sock_fd, msgdata)
- #TODO, avoid catching all exceptions
- except Exception as e:
- self._log.log_message("error", str(e))
-
- os.close(sock_fd)
-
- def _receive_query_message(self, sock):
- ''' receive query message from sock'''
- # receive data length
- data_len = sock.recv(2)
- if not data_len:
- return None
- msg_len = struct.unpack('!H', data_len)[0]
- # receive data
- recv_size = 0
- msgdata = b''
- while recv_size < msg_len:
- data = sock.recv(msg_len - recv_size)
- if not data:
- return None
- recv_size += len(data)
- msgdata += data
+ ''' Handle a xfrout query, send xfrout response '''
+ try:
+ self.dns_xfrout_start(self._sock_fd, self._request_data)
+ #TODO, avoid catching all exceptions
+ except Exception as e:
+ self._log.log_message("error", str(e))
- return msgdata
+ os.close(self._sock_fd)
def _parse_query_message(self, mdata):
''' parse query message to [socket,message]'''
@@ -195,7 +145,6 @@ class XfroutSession(BaseRequestHandler):
msg.set_rcode(Rcode.FORMERR())
self._send_message(sock_fd, msg)
-
def _zone_has_soa(self, zone):
'''Judge if the zone has an SOA record.'''
# In some sense, the SOA defines a zone.
@@ -203,7 +152,7 @@ class XfroutSession(BaseRequestHandler):
# specific zone, we need to judge if the zone has an SOA record;
# if not, we consider the zone has incomplete data, so xfrout can't
# serve for it.
- if sqlite3_ds.get_zone_soa(zone, self.server.get_db_file()):
+ if sqlite3_ds.get_zone_soa(zone, self._server.get_db_file()):
return True
return False
@@ -215,7 +164,7 @@ class XfroutSession(BaseRequestHandler):
# authority for the specific zone.
# TODO: should get zone's configuration from cfgmgr or other place
# in future.
- return sqlite3_ds.zone_exist(zonename, self.server.get_db_file())
+ return sqlite3_ds.zone_exist(zonename, self._server.get_db_file())
def _check_xfrout_available(self, zone_name):
'''Check if xfr request can be responsed.
@@ -234,7 +183,7 @@ class XfroutSession(BaseRequestHandler):
return Rcode.SERVFAIL()
#TODO, check allow_transfer
- if not self.server.increase_transfers_counter():
+ if not self._server.increase_transfers_counter():
return Rcode.REFUSED()
return Rcode.NOERROR()
@@ -260,7 +209,7 @@ class XfroutSession(BaseRequestHandler):
except Exception as err:
self._log.log_message("error", str(err))
- self.server.decrease_transfers_counter()
+ self._server.decrease_transfers_counter()
return
@@ -307,14 +256,14 @@ class XfroutSession(BaseRequestHandler):
#TODO, there should be a better way to insert rrset.
msg.make_response()
msg.set_header_flag(Message.HEADERFLAG_AA)
- soa_record = sqlite3_ds.get_zone_soa(zone_name, self.server.get_db_file())
+ soa_record = sqlite3_ds.get_zone_soa(zone_name, self._server.get_db_file())
rrset_soa = self._create_rrset_from_db_record(soa_record)
msg.add_rrset(Message.SECTION_ANSWER, rrset_soa)
message_upper_len = get_rrset_len(rrset_soa)
- for rr_data in sqlite3_ds.get_zone_datas(zone_name, self.server.get_db_file()):
- if self.server._shutdown_event.is_set(): # Check if xfrout is shutdown
+ for rr_data in sqlite3_ds.get_zone_datas(zone_name, self._server.get_db_file()):
+ if self._server._shutdown_event.is_set(): # Check if xfrout is shutdown
self._log.log_message("info", "xfrout process is being shutdown")
return
@@ -356,9 +305,93 @@ class UnixSockServer(socketserver_mixin.NoPollMixIn, ThreadingUnixStreamServer):
self.update_config_data(config_data)
self._cc = cc
- def finish_request(self, request, client_address):
+ def _receive_query_message(self, sock):
+ ''' receive request message from sock'''
+ # receive data length
+ data_len = sock.recv(2)
+ if not data_len:
+ return None
+ msg_len = struct.unpack('!H', data_len)[0]
+ # receive data
+ recv_size = 0
+ msgdata = b''
+ while recv_size < msg_len:
+ data = sock.recv(msg_len - recv_size)
+ if not data:
+ return None
+ recv_size += len(data)
+ msgdata += data
+
+ return msgdata
+
+ def handle_request(self):
+ ''' Enable server handle a request until shutdown or auth is closed.'''
+ try:
+ request, client_address = self.get_request()
+ except socket.error:
+ self._log.log_message("error", "Failed to fetch request")
+ return
+
+ # Check self._shutdown_event to ensure the real shutdown comes.
+ # Linux could trigger a spurious readable event on the _read_sock
+ # due to a bug, so we need perform a double check.
+ while not self._shutdown_event.is_set(): # Check if xfrout is shutdown
+ try:
+ (rlist, wlist, xlist) = select.select([self._read_sock, request], [], [])
+ except select.error as e:
+ if e.args[0] == errno.EINTR:
+ (rlist, wlist, xlist) = ([], [], [])
+ continue
+ else:
+ self._log.log_message("error", "Error with select(): %s" %e)
+ break
+
+ # self.server._shutdown_event will be set by now, if it is not a false
+ # alarm
+ if self._read_sock in rlist:
+ continue
+
+ try:
+ self.process_request(request)
+ except:
+ self._log.log_message("error", "Exception happened during processing of %s"
+ % str(client_address))
+ break
+
+ def _handle_request_noblock(self):
+ """Override the function _handle_request_noblock(), it creates a new
+ thread to handle requests for each auth"""
+ td = threading.Thread(target=self.handle_request)
+ td.setDaemon(True)
+ td.start()
+
+ def process_request(self, request):
+ """Receive socket fd and query message from auth, then
+ start a new thread to process the request."""
+ sock_fd = recv_fd(request.fileno())
+ if sock_fd < 0:
+ # This may happen when one xfrout process try to connect to
+ # xfrout unix socket server, to check whether there is another
+ # xfrout running.
+ if sock_fd == XFR_FD_RECEIVE_FAIL:
+ self._log.log_message("error", "Failed to receive the file descriptor for XFR connection")
+ return
+
+ # receive request msg
+ request_data = self._receive_query_message(request)
+ if not request_data:
+ return
+
+ t = threading.Thread(target = self.finish_request,
+ args = (sock_fd, request_data))
+ if self.daemon_threads:
+ t.daemon = True
+ t.start()
+
+
+ def finish_request(self, sock_fd, request_data):
'''Finish one request by instantiating RequestHandlerClass.'''
- self.RequestHandlerClass(request, client_address, self, self._log, self._read_sock)
+ self.RequestHandlerClass(sock_fd, request_data, self, self._log)
def _remove_unused_sock_file(self, sock_file):
'''Try to remove the socket file. If the file is being used
@@ -376,7 +409,7 @@ class UnixSockServer(socketserver_mixin.NoPollMixIn, ThreadingUnixStreamServer):
try:
os.unlink(sock_file)
except OSError as err:
- self._log.log_message("error", '[b10-xfrout] Fail to remove file %s: %s\n' % (sock_file, err))
+ self._log.log_message("error", "[b10-xfrout] Fail to remove file %s: %s\n" % (sock_file, err))
sys.exit(0)
def _sock_file_in_use(self, sock_file):
@@ -397,7 +430,7 @@ class UnixSockServer(socketserver_mixin.NoPollMixIn, ThreadingUnixStreamServer):
try:
os.unlink(self._sock_file)
except Exception as e:
- self._log.log_message("error", str(e))
+ self._log.log_message('error', str(e))
def update_config_data(self, new_config):
'''Apply the new config setting of xfrout module. '''
diff --git a/src/lib/python/isc/util/socketserver_mixin.py b/src/lib/python/isc/util/socketserver_mixin.py
index fb5f9a2..e954fe1 100644
--- a/src/lib/python/isc/util/socketserver_mixin.py
+++ b/src/lib/python/isc/util/socketserver_mixin.py
@@ -64,7 +64,7 @@ class NoPollMixIn:
called in anther thread. Note, parameter 'poll_interval' is
just used for interface compatibility; it's never used in this
function.
- '''
+ '''
while True:
# block until the self.socket or self.__read_sock is readable
try:
@@ -74,11 +74,11 @@ class NoPollMixIn:
continue
else:
break
-
+
if self.__read_sock in r:
break
else:
- self._handle_request_noblock()
+ self._handle_request_noblock();
self._is_shut_down.set()
diff --git a/src/lib/python/isc/util/tests/socketserver_mixin_test.py b/src/lib/python/isc/util/tests/socketserver_mixin_test.py
index 61bc248..a6686d8 100644
--- a/src/lib/python/isc/util/tests/socketserver_mixin_test.py
+++ b/src/lib/python/isc/util/tests/socketserver_mixin_test.py
@@ -25,7 +25,7 @@ class MyHandler(socketserver.BaseRequestHandler):
data = self.request.recv(20)
self.request.send(data)
-class MyServer(NoPollMixIn,
+class MyServer(NoPollMixIn,
socketserver.ThreadingMixIn,
socketserver.TCPServer):
More information about the bind10-changes
mailing list