[svn] commit: r3455 - in /branches/trac299/src: bin/xfrout/tests/xfrout_test.py bin/xfrout/xfrout.py.in lib/xfr/fd_share.cc lib/xfr/fd_share.h lib/xfr/fdshare_python.cc
BIND 10 source code commits
bind10-changes at lists.isc.org
Fri Nov 5 04:59:32 UTC 2010
Author: chenzhengzhang
Date: Fri Nov 5 04:59:31 2010
New Revision: 3455
Log:
update xfrout according to review comments
Modified:
branches/trac299/src/bin/xfrout/tests/xfrout_test.py
branches/trac299/src/bin/xfrout/xfrout.py.in
branches/trac299/src/lib/xfr/fd_share.cc
branches/trac299/src/lib/xfr/fd_share.h
branches/trac299/src/lib/xfr/fdshare_python.cc
Modified: branches/trac299/src/bin/xfrout/tests/xfrout_test.py
==============================================================================
--- branches/trac299/src/bin/xfrout/tests/xfrout_test.py (original)
+++ branches/trac299/src/bin/xfrout/tests/xfrout_test.py Fri Nov 5 04:59:31 2010
@@ -63,6 +63,13 @@
def handle(self):
pass
+ def _send_data(self, sock, data):
+ size = len(data)
+ total_count = 0
+ while total_count < size:
+ count = sock.send(data[total_count:])
+ total_count += count
+
class Dbserver:
def __init__(self):
self._shutdown_event = threading.Event()
@@ -80,12 +87,20 @@
def setUp(self):
request = MySocket(socket.AF_INET,socket.SOCK_STREAM)
self.log = isc.log.NSLogger('xfrout', '', severity = 'critical', log_to_console = False )
- (self.mastsock, self.slavesock) = socket.socketpair()
- self.xfrsess = MyXfroutSession(request, None, None, self.log, self.slavesock)
+ (self.write_sock, self.read_sock) = socket.socketpair()
+ self.xfrsess = MyXfroutSession(request, None, None, self.log, self.read_sock)
self.xfrsess.server = Dbserver()
self.mdata = bytes(b'\xd6=\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07example\x03com\x00\x00\xfc\x00\x01')
self.sock = MySocket(socket.AF_INET,socket.SOCK_STREAM)
self.soa_record = (4, 3, 'example.com.', 'com.example.', 3600, 'SOA', None, 'master.example.com. admin.example.com. 1234 3600 1800 2419200 7200')
+
+ def test_receive_query_message(self):
+ send_msg = b"\xd6=\x00\x00\x00\x01\x00"
+ msg_len = struct.pack('H', socket.htons(len(send_msg)))
+ self.write_sock.send(msg_len)
+ self.write_sock.send(send_msg)
+ recv_msg = self.xfrsess._receive_query_message(self.read_sock)
+ self.assertEqual(recv_msg, send_msg)
def test_parse_query_message(self):
[get_rcode, get_msg] = self.xfrsess._parse_query_message(self.mdata)
Modified: branches/trac299/src/bin/xfrout/xfrout.py.in
==============================================================================
--- branches/trac299/src/bin/xfrout/xfrout.py.in (original)
+++ branches/trac299/src/bin/xfrout/xfrout.py.in Fri Nov 5 04:59:31 2010
@@ -85,47 +85,53 @@
try:
rlist, wlist, xlist = select.select([self._shutdown_sock, self.request], [], [])
except select.error as e:
- if e.args[0] == errno.EINTR:
- (rlist, wlist, xlist) = ([], [], [])
- else:
- sys.stderr.write("[b10-xfrout] Error with select(): %s\n" %e)
- break
+ if not e.args[0] == errno.EINTR:
+ self._log.log_message("error", "Error with select(): %s" %e)
+ break
if self._shutdown_sock in rlist:
continue
- fd = recv_fd(self.request.fileno())
-
- if fd < 0:
+ sock_fd = recv_fd(self.request.fileno())
+
+ if sock_fd < 0:
# This may happen when one xfrout process try to connect to
# xfrout unix socket server, to check whether there is another
# xfrout running.
- if fd == XFR_FD_REV_FAIL:
+ if sock_fd == XFR_FD_RECEIVE_FAIL:
self._log.log_message("error", "Failed to receive the file descriptor for XFR connection")
break
- data_len = self.request.recv(2)
- if not data_len:
- break
- msg_len = struct.unpack('!H', data_len)[0]
- msgdata = self.request.recv(msg_len)
+ # receive query msg
+ msgdata = self._receive_query_message(self.request)
if not msgdata:
break
- sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
+
try:
- self.dns_xfrout_start(sock, msgdata)
+ self.dns_xfrout_start(sock_fd, msgdata)
#TODO, avoid catching all exceptions
except Exception as e:
self._log.log_message("error", str(e))
- try:
- sock.shutdown(socket.SHUT_RDWR)
- except socket.error:
- # Avoid socket error caused by shutting down
- # one non-connected socket.
- pass
-
- sock.close()
- os.close(fd)
+ os.close(sock_fd)
+
+ def _receive_query_message(self, sock):
+ ''' receive query message from sock'''
+ # receive data length
+ data_len = sock.recv(2)
+ if not data_len:
+ return None
+ msg_len = struct.unpack('!H', data_len)[0]
+ # receive data
+ recv_size = 0
+ msgdata = b''
+ while recv_size < msg_len:
+ data = sock.recv(msg_len - recv_size)
+ if not data:
+ return None
+ recv_size += len(data)
+ msgdata += data
+
+ return msgdata
def _parse_query_message(self, mdata):
''' parse query message to [socket,message]'''
@@ -144,37 +150,37 @@
return question.get_name().to_text()
- def _send_data(self, sock, data):
+ def _send_data(self, sock_fd, data):
size = len(data)
total_count = 0
while total_count < size:
- count = sock.send(data[total_count:])
+ count = os.write(sock_fd, data[total_count:])
total_count += count
- def _send_message(self, sock, msg):
+ def _send_message(self, sock_fd, msg):
render = MessageRenderer()
render.set_length_limit(XFROUT_MAX_MESSAGE_SIZE)
msg.to_wire(render)
header_len = struct.pack('H', socket.htons(render.get_length()))
- self._send_data(sock, header_len)
- self._send_data(sock, render.get_data())
-
-
- def _reply_query_with_error_rcode(self, msg, sock, rcode_):
+ self._send_data(sock_fd, header_len)
+ self._send_data(sock_fd, render.get_data())
+
+
+ def _reply_query_with_error_rcode(self, msg, sock_fd, rcode_):
msg.make_response()
msg.set_rcode(rcode_)
- self._send_message(sock, msg)
-
-
- def _reply_query_with_format_error(self, msg, sock):
+ self._send_message(sock_fd, msg)
+
+
+ def _reply_query_with_format_error(self, msg, sock_fd):
'''query message format isn't legal.'''
if not msg:
return # query message is invalid. send nothing back.
msg.make_response()
msg.set_rcode(Rcode.FORMERR())
- self._send_message(sock, msg)
+ self._send_message(sock_fd, msg)
def _zone_is_empty(self, zone):
@@ -210,22 +216,22 @@
return Rcode.NOERROR()
- def dns_xfrout_start(self, sock, msg_query):
+ def dns_xfrout_start(self, sock_fd, msg_query):
rcode_, msg = self._parse_query_message(msg_query)
#TODO. create query message and parse header
if rcode_ != Rcode.NOERROR():
- return self._reply_query_with_format_error(msg, sock)
+ return self._reply_query_with_format_error(msg, sock_fd)
zone_name = self._get_query_zone_name(msg)
rcode_ = self._check_xfrout_available(zone_name)
if rcode_ != Rcode.NOERROR():
self._log.log_message("info", "transfer of '%s/IN' failed: %s",
zone_name, rcode_.to_text())
- return self. _reply_query_with_error_rcode(msg, sock, rcode_)
+ return self. _reply_query_with_error_rcode(msg, sock_fd, rcode_)
try:
self._log.log_message("info", "transfer of '%s/IN': AXFR started" % zone_name)
- self._reply_xfrout_query(msg, sock, zone_name)
+ self._reply_xfrout_query(msg, sock_fd, zone_name)
self._log.log_message("info", "transfer of '%s/IN': AXFR end" % zone_name)
except Exception as err:
self._log.log_message("error", str(err))
@@ -257,7 +263,7 @@
rrset_.add_rdata(rdata_)
return rrset_
- def _send_message_with_last_soa(self, msg, sock, rrset_soa, message_upper_len):
+ def _send_message_with_last_soa(self, msg, sock_fd, rrset_soa, message_upper_len):
'''Add the SOA record to the end of message. If it can't be
added, a new message should be created to send out the last soa .
'''
@@ -266,14 +272,14 @@
if message_upper_len + rrset_len < XFROUT_MAX_MESSAGE_SIZE:
msg.add_rrset(Section.ANSWER(), rrset_soa)
else:
- self._send_message(sock, msg)
+ self._send_message(sock_fd, msg)
msg = self._clear_message(msg)
msg.add_rrset(Section.ANSWER(), rrset_soa)
- self._send_message(sock, msg)
-
-
- def _reply_xfrout_query(self, msg, sock, zone_name):
+ self._send_message(sock_fd, msg)
+
+
+ def _reply_xfrout_query(self, msg, sock_fd, zone_name):
#TODO, there should be a better way to insert rrset.
msg.make_response()
msg.set_header_flag(MessageFlag.AA())
@@ -303,12 +309,12 @@
message_upper_len += rrset_len
continue
- self._send_message(sock, msg)
+ self._send_message(sock_fd, msg)
msg = self._clear_message(msg)
msg.add_rrset(Section.ANSWER(), rrset_) # Add the rrset to the new message
message_upper_len = rrset_len
- self._send_message_with_last_soa(msg, sock, rrset_soa, message_upper_len)
+ self._send_message_with_last_soa(msg, sock_fd, rrset_soa, message_upper_len)
class UnixSockServer(ThreadingUnixStreamServer):
'''The unix domain socket server which accept xfr query sent from auth server.'''
@@ -320,14 +326,14 @@
self._lock = threading.Lock()
self._transfers_counter = 0
self._shutdown_event = shutdown_event
- self._master_sock, self._slave_sock = socket.socketpair()
+ self._write_sock, self._read_sock = socket.socketpair()
self._log = log
self.update_config_data(config_data)
self._cc = cc
def finish_request(self, request, client_address):
'''Finish one request by instantiating RequestHandlerClass.'''
- self.RequestHandlerClass(request, client_address, self, self._log, self._slave_sock)
+ self.RequestHandlerClass(request, client_address, self, self._log, self._read_sock)
def _remove_unused_sock_file(self, sock_file):
'''Try to remove the socket file. If the file is being used
@@ -335,8 +341,8 @@
If it's not a socket file or nobody is listening
, it will be removed. If it can't be removed, exit from python. '''
if self._sock_file_in_use(sock_file):
- sys.stderr.write("[b10-xfrout] Fail to start xfrout process, unix socket"
- " file '%s' is being used by another xfrout process\n" % sock_file)
+ self._log.log_message("error", "Fail to start xfrout process, unix socket file '%s'"
+ " is being used by another xfrout process\n" % sock_file)
sys.exit(0)
else:
if not os.path.exists(sock_file):
@@ -345,7 +351,7 @@
try:
os.unlink(sock_file)
except OSError as err:
- sys.stderr.write('[b10-xfrout] Fail to remove file %s: %s\n' % (sock_file, err))
+ self._log.log_message("error", '[b10-xfrout] Fail to remove file %s: %s\n' % (sock_file, err))
sys.exit(0)
def _sock_file_in_use(self, sock_file):
@@ -361,7 +367,7 @@
return True
def shutdown(self):
- self._master_sock.send(b"shutdown") #terminate the xfrout session thread
+ self._write_sock.send(b"shutdown") #terminate the xfrout session thread
ThreadingUnixStreamServer.shutdown(self)
try:
os.unlink(self._sock_file)
Modified: branches/trac299/src/lib/xfr/fd_share.cc
==============================================================================
--- branches/trac299/src/lib/xfr/fd_share.cc (original)
+++ branches/trac299/src/lib/xfr/fd_share.cc Fri Nov 5 04:59:31 2010
@@ -88,12 +88,12 @@
msghdr.msg_controllen = cmsg_space(sizeof(int));
msghdr.msg_control = malloc(msghdr.msg_controllen);
if (msghdr.msg_control == NULL) {
- return (XFR_FD_REV_FAIL);
+ return (-1);
}
if (recvmsg(sock, &msghdr, 0) < 0) {
free(msghdr.msg_control);
- return (XFR_FD_REV_FAIL);
+ return (XFR_FD_RECEIVE_FAIL);
}
const struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msghdr);
int fd = -1;
Modified: branches/trac299/src/lib/xfr/fd_share.h
==============================================================================
--- branches/trac299/src/lib/xfr/fd_share.h (original)
+++ branches/trac299/src/lib/xfr/fd_share.h Fri Nov 5 04:59:31 2010
@@ -20,10 +20,12 @@
namespace isc {
namespace xfr {
-const int XFR_FD_REV_FAIL = -2;
+// Failed to receive xfr socket descriptor "fd" on unix domain socket 'sock'
+const int XFR_FD_RECEIVE_FAIL = -2;
// Receive socket descriptor on unix domain socket 'sock'.
// Returned value is the socket descriptor received.
+// Returned XFR_FD_RECEIVE_FAIL if failed to receive xfr socket descriptor
// Errors are indicated by a return value of -1.
int recv_fd(const int sock);
@@ -37,6 +39,6 @@
#endif
-// Local Variables:
+// Local Variables:
// mode: c++
-// End:
+// End:
Modified: branches/trac299/src/lib/xfr/fdshare_python.cc
==============================================================================
--- branches/trac299/src/lib/xfr/fdshare_python.cc (original)
+++ branches/trac299/src/lib/xfr/fdshare_python.cc Fri Nov 5 04:59:31 2010
@@ -22,7 +22,6 @@
#include <xfr/fd_share.h>
-static PyObject *XFR_FD_REV_FAIL;
static PyObject*
fdshare_recv_fd(PyObject *self UNUSED_PARAM, PyObject *args) {
@@ -70,8 +69,11 @@
return (NULL);
}
- XFR_FD_REV_FAIL = Py_BuildValue("i", isc::xfr::XFR_FD_REV_FAIL);
- PyModule_AddObject(mod, "XFR_FD_REV_FAIL", XFR_FD_REV_FAIL);
+ PyObject *XFR_FD_RECEIVE_FAIL = Py_BuildValue("i", isc::xfr::XFR_FD_RECEIVE_FAIL);
+ int ret = PyModule_AddObject(mod, "XFR_FD_RECEIVE_FAIL", XFR_FD_RECEIVE_FAIL);
+ if (-1 == ret) {
+ return (NULL);
+ }
return (mod);
}
More information about the bind10-changes mailing list