[svn] commit: r1250 - in /trunk: ./ ext/boost/date_time/ ext/boost/functional/ ext/boost/integer/ ext/boost/optional/ ext/boost/regex/ ext/boost/system/ src/bin/auth/ src/lib/cc/ src/lib/config/ src/lib/dns/ src/lib/dns/rdata/generic/ src/lib/dns/tests/

BIND 10 source code commits bind10-changes at lists.isc.org
Tue Mar 9 22:52:16 UTC 2010


Author: jinmei
Date: Tue Mar  9 22:52:15 2010
New Revision: 1250

Log:
merged the jinmei-asio (+ jelte-tcp) branch, enabling TCP transport support.

Added:
    trunk/ext/boost/date_time/
      - copied from r1249, branches/jinmei-asio/ext/boost/date_time/
    trunk/ext/boost/functional/
      - copied from r1249, branches/jinmei-asio/ext/boost/functional/
    trunk/ext/boost/integer/
      - copied from r1249, branches/jinmei-asio/ext/boost/integer/
    trunk/ext/boost/optional/
      - copied from r1249, branches/jinmei-asio/ext/boost/optional/
    trunk/ext/boost/regex/
      - copied from r1249, branches/jinmei-asio/ext/boost/regex/
    trunk/ext/boost/system/
      - copied from r1249, branches/jinmei-asio/ext/boost/system/
Removed:
    trunk/src/bin/auth/common.cc
Modified:
    trunk/   (props changed)
    trunk/configure.ac
    trunk/src/bin/auth/Makefile.am
    trunk/src/bin/auth/auth_srv.cc
    trunk/src/bin/auth/auth_srv.h
    trunk/src/bin/auth/common.h
    trunk/src/bin/auth/main.cc
    trunk/src/lib/cc/   (props changed)
    trunk/src/lib/cc/session.cc
    trunk/src/lib/cc/session.h
    trunk/src/lib/config/ccsession.cc
    trunk/src/lib/config/ccsession.h
    trunk/src/lib/dns/   (props changed)
    trunk/src/lib/dns/message.cc
    trunk/src/lib/dns/message.h
    trunk/src/lib/dns/rdata/generic/rrsig_46.cc   (props changed)
    trunk/src/lib/dns/tests/   (props changed)
    trunk/src/lib/dns/tests/message_unittest.cc

Modified: trunk/configure.ac
==============================================================================
--- trunk/configure.ac (original)
+++ trunk/configure.ac Tue Mar  9 22:52:15 2010
@@ -11,6 +11,10 @@
 AC_PROG_CXX
 AC_PROG_CC
 AC_PROG_LIBTOOL
+
+# Use C++ language
+AC_LANG_CPLUSPLUS
+AX_COMPILER_VENDOR
 
 m4_define([_AM_PYTHON_INTERPRETER_LIST], [python python3 python3.1])
 AM_PATH_PYTHON([3.1])
@@ -85,6 +89,50 @@
 	AC_SUBST(GENHTML)
 fi
 AC_SUBST(USE_LCOV)
+
+# Check availability of the Boost System library
+
+AC_MSG_CHECKING([for boost::system library])
+AC_ARG_WITH([boostlib],
+AC_HELP_STRING([--with-boostlib=PATH],
+  [specify a path to boost libraries if it is not automatically found]),
+  [boostlib_path="$withval"], [boostlib_path="no"])
+if test "$boostlib_path" != "no"; then
+	BOOST_LDFLAGS="-L$boostlib_path"
+fi
+
+LDFLAGS_SAVED="$LDFLAGS"
+LIBS_SAVED="$LIBS"
+CPPFLAGS_SAVED="$CPPFLAGS"
+CPPFLAGS="$CPPFLAGS -Iext"
+
+for BOOST_TRY_LIB in boost_system boost_system-mt; do
+	LDFLAGS="$LDFLAGS_SAVED ${BOOST_LDFLAGS}"
+	LIBS="$LIBS_SAVED -l${BOOST_TRY_LIB}"
+	AC_TRY_LINK([#include <boost/system/error_code.hpp>],
+		[ boost::system::error_code error_code;
+		  std::string message(error_code.message());
+		  return 0; ],
+	[ AC_MSG_RESULT(yes)
+	  BOOST_SYSTEM_LIB="-l${BOOST_TRY_LIB}"
+	  ],[])
+	if test "X${BOOST_SYSTEM_LIB}" != X; then
+        	break
+	fi
+done
+
+if test "X${BOOST_SYSTEM_LIB}" = X; then
+	AC_MSG_RESULT(not found)
+else
+	AC_DEFINE(HAVE_BOOSTLIB, 1, Define to 1 if boost libraries are available)
+fi
+
+AM_CONDITIONAL(HAVE_BOOSTLIB, test "X${BOOST_SYSTEM_LIB}" != X)
+LDFLAGS="$LDFLAGS_SAVED"
+CPPFLAGS="$CPPFLAGS_SAVED"
+LIBS="$LIBS_SAVED"
+AC_SUBST(BOOST_LDFLAGS)
+AC_SUBST(BOOST_SYSTEM_LIB)
 
 #
 # Check availability of gtest, which will be used for unit tests.

Modified: trunk/src/bin/auth/Makefile.am
==============================================================================
--- trunk/src/bin/auth/Makefile.am (original)
+++ trunk/src/bin/auth/Makefile.am Tue Mar  9 22:52:15 2010
@@ -6,7 +6,7 @@
 
 pkglibexec_PROGRAMS = b10-auth
 b10_auth_SOURCES = auth_srv.cc auth_srv.h
-b10_auth_SOURCES += common.cc common.h
+b10_auth_SOURCES += common.h
 b10_auth_SOURCES += main.cc
 b10_auth_LDADD =  $(top_builddir)/src/lib/auth/.libs/libauth.a
 b10_auth_LDADD +=  $(top_builddir)/src/lib/dns/.libs/libdns.a
@@ -14,6 +14,10 @@
 b10_auth_LDADD += $(top_builddir)/src/lib/cc/libcc.a
 b10_auth_LDADD += $(top_builddir)/src/lib/exceptions/.libs/libexceptions.a
 b10_auth_LDADD += $(SQLITE_LIBS)
+if HAVE_BOOSTLIB
+b10_auth_LDFLAGS = $(AM_LDFLAGS) $(BOOST_LDFLAGS)
+b10_auth_LDADD += $(BOOST_SYSTEM_LIB)
+endif
 
 # TODO: config.h.in is wrong because doesn't honor pkgdatadir
 # and can't use @datadir@ because doesn't expand default ${prefix}

Modified: trunk/src/bin/auth/auth_srv.cc
==============================================================================
--- trunk/src/bin/auth/auth_srv.cc (original)
+++ trunk/src/bin/auth/auth_srv.cc Tue Mar  9 22:52:15 2010
@@ -85,53 +85,45 @@
     delete impl_;
 }
 
-void
-AuthSrv::processMessage(const int fd)
+int
+AuthSrv::processMessage(InputBuffer& request_buffer,
+                        Message& message,
+                        MessageRenderer& response_renderer,
+                        const bool udp_buffer)
 {
-    struct sockaddr_storage ss;
-    socklen_t sa_len = sizeof(ss);
-    struct sockaddr* sa = static_cast<struct sockaddr*>((void*)&ss);
-    char recvbuf[4096];
-    int cc;
+    try {
+        message.fromWire(request_buffer);
+    } catch (...) {
+        cerr << "[AuthSrv] parse failed" << endl;
+        return (-1);
+    }
 
-    if ((cc = recvfrom(fd, recvbuf, sizeof(recvbuf), 0, sa, &sa_len)) > 0) {
-        Message msg(Message::PARSE);
-        InputBuffer buffer(recvbuf, cc);
+    cout << "[AuthSrv] received a message:\n" << message.toText() << endl;
 
-        try {
-            msg.fromWire(buffer);
-        } catch (...) {
-            cerr << "[AuthSrv] parse failed" << endl;
-            return;
-        }
+    if (message.getRRCount(Section::QUESTION()) != 1) {
+        return (-1);
+    }
 
-        cout << "[AuthSrv] received a message:\n" << msg.toText() << endl;
+    bool dnssec_ok = message.isDNSSECSupported();
+    // unused for now.  should set this to renderer for truncation
+    uint16_t remote_bufsize = message.getUDPSize();
 
-        if (msg.getRRCount(Section::QUESTION()) != 1) {
-            return;
-        }
+    message.makeResponse();
+    message.setHeaderFlag(MessageFlag::AA());
+    message.setRcode(Rcode::NOERROR());
+    message.setDNSSECSupported(dnssec_ok);
+    message.setUDPSize(4096);   // XXX: hardcoding
 
-        bool dnssec_ok = msg.isDNSSECSupported();
-        uint16_t remote_bufsize = msg.getUDPSize();
+    Query query(message, dnssec_ok);
+    impl_->data_sources.doQuery(query);
 
-        msg.makeResponse();
-        msg.setHeaderFlag(MessageFlag::AA());
-        msg.setRcode(Rcode::NOERROR());
-        msg.setDNSSECSupported(dnssec_ok);
-        msg.setUDPSize(sizeof(recvbuf));
+    response_renderer.setLengthLimit(udp_buffer ? remote_bufsize : 65535);
+    message.toWire(response_renderer);
+    cout << "sending a response (" <<
+        boost::lexical_cast<string>(response_renderer.getLength())
+         << " bytes):\n" << message.toText() << endl;
 
-        Query query(msg, dnssec_ok);
-        impl_->data_sources.doQuery(query);
-
-        OutputBuffer obuffer(remote_bufsize);
-        MessageRenderer renderer(obuffer);
-        renderer.setLengthLimit(remote_bufsize);
-        msg.toWire(renderer);
-        cout << "sending a response (" <<
-            boost::lexical_cast<string>(obuffer.getLength())
-                  << " bytes):\n" << msg.toText() << endl;
-        sendto(fd, obuffer.getData(), obuffer.getLength(), 0, sa, sa_len);
-    }
+    return (0);
 }
 
 ElementPtr

Modified: trunk/src/bin/auth/auth_srv.h
==============================================================================
--- trunk/src/bin/auth/auth_srv.h (original)
+++ trunk/src/bin/auth/auth_srv.h Tue Mar  9 22:52:15 2010
@@ -22,6 +22,14 @@
 #include <cc/data.h>
 #include <auth/data_source.h>
 
+namespace isc {
+namespace dns {
+class InputBuffer;
+class Message;
+class MessageRenderer;
+}
+}
+
 class AuthSrvImpl;
 
 class AuthSrv {
@@ -38,7 +46,10 @@
     explicit AuthSrv();
     ~AuthSrv();
     //@}
-    void processMessage(int fd);
+    int processMessage(isc::dns::InputBuffer& request_buffer,
+                       isc::dns::Message& message,
+                       isc::dns::MessageRenderer& response_renderer,
+                       bool udp_buffer);
     void serve(std::string zone_name);
     isc::data::ElementPtr setDbFile(const isc::data::ElementPtr config);
     isc::data::ElementPtr updateConfig(isc::data::ElementPtr config);

Modified: trunk/src/bin/auth/common.h
==============================================================================
--- trunk/src/bin/auth/common.h (original)
+++ trunk/src/bin/auth/common.h Tue Mar  9 22:52:15 2010
@@ -20,13 +20,12 @@
 #include <stdlib.h>
 #include <string>
 
-class FatalError : public std::exception {
+#include <exceptions/exceptions.h>
+
+class FatalError : public isc::Exception {
 public:
-    FatalError(std::string m = "fatal error");
-    ~FatalError() throw() {}
-    const char* what() const throw() { return msg.c_str(); }
-private:
-    std::string msg;
+    FatalError(const char* file, size_t line, const char* what) :
+        isc::Exception(file, line, what) {}
 };
 
 #endif // __COMMON_H

Modified: trunk/src/bin/auth/main.cc
==============================================================================
--- trunk/src/bin/auth/main.cc (original)
+++ trunk/src/bin/auth/main.cc Tue Mar  9 22:52:15 2010
@@ -14,37 +14,55 @@
 
 // $Id$
 
+#include "../../../config.h"
+
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <sys/select.h>
 #include <netdb.h>
-#include <netinet/in.h>  // IPPROTO_UDP
 #include <stdlib.h>
+#include <errno.h>
 
 #include <set>
 #include <iostream>
 
 #include <boost/foreach.hpp>
+#ifdef HAVE_BOOSTLIB
+#include <boost/bind.hpp>
+#include <boost/asio.hpp>
+#endif
+
+#include <exceptions/exceptions.h>
 
 #include <dns/buffer.h>
 #include <dns/name.h>
+#include <dns/message.h>
 #include <dns/rrset.h>
 #include <dns/message.h>
+#include <dns/messagerenderer.h>
 
 #include <cc/session.h>
 #include <cc/data.h>
 #include <config/ccsession.h>
 
+#include "config.h"
 #include "common.h"
-#include "config.h"
 #include "auth_srv.h"
 
 #include <boost/foreach.hpp>
 
 using namespace std;
+
+#ifdef HAVE_BOOSTLIB
+using namespace boost::asio;
+using ip::udp;
+using ip::tcp;
+#endif
+
 using namespace isc::data;
 using namespace isc::cc;
 using namespace isc::config;
+using namespace isc::dns;
 
 namespace {
 const string PROGRAM = "Auth";
@@ -58,19 +76,13 @@
 AuthSrv *auth_server;
 }
 
-static void
-usage() {
-    cerr << "Usage: b10-auth [-p port] [-4|-6]" << endl;
-    exit(1);
-}
-
-ElementPtr
+static ElementPtr
 my_config_handler(ElementPtr new_config)
 {
     return auth_server->updateConfig(new_config);
 }
 
-ElementPtr
+static ElementPtr
 my_command_handler(const string& command, const ElementPtr args) {
     ElementPtr answer = createAnswer(0);
 
@@ -83,8 +95,277 @@
     return answer;
 }
 
+#ifdef HAVE_BOOSTLIB
+//
+// Helper classes for asynchronous I/O using boost::asio
+//
+namespace {
+class Completed {
+public:
+    Completed(size_t len) : len_(len) {}
+    bool operator()(const boost::system::error_code& error,
+                    size_t bytes_transferred) const
+    {
+        return (error != 0 || bytes_transferred >= len_);
+    }
+private:
+    size_t len_;
+};
+
+class TCPClient {
+public:
+    TCPClient(io_service& io_service) :
+        socket_(io_service),
+        response_buffer_(0),
+        responselen_buffer_(TCP_MESSAGE_LENGTHSIZE),
+        response_renderer_(response_buffer_),
+        dns_message_(Message::PARSE)
+    {}
+
+    void start() {
+        async_read(socket_, boost::asio::buffer(data_, TCP_MESSAGE_LENGTHSIZE),
+                   Completed(TCP_MESSAGE_LENGTHSIZE),
+                   boost::bind(&TCPClient::headerRead, this,
+                               placeholders::error,
+                               placeholders::bytes_transferred));
+    }
+
+    tcp::socket& getSocket() { return (socket_); }
+
+    void headerRead(const boost::system::error_code& error,
+                    size_t bytes_transferred)
+    {
+        if (!error) {
+            assert(bytes_transferred == TCP_MESSAGE_LENGTHSIZE);
+            InputBuffer dnsbuffer(data_, TCP_MESSAGE_LENGTHSIZE);
+
+            uint16_t msglen = dnsbuffer.readUint16();
+            async_read(socket_, boost::asio::buffer(data_, msglen),
+                       Completed(msglen),
+                       boost::bind(&TCPClient::requestRead, this,
+                                   placeholders::error,
+                                   placeholders::bytes_transferred));
+        } else {
+            delete this;
+        }
+    }
+
+    void requestRead(const boost::system::error_code& error,
+                     size_t bytes_transferred)
+    {
+        if (!error) {
+            InputBuffer dnsbuffer(data_, bytes_transferred);
+            if (auth_server->processMessage(dnsbuffer, dns_message_,
+                                            response_renderer_, false) == 0) {
+                responselen_buffer_.writeUint16(response_buffer_.getLength());
+                async_write(socket_,
+                            boost::asio::buffer(
+                                responselen_buffer_.getData(),
+                                responselen_buffer_.getLength()),
+                        boost::bind(&TCPClient::responseWrite, this,
+                                    placeholders::error));
+            } else {
+                delete this;
+            }
+        } else {
+            delete this;
+        }
+    }
+
+    void responseWrite(const boost::system::error_code& error)
+    {
+        if (!error) {
+                async_write(socket_,
+                            boost::asio::buffer(response_buffer_.getData(),
+                                                response_buffer_.getLength()),
+                        boost::bind(&TCPClient::handleWrite, this,
+                                    placeholders::error));
+        }
+    }
+
+    void handleWrite(const boost::system::error_code& error)
+    {
+        if (!error) {
+            start();            // handle next request, if any.
+      } else {
+            delete this;
+      }
+    }
+
+private:
+    tcp::socket socket_;
+    OutputBuffer response_buffer_;
+    OutputBuffer responselen_buffer_;
+    MessageRenderer response_renderer_;
+    Message dns_message_;
+    enum { MAX_LENGTH = 65535 };
+    static const size_t TCP_MESSAGE_LENGTHSIZE = 2;
+    char data_[MAX_LENGTH];
+};
+
+class TCPServer
+{
+public:
+    TCPServer(io_service& io_service, int af, short port) :
+        io_service_(io_service),
+        acceptor_(io_service,
+                  tcp::endpoint(af == AF_INET6 ? tcp::v6() : tcp::v4(), port))
+    {
+        TCPClient* new_client = new TCPClient(io_service_);
+        // XXX: isn't the following exception free?  Need to check it.
+        acceptor_.async_accept(new_client->getSocket(),
+                               boost::bind(&TCPServer::handleAccept, this,
+                                           new_client, placeholders::error));
+    }
+
+    void handleAccept(TCPClient* new_client,
+                      const boost::system::error_code& error)
+    {
+        if (!error) {
+            new_client->start();
+            new_client = new TCPClient(io_service_);
+            acceptor_.async_accept(new_client->getSocket(),
+                                   boost::bind(&TCPServer::handleAccept,
+                                               this, new_client,
+                                               placeholders::error));
+        } else {
+            delete new_client;
+        }
+    }
+
+private:
+    io_service& io_service_;
+    tcp::acceptor acceptor_;
+};
+
+class UDPServer {
+public:
+    UDPServer(io_service& io_service, int af, short port) :
+        io_service_(io_service),
+        socket_(io_service,
+                udp::endpoint(af == AF_INET6 ? udp::v6() : udp::v4(), port)),
+        response_buffer_(0),
+        response_renderer_(response_buffer_),
+        dns_message_(Message::PARSE)
+    {
+        startReceive();
+    }
+
+    void handleRequest(const boost::system::error_code& error,
+                       size_t bytes_recvd)
+    {
+        if (!error && bytes_recvd > 0) {
+            InputBuffer request_buffer(data_, bytes_recvd);
+
+            dns_message_.clear(Message::PARSE);
+            response_renderer_.clear();
+            if (auth_server->processMessage(request_buffer, dns_message_,
+                                            response_renderer_, true) == 0) {
+                socket_.async_send_to(
+                    boost::asio::buffer(response_buffer_.getData(),
+                                        response_buffer_.getLength()),
+                    sender_endpoint_,
+                    boost::bind(&UDPServer::sendCompleted,
+                                this,
+                                placeholders::error,
+                                placeholders::bytes_transferred));
+            } else {
+                startReceive();
+            }
+        } else {
+            startReceive();
+        }
+    }
+
+    void sendCompleted(const boost::system::error_code& error,
+                       size_t bytes_sent)
+    {
+        startReceive();
+    }
+private:
+    void startReceive() {
+        socket_.async_receive_from(
+            boost::asio::buffer(data_, MAX_LENGTH), sender_endpoint_,
+            boost::bind(&UDPServer::handleRequest, this,
+                        placeholders::error,
+                        placeholders::bytes_transferred));
+    }
+
+private:
+    io_service& io_service_;
+    udp::socket socket_;
+    OutputBuffer response_buffer_;
+    MessageRenderer response_renderer_;
+    Message dns_message_;
+    udp::endpoint sender_endpoint_;
+    enum { MAX_LENGTH = 4096 };
+    char data_[MAX_LENGTH];
+};
+
+struct ServerSet {
+    ServerSet() : udp4_server(NULL), udp6_server(NULL),
+                  tcp4_server(NULL), tcp6_server(NULL)
+    {}
+    ~ServerSet()
+    {
+        delete udp4_server;
+        delete udp6_server;
+        delete tcp4_server;
+        delete tcp6_server;
+    }
+    UDPServer* udp4_server;
+    UDPServer* udp6_server;
+    TCPServer* tcp4_server;
+    TCPServer* tcp6_server;
+};
+
+static void
+run_server(const char* port, const bool use_ipv4, const bool use_ipv6,
+           const string& specfile)
+{
+    ServerSet servers;
+    boost::asio::io_service io_service;
+    short portnum = atoi(port);
+
+    ModuleCCSession cs(specfile, io_service, my_config_handler,
+                       my_command_handler);
+
+    if (use_ipv4) {
+        servers.udp4_server = new UDPServer(io_service, AF_INET, portnum);
+        servers.tcp4_server = new TCPServer(io_service, AF_INET, portnum);
+    }
+    if (use_ipv6) {
+        servers.udp6_server = new UDPServer(io_service, AF_INET6, portnum);
+        servers.tcp6_server = new TCPServer(io_service, AF_INET6, portnum);
+    }
+
+    cout << "Server started." << endl;
+    io_service.run();
+}
+}
+#else  // !HAVE_BOOSTLIB
+struct SocketSet {
+    SocketSet() : ups4(-1), tps4(-1), ups6(-1), tps6(-1) {}
+    ~SocketSet()
+    {
+        if (ups4 >= 0) {
+            close(ups4);
+        }
+        if (tps4 >= 0) {
+            close(tps4);
+        }
+        if (ups6 >= 0) {
+            close(ups6);
+        }
+        if (tps4 >= 0) {
+            close(tps6);
+        }
+    }
+    int ups4, tps4, ups6, tps6;
+};
+
 static int
-getSocket(int af, const char* port) {
+getUDPSocket(int af, const char* port) {
     struct addrinfo hints, *res;
 
     memset(&hints, 0, sizeof(hints));
@@ -95,30 +376,211 @@
 
     int error = getaddrinfo(NULL, port, &hints, &res);
     if (error != 0) {
-        cerr << "getaddrinfo failed: " << gai_strerror(error);
-        return (-1);
+        isc_throw(FatalError, "getaddrinfo failed: " << gai_strerror(error));
     }
 
     int s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
     if (s < 0) {
-        cerr << "failed to open socket" << endl;
-        return (-1);
+        isc_throw(FatalError, "failed to open socket");
     }
 
     if (af == AF_INET6) {
         int on = 1;
         if (setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on)) < 0) {
             cerr << "couldn't set IPV6_V6ONLY socket option" << endl;
+            // proceed anyway
         }
     }
 
     if (bind(s, res->ai_addr, res->ai_addrlen) < 0) {
-        cerr << "binding socket failure" << endl;
-        close(s);
-        return (-1);
+        isc_throw(FatalError, "binding socket failure");
     }
 
     return (s);
+}
+
+static int
+getTCPSocket(int af, const char* port) {
+    struct addrinfo hints, *res;
+
+    memset(&hints, 0, sizeof(hints));
+    hints.ai_family = af;
+    hints.ai_socktype = SOCK_STREAM;
+    hints.ai_flags = AI_PASSIVE;
+    hints.ai_protocol = IPPROTO_TCP;
+
+    int error = getaddrinfo(NULL, port, &hints, &res);
+    if (error != 0) {
+        isc_throw(FatalError, "getaddrinfo failed: " << gai_strerror(error));
+    }
+
+    int s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+    if (s < 0) {
+        isc_throw(FatalError, "failed to open socket");
+    }
+
+    int on = 1;
+    if (af == AF_INET6) {
+        if (setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on)) < 0) {
+            cerr << "couldn't set IPV6_V6ONLY socket option" << endl;
+        }
+    }
+
+    if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
+        cerr << "couldn't set SO_REUSEADDR socket option" << endl;
+    }
+
+    if (bind(s, res->ai_addr, res->ai_addrlen) < 0) {
+        isc_throw(FatalError, "binding socket failure");
+    }
+
+    listen(s, 100);
+    return (s);
+}
+
+static void
+processMessageUDP(const int fd, Message& dns_message,
+                  MessageRenderer& response_renderer)
+{
+    struct sockaddr_storage ss;
+    socklen_t sa_len = sizeof(ss);
+    struct sockaddr* sa = static_cast<struct sockaddr*>((void*)&ss);
+    char recvbuf[4096];
+    int cc;
+
+    dns_message.clear(Message::PARSE);
+    response_renderer.clear();
+    if ((cc = recvfrom(fd, recvbuf, sizeof(recvbuf), 0, sa, &sa_len)) > 0) {
+        InputBuffer buffer(recvbuf, cc);
+        if (auth_server->processMessage(buffer, dns_message, response_renderer,
+                                        true) == 0) {
+            sendto(fd, response_renderer.getData(),
+                   response_renderer.getLength(), 0, sa, sa_len);
+        }
+    }
+}
+
+static void
+processMessageTCP(const int fd, Message& dns_message,
+                  MessageRenderer& response_renderer)
+{
+    struct sockaddr_storage ss;
+    socklen_t sa_len = sizeof(ss);
+    struct sockaddr* sa = static_cast<struct sockaddr*>((void*)&ss);
+    char sizebuf[2];
+    int cc;
+    int ts = accept(fd, sa, &sa_len);
+
+    cout << "[XX] process TCP" << endl;
+    cc = recv(ts, sizebuf, 2, 0);
+    cout << "[XX] got: " << cc << endl;
+    uint16_t size, size_n;
+    memcpy(&size_n, sizebuf, 2);
+    size = ntohs(size_n);
+    cout << "[XX] got: " << size << endl;
+
+    vector<char> message_buffer;
+    message_buffer.reserve(size);
+    cc = 0;
+    while (cc < size) {
+        cout << "[XX] cc now: " << cc << " of " << size << endl;
+        cc += recv(ts, &message_buffer[0] + cc, size - cc, 0);
+    }
+
+    InputBuffer buffer(&message_buffer[0], size);
+    dns_message.clear(Message::PARSE);
+    response_renderer.clear();
+    if (auth_server->processMessage(buffer, dns_message, response_renderer,
+                                    false) == 0) {
+        size = response_renderer.getLength();
+        size_n = htons(size);
+        if (send(ts, &size_n, 2, 0) == 2) {
+            cc = send(ts, response_renderer.getData(),
+                      response_renderer.getLength(), 0);
+            if (cc == -1) {
+                cerr << "[AuthSrv] error in sending TCP response message" <<
+                    endl;
+            } else {
+                cout << "[XX] sent TCP response: " << cc << " bytes" << endl;
+            }
+        }
+    }
+ 
+   // TODO: we don't check for more queries on the stream atm
+    close(ts);
+}
+
+static void
+run_server(const char* port, const bool use_ipv4, const bool use_ipv6,
+           const string& specfile)
+{
+    SocketSet socket_set;
+    fd_set fds_base;
+    int nfds = -1;
+
+    FD_ZERO(&fds_base);
+    if (use_ipv4) {
+        socket_set.ups4 = getUDPSocket(AF_INET, port);
+        FD_SET(socket_set.ups4, &fds_base);
+        nfds = max(nfds, socket_set.ups4);
+        socket_set.tps4 = getTCPSocket(AF_INET, port);
+        FD_SET(socket_set.tps4, &fds_base);
+        nfds = max(nfds, socket_set.tps4);
+    }
+    if (use_ipv6) {
+        socket_set.ups6 = getUDPSocket(AF_INET6, port);
+        FD_SET(socket_set.ups6, &fds_base);
+        nfds = max(nfds, socket_set.ups6);
+        socket_set.tps6 = getTCPSocket(AF_INET6, port);
+        FD_SET(socket_set.tps6, &fds_base);
+        nfds = max(nfds, socket_set.tps6);
+    }
+    ++nfds;
+
+    ModuleCCSession cs(specfile, my_config_handler, my_command_handler);
+
+    cout << "Server started." << endl;
+    
+    int ss = cs.getSocket();
+    Message dns_message(Message::PARSE);
+    OutputBuffer resonse_buffer(0);
+    MessageRenderer response_renderer(resonse_buffer);
+
+    while (true) {
+        fd_set fds = fds_base;
+        FD_SET(ss, &fds);
+
+        int n = select(nfds, &fds, NULL, NULL, NULL);
+        if (n < 0) {
+            if (errno != EINTR) {
+                isc_throw(FatalError, "select error");
+            }
+            continue;
+        }
+
+        if (socket_set.ups4 >= 0 && FD_ISSET(socket_set.ups4, &fds)) {
+            processMessageUDP(socket_set.ups4, dns_message, response_renderer);
+        }
+        if (socket_set.ups6 >= 0 && FD_ISSET(socket_set.ups6, &fds)) {
+            processMessageUDP(socket_set.ups6, dns_message, response_renderer);
+        }
+        if (socket_set.tps4 >= 0 && FD_ISSET(socket_set.tps4, &fds)) {
+            processMessageTCP(socket_set.tps4, dns_message, response_renderer);
+        }
+        if (socket_set.tps6 >= 0 && FD_ISSET(socket_set.tps6, &fds)) {
+            processMessageTCP(socket_set.tps6, dns_message, response_renderer);
+        }
+        if (FD_ISSET(ss, &fds)) {
+            cs.check_command();
+        }
+    }
+}
+#endif // HAVE_BOOSTLIB
+
+static void
+usage() {
+    cerr << "Usage: b10-auth [-p port] [-4|-6]" << endl;
+    exit(1);
 }
 
 int
@@ -126,7 +588,7 @@
     int ch;
     const char* port = DNSPORT;
     bool ipv4_only = false, ipv6_only = false;
-    int ps4 = -1, ps6 = -1;
+    bool use_ipv4 = false, use_ipv6 = false;
 
     while ((ch = getopt(argc, argv, "46p:")) != -1) {
         switch (ch) {
@@ -154,19 +616,10 @@
         usage();
     }
     if (!ipv6_only) {
-        ps4 = getSocket(AF_INET, port);
-        if (ps4 < 0) {
-            exit(1);
-        }
+        use_ipv4 = true;
     }
     if (!ipv4_only) {
-        ps6 = getSocket(AF_INET6, port);
-        if (ps6 < 0) {
-            if (ps4 < 0) {
-                close(ps4);
-            }
-            exit(1);
-        }
+        use_ipv4 = true;
     }
 
     auth_server = new AuthSrv;
@@ -181,54 +634,13 @@
         } else {
             specfile = string(AUTH_SPECFILE_LOCATION);
         }
-        ModuleCCSession cs(specfile, my_config_handler, my_command_handler);
-
-        // main server loop
-        fd_set fds;
-        int ss = cs.getSocket();
-        int nfds = max(max(ps4, ps6), ss) + 1;
-        int counter = 0;
-
-        cout << "Server started." << endl;
-        while (true) {
-            FD_ZERO(&fds);
-            if (ps4 >= 0) {
-                FD_SET(ps4, &fds);
-            }
-            if (ps6 >= 0) {
-                FD_SET(ps6, &fds);
-            }
-            FD_SET(ss, &fds);
-
-            int n = select(nfds, &fds, NULL, NULL, NULL);
-            if (n < 0) {
-                throw FatalError("select error");
-            }
-
-            if (ps4 >= 0 && FD_ISSET(ps4, &fds)) {
-                ++counter;
-                auth_server->processMessage(ps4);
-            }
-            if (ps6 >= 0 && FD_ISSET(ps6, &fds)) {
-                ++counter;
-                auth_server->processMessage(ps6);
-            }
-    
-            if (FD_ISSET(ss, &fds)) {
-                cs.check_command();
-            }
-        }
-    } catch (SessionError se) {
-        cout << se.what() << endl;
+
+        run_server(port, use_ipv4, use_ipv6, specfile);
+    } catch (const std::exception& ex) {
+        cerr << ex.what() << endl;
         ret = 1;
     }
 
-    if (ps4 >= 0) {
-        close(ps4);
-    }
-    if (ps6 >= 0) {
-        close(ps6);
-    }
     delete auth_server;
     return (ret);
 }

Modified: trunk/src/lib/cc/session.cc
==============================================================================
--- trunk/src/lib/cc/session.cc (original)
+++ trunk/src/lib/cc/session.cc Tue Mar  9 22:52:15 2010
@@ -14,18 +14,37 @@
 
 // $Id$
 
-#include "data.h"
-#include "session.h"
+#include "config.h"
+
+#include <stdint.h>
 
 #include <cstdio>
 #include <vector>
 #include <iostream>
 #include <sstream>
 
+#ifdef HAVE_BOOSTLIB
+#include <boost/bind.hpp>
+#include <boost/function.hpp>
+#include <boost/asio.hpp>
+#endif
+
+#include <exceptions/exceptions.h>
+
+#include "data.h"
+#include "session.h"
+
 using namespace std;
 using namespace isc::cc;
 using namespace isc::data;
 
+#ifdef HAVE_BOOSTLIB
+// some of the boost::asio names conflict with socket API system calls
+// (e.g. write(2)) so we don't import the entire boost::asio namespace.
+using boost::asio::io_service;
+using boost::asio::ip::tcp;
+#endif
+
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
@@ -35,63 +54,172 @@
 
 class SessionImpl {
 public:
-    SessionImpl() : sock_(-1), sequence_(-1) {}
-    int sock_;
+    SessionImpl() : sequence_(-1) {}
+    virtual ~SessionImpl() {}
+    virtual void establish() = 0; 
+    virtual int getSocket() = 0;
+    virtual void disconnect() = 0;
+    virtual void writeData(const void* data, size_t datalen) = 0;
+    virtual size_t readDataLength() = 0;
+    virtual void readData(void* data, size_t datalen) = 0;
+    virtual void startRead(boost::function<void()> user_handler) = 0;
+
     int sequence_; // the next sequence number to use
     std::string lname_;
 };
 
-Session::Session() : impl_(new SessionImpl)
-{}
-
-Session::~Session() {
-    delete impl_;
-}
-
-void
-Session::disconnect() {
-    close(impl_->sock_);
-    impl_->sock_ = -1;
-}
-
-int
-Session::getSocket() const {
-     return (impl_->sock_);
-}
-
-namespace {
+#ifdef HAVE_BOOSTLIB
+class ASIOSession : public SessionImpl {
+public:
+    ASIOSession(io_service& io_service) :
+        io_service_(io_service), socket_(io_service_), data_length_(0)
+    {}
+    virtual void establish();
+    virtual void disconnect();
+    virtual int getSocket() { return (socket_.native()); }
+    virtual void writeData(const void* data, size_t datalen);
+    virtual size_t readDataLength();
+    virtual void readData(void* data, size_t datalen);
+    virtual void startRead(boost::function<void()> user_handler);
+private:
+    void internalRead(const boost::system::error_code& error,
+                      size_t bytes_transferred);
+
+private:
+    io_service& io_service_;
+    tcp::socket socket_;
+    uint32_t data_length_;
+    boost::function<void()> user_handler_;
+    boost::system::error_code error_;
+};
+
+void
+ASIOSession::establish() {
+    socket_.connect(tcp::endpoint(boost::asio::ip::address_v4::loopback(),
+                                  9912), error_);
+    if (error_) {
+        isc_throw(SessionError, "Unable to connect to message queue");
+    }
+}
+
+void
+ASIOSession::disconnect() {
+    socket_.close();
+    data_length_ = 0;
+}
+
+void
+ASIOSession::writeData(const void* data, size_t datalen) {
+    try {
+        boost::asio::write(socket_, boost::asio::buffer(data, datalen));
+    } catch (const boost::system::system_error& boost_ex) {
+        isc_throw(SessionError, "ASIO write failed: " << boost_ex.what());
+    }
+}
+
+size_t
+ASIOSession::readDataLength() {
+    size_t ret_len = data_length_;
+    
+    if (ret_len == 0) {
+        readData(&data_length_, sizeof(data_length_));
+        if (data_length_ == 0) {
+            isc_throw(SessionError, "ASIO read: data length is not ready");
+        }
+        ret_len = ntohl(data_length_);
+    }
+
+    data_length_ = 0;
+    return (ret_len);
+}
+
+void
+ASIOSession::readData(void* data, size_t datalen) {
+    try {
+        boost::asio::read(socket_, boost::asio::buffer(data, datalen));
+    } catch (const boost::system::system_error& boost_ex) {
+        // to hide boost specific exceptions, we catch them explicitly
+        // and convert it to SessionError.
+        isc_throw(SessionError, "ASIO read failed: " << boost_ex.what());
+    }
+}
+
+void
+ASIOSession::startRead(boost::function<void()> user_handler) {
+    data_length_ = 0;
+    user_handler_ = user_handler;
+    async_read(socket_, boost::asio::buffer(&data_length_,
+                                            sizeof(data_length_)),
+               boost::bind(&ASIOSession::internalRead, this,
+                           boost::asio::placeholders::error,
+                           boost::asio::placeholders::bytes_transferred));
+}
+
+void
+ASIOSession::internalRead(const boost::system::error_code& error,
+                          size_t bytes_transferred)
+{
+    if (!error) {
+        assert(bytes_transferred == sizeof(data_length_));
+        data_length_ = ntohl(data_length_);
+        if (data_length_ == 0) {
+            isc_throw(SessionError, "Invalid message length (0)");
+        }
+        user_handler_();
+    } else {
+        isc_throw(SessionError, "asynchronous read failed");
+    }
+}
+#endif
+
+class SocketSession : public SessionImpl {
+public:
+    SocketSession() : sock_(-1) {}
+    virtual ~SocketSession() { disconnect(); }
+    virtual int getSocket() { return (sock_); }
+    void establish();
+    virtual void disconnect()
+    {
+        if (sock_ >= 0) {
+            close(sock_);
+        }
+        sock_ = -1;
+    }
+    virtual void writeData(const void* data, size_t datalen);
+    virtual void readData(void* data, size_t datalen);
+    virtual size_t readDataLength();
+    virtual void startRead(boost::function<void()> user_handler)
+    {} // nothing to do for this class
+private:
+    int sock_;
+};
+
+namespace {                     // maybe unnecessary.
 // This is a helper class to make the establish() method (below) exception-safe
 // with the RAII approach.
-class SocketHolder {
+class SessionHolder {
 public:
-    SocketHolder(SessionImpl* obj, int fd) : impl_obj_(obj), fd_(fd)
+    SessionHolder(SessionImpl* obj) : impl_obj_(obj) {}
+    ~SessionHolder()
     {
-        impl_obj_->sock_ = fd;
-    }
-    ~SocketHolder()
-    {
-        if (fd_ >= 0) {
-            close(fd_);
-            impl_obj_->sock_ = -1;
+        if (impl_obj_ != NULL) {
+            impl_obj_->disconnect();
         }
     }
-    void clear() { fd_ = -1; }
+    void clear() { impl_obj_ = NULL; }
     SessionImpl* impl_obj_;
-    int fd_;
 };
 }
 
 void
-Session::establish() {
-    int s, ret;
+SocketSession::establish() {
+    int s;
     struct sockaddr_in sin;
 
     s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
     if (s < 0) {
-        throw SessionError("socket() failed");
-    }
-
-    SocketHolder socket_holder(impl_, s);
+        isc_throw(SessionError, "socket() failed");
+    }
 
     sin.sin_family = AF_INET;
     sin.sin_port = htons(9912);
@@ -101,18 +229,80 @@
     sin.sin_len = sizeof(struct sockaddr_in);
 #endif
 
-    ret = connect(s, (struct sockaddr *)&sin, sizeof(sin));
-    if (ret < 0) {
-        throw SessionError("Unable to connect to message queue");
-    }
+    if (connect(s, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
+        close(s);
+        isc_throw(SessionError, "Unable to connect to message queue");
+    }
+
+    sock_ = s;
+}
+
+void
+SocketSession::writeData(const void* data, const size_t datalen) {
+    int cc = write(sock_, data, datalen);
+    if (cc != datalen) {
+        isc_throw(SessionError, "Write failed: expect " << datalen <<
+                  ", actual " << cc);
+    }
+}
+
+size_t
+SocketSession::readDataLength() {
+    uint32_t length;
+    readData(&length, sizeof(length));
+    return (ntohl(length));
+}
+
+void
+SocketSession::readData(void* data, const size_t datalen) {
+    int cc = read(sock_, data, datalen);
+    if (cc != datalen) {
+        isc_throw(SessionError, "Read failed: expect " << datalen <<
+                  ", actual " << cc);
+    }
+}
+
+Session::Session() : impl_(new SocketSession)
+{}
+
+#ifdef HAVE_BOOSTLIB
+Session::Session(io_service& io_service) : impl_(new ASIOSession(io_service))
+{}
+#endif
+
+Session::~Session() {
+    delete impl_;
+}
+
+void
+Session::disconnect() {
+    impl_->disconnect();
+}
+
+int
+Session::getSocket() const {
+    return (impl_->getSocket());
+}
+
+void
+Session::startRead(boost::function<void()> read_callback) {
+    impl_->startRead(read_callback);
+}
+
+void
+Session::establish() {
+    impl_->establish();
+
+    // once established, encapsulate the implementation object so that we
+    // can safely release the internal resource when exception happens
+    // below.
+    SessionHolder session_holder(impl_);
 
     //
     // send a request for our local name, and wait for a response
     //
-    std::string get_lname_str = "{ \"type\": \"getlname\" }";
-    std::stringstream get_lname_stream;
-    get_lname_stream.str(get_lname_str);
-    ElementPtr get_lname_msg = Element::createFromString(get_lname_stream);
+    ElementPtr get_lname_msg =
+        Element::createFromString("{ \"type\": \"getlname\" }");
     sendmsg(get_lname_msg);
 
     ElementPtr routing, msg;
@@ -121,7 +311,8 @@
     impl_->lname_ = msg->get("lname")->stringValue();
     cout << "My local name is:  " << impl_->lname_ << endl;
 
-    socket_holder.clear();
+    // At this point there's no risk of resource leak.
+    session_holder.clear();
 }
 
 //
@@ -134,82 +325,44 @@
     unsigned int length_net = htonl(length);
     unsigned short header_length = header_wire.length();
     unsigned short header_length_net = htons(header_length);
-    unsigned int ret;
-
-    assert(impl_->sock_ != -1);
-
-    ret = write(impl_->sock_, &length_net, 4);
-    if (ret != 4)
-        throw SessionError("Short write");
-
-    ret = write(impl_->sock_, &header_length_net, 2);
-    if (ret != 2)
-        throw SessionError("Short write");
-
-    ret = write(impl_->sock_, header_wire.c_str(), header_length);
-    if (ret != header_length) {
-        throw SessionError("Short write");
-    }
-}
-
-void
-Session::sendmsg(ElementPtr& env, ElementPtr& msg)
-{
+
+    impl_->writeData(&length_net, sizeof(length_net));
+    impl_->writeData(&header_length_net, sizeof(header_length_net));
+    impl_->writeData(header_wire.data(), header_length);
+}
+
+void
+Session::sendmsg(ElementPtr& env, ElementPtr& msg) {
     std::string header_wire = env->toWire();
     std::string body_wire = msg->toWire();
     unsigned int length = 2 + header_wire.length() + body_wire.length();
     unsigned int length_net = htonl(length);
     unsigned short header_length = header_wire.length();
     unsigned short header_length_net = htons(header_length);
-    unsigned int ret;
-
-    ret = write(impl_->sock_, &length_net, 4);
-    if (ret != 4)
-        throw SessionError("Short write");
-
-    ret = write(impl_->sock_, &header_length_net, 2);
-    if (ret != 2)
-        throw SessionError("Short write");
-
-    ret = write(impl_->sock_, header_wire.c_str(), header_length);
-    if (ret != header_length) {
-        throw SessionError("Short write");
-    }
-    ret = write(impl_->sock_, body_wire.c_str(), body_wire.length());
-    if (ret != body_wire.length()) {
-        throw SessionError("Short write");
-    }
+
+    impl_->writeData(&length_net, sizeof(length_net));
+    impl_->writeData(&header_length_net, sizeof(header_length_net));
+    impl_->writeData(header_wire.data(), header_length);
+    impl_->writeData(body_wire.data(), body_wire.length());
 }
 
 bool
-Session::recvmsg(ElementPtr& msg, bool nonblock)
-{
-    unsigned int length_net;
+Session::recvmsg(ElementPtr& msg, bool nonblock) {
+    size_t length = impl_->readDataLength();
+
     unsigned short header_length_net;
-    unsigned int ret;
-
-    ret = read(impl_->sock_, &length_net, 4);
-    if (ret != 4)
-        throw SessionError("Short read");
-
-    ret = read(impl_->sock_, &header_length_net, 2);
-    if (ret != 2)
-        throw SessionError("Short read");
-
-    unsigned int length = ntohl(length_net) - 2;
+    impl_->readData(&header_length_net, sizeof(header_length_net));
+
     unsigned short header_length = ntohs(header_length_net);
     if (header_length != length) {
-        throw SessionError("Received non-empty body where only a header expected");
+        isc_throw(SessionError, "Length parameters invalid: total=" << length
+                  << ", header=" << header_length);
     }
 
     std::vector<char> buffer(length);
-    ret = read(impl_->sock_, &buffer[0], length);
-    if (ret != length) {
-        throw SessionError("Short read");
-    }
+    impl_->readData(&buffer[0], length);
 
     std::string wire = std::string(&buffer[0], length);
-
     std::stringstream wire_stream;
     wire_stream << wire;
 
@@ -220,36 +373,26 @@
 }
 
 bool
-Session::recvmsg(ElementPtr& env, ElementPtr& msg, bool nonblock)
-{
-    unsigned int length_net;
+Session::recvmsg(ElementPtr& env, ElementPtr& msg, bool nonblock) {
+    size_t length = impl_->readDataLength();
+
     unsigned short header_length_net;
-    unsigned int ret;
-
-    ret = read(impl_->sock_, &length_net, 4);
-    if (ret != 4)
-        throw SessionError("Short read");
-
-    ret = read(impl_->sock_, &header_length_net, 2);
-    if (ret != 2)
-        throw SessionError("Short read");
-
-    unsigned int length = ntohl(length_net);
+    impl_->readData(&header_length_net, sizeof(header_length_net));
+
     unsigned short header_length = ntohs(header_length_net);
-    if (header_length > length)
-        throw SessionError("Bad header length");
+    if (header_length > length || length < 2) {
+        isc_throw(SessionError, "Length parameters invalid: total=" << length
+                  << ", header=" << header_length);
+    }
 
     // remove the header-length bytes from the total length
     length -= 2;
     std::vector<char> buffer(length);
-    ret = read(impl_->sock_, &buffer[0], length);
-    if (ret != length) {
-        throw SessionError("Short read");
-    }
+    impl_->readData(&buffer[0], length);
 
     std::string header_wire = std::string(&buffer[0], header_length);
-    std::string body_wire = std::string(&buffer[0] + header_length, length - header_length);
-
+    std::string body_wire = std::string(&buffer[0] + header_length,
+                                        length - header_length);
     std::stringstream header_wire_stream;
     header_wire_stream << header_wire;
     env = Element::fromWire(header_wire_stream, header_length);
@@ -263,8 +406,7 @@
 }
 
 void
-Session::subscribe(std::string group, std::string instance)
-{
+Session::subscribe(std::string group, std::string instance) {
     ElementPtr env = Element::create(std::map<std::string, ElementPtr>());
 
     env->set("type", Element::create("subscribe"));
@@ -275,8 +417,7 @@
 }
 
 void
-Session::unsubscribe(std::string group, std::string instance)
-{
+Session::unsubscribe(std::string group, std::string instance) {
     ElementPtr env = Element::create(std::map<std::string, ElementPtr>());
 
     env->set("type", Element::create("unsubscribe"));
@@ -302,23 +443,18 @@
 
     sendmsg(env, msg);
 
-    return (impl_->sequence_++);
+    return (++impl_->sequence_);
 }
 
 bool
-Session::group_recvmsg(ElementPtr& envelope, ElementPtr& msg, bool nonblock)
+Session::group_recvmsg(ElementPtr& envelope, ElementPtr& msg,
+                       bool nonblock)
 {
-    bool got_message = recvmsg(envelope, msg, nonblock);
-    if (!got_message) {
-        return false;
-    }
-
-    return (true);
+    return (recvmsg(envelope, msg, nonblock));
 }
 
 unsigned int
-Session::reply(ElementPtr& envelope, ElementPtr& newmsg)
-{
+Session::reply(ElementPtr& envelope, ElementPtr& newmsg) {
     ElementPtr env = Element::create(std::map<std::string, ElementPtr>());
 
     env->set("type", Element::create("send"));
@@ -331,7 +467,7 @@
 
     sendmsg(env, newmsg);
 
-    return (impl_->sequence_++);
-}
-}
-}
+    return (++impl_->sequence_);
+}
+}
+}

Modified: trunk/src/lib/cc/session.h
==============================================================================
--- trunk/src/lib/cc/session.h (original)
+++ trunk/src/lib/cc/session.h Tue Mar  9 22:52:15 2010
@@ -19,19 +19,26 @@
 
 #include <string>
 
+#include <boost/function.hpp>
+
+#include <exceptions/exceptions.h>
+
 #include "data.h"
+
+namespace boost {
+namespace asio {
+class io_service;
+}
+}
 
 namespace isc {
     namespace cc {
         class SessionImpl;
 
-        class SessionError : public std::exception {
+        class SessionError : public isc::Exception {
         public:
-            SessionError(std::string m = "CC Session Error") : msg(m) {}
-            ~SessionError() throw() {}
-            const char* what() const throw() { return msg.c_str(); }
-        private:
-            std::string msg;
+            SessionError(const char* file, size_t line, const char* what) :
+                isc::Exception(file, line, what) {}
         };
 
         class Session {
@@ -44,15 +51,19 @@
 
         public:
             Session();
+            Session(boost::asio::io_service& ioservice);
             ~Session();
 
             // XXX: quick hack to allow the user to watch the socket directly.
             int getSocket() const;
 
+            void startRead(boost::function<void()> read_callback);
+
             void establish();
             void disconnect();
             void sendmsg(isc::data::ElementPtr& msg);
-            void sendmsg(isc::data::ElementPtr& env, isc::data::ElementPtr& msg);
+            void sendmsg(isc::data::ElementPtr& env,
+                         isc::data::ElementPtr& msg);
             bool recvmsg(isc::data::ElementPtr& msg,
                          bool nonblock = true);
             bool recvmsg(isc::data::ElementPtr& env,

Modified: trunk/src/lib/config/ccsession.cc
==============================================================================
--- trunk/src/lib/config/ccsession.cc (original)
+++ trunk/src/lib/config/ccsession.cc Tue Mar  9 22:52:15 2010
@@ -20,6 +20,7 @@
 //               react on config change announcements)
 //
 
+#include "config.h"
 
 #include <stdexcept>
 #include <stdlib.h>
@@ -31,6 +32,9 @@
 #include <sstream>
 #include <cerrno>
 
+#ifdef HAVE_BOOSTLIB
+#include <boost/bind.hpp>
+#endif
 #include <boost/foreach.hpp>
 
 #include <cc/data.h>
@@ -163,10 +167,50 @@
     return module_spec;
 }
 
-ModuleCCSession::ModuleCCSession(std::string spec_file_name,
-                               isc::data::ElementPtr(*config_handler)(isc::data::ElementPtr new_config),
-                               isc::data::ElementPtr(*command_handler)(const std::string& command, const isc::data::ElementPtr args)
-                              ) throw (isc::cc::SessionError)
+#ifdef HAVE_BOOSTLIB
+void
+ModuleCCSession::startCheck() {
+    // data available on the command channel.  process it in the synchronous
+    // mode.
+    check_command();
+
+    // start asynchronous read again.
+    session_.startRead(boost::bind(&ModuleCCSession::startCheck, this));
+}
+
+ModuleCCSession::ModuleCCSession(
+    std::string spec_file_name,
+    boost::asio::io_service& io_service,
+    isc::data::ElementPtr(*config_handler)(isc::data::ElementPtr new_config),
+    isc::data::ElementPtr(*command_handler)(
+        const std::string& command, const isc::data::ElementPtr args)
+    ) throw (isc::cc::SessionError) :
+    session_(io_service)
+{
+    init(spec_file_name, config_handler, command_handler);
+
+    // register callback for asynchronous read
+    session_.startRead(boost::bind(&ModuleCCSession::startCheck, this));
+}
+#endif
+
+ModuleCCSession::ModuleCCSession(
+    std::string spec_file_name,
+    isc::data::ElementPtr(*config_handler)(isc::data::ElementPtr new_config),
+    isc::data::ElementPtr(*command_handler)(
+        const std::string& command, const isc::data::ElementPtr args)
+    ) throw (isc::cc::SessionError)
+{
+    init(spec_file_name, config_handler, command_handler);
+}
+
+void
+ModuleCCSession::init(
+    std::string spec_file_name,
+    isc::data::ElementPtr(*config_handler)(isc::data::ElementPtr new_config),
+    isc::data::ElementPtr(*command_handler)(
+        const std::string& command, const isc::data::ElementPtr args)
+    ) throw (isc::cc::SessionError)
 {
     module_specification_ = read_module_specification(spec_file_name);
     sleep(1);

Modified: trunk/src/lib/config/ccsession.h
==============================================================================
--- trunk/src/lib/config/ccsession.h (original)
+++ trunk/src/lib/config/ccsession.h Tue Mar  9 22:52:15 2010
@@ -23,6 +23,12 @@
 #include <config/module_spec.h>
 #include <cc/session.h>
 #include <cc/data.h>
+
+namespace boost {
+namespace asio {
+class io_service;
+}
+}
 
 namespace isc {
 namespace config {
@@ -54,6 +60,11 @@
      *                        module specification.
      */
     ModuleCCSession(std::string spec_file_name,
+                    isc::data::ElementPtr(*config_handler)(isc::data::ElementPtr new_config) = NULL,
+                    isc::data::ElementPtr(*command_handler)(const std::string& command, const isc::data::ElementPtr args) = NULL
+                    ) throw (isc::cc::SessionError);
+    ModuleCCSession(std::string spec_file_name,
+                    boost::asio::io_service& io_service,
                     isc::data::ElementPtr(*config_handler)(isc::data::ElementPtr new_config) = NULL,
                     isc::data::ElementPtr(*command_handler)(const std::string& command, const isc::data::ElementPtr args) = NULL
                     ) throw (isc::cc::SessionError);
@@ -129,7 +140,15 @@
     ElementPtr getRemoteConfigValue(const std::string& module_name, const std::string& identifier);
     
 private:
+    void init(
+        std::string spec_file_name,
+        isc::data::ElementPtr(*config_handler)(
+            isc::data::ElementPtr new_config),
+        isc::data::ElementPtr(*command_handler)(
+            const std::string& command, const isc::data::ElementPtr args)
+        ) throw (isc::cc::SessionError);
     ModuleSpec read_module_specification(const std::string& filename);
+    void startCheck();
     
     std::string module_name_;
     isc::cc::Session session_;

Modified: trunk/src/lib/dns/message.cc
==============================================================================
--- trunk/src/lib/dns/message.cc (original)
+++ trunk/src/lib/dns/message.cc Tue Mar  9 22:52:15 2010
@@ -792,12 +792,6 @@
 }
 
 void
-Message::clear()
-{
-    impl_->init();
-}
-
-void
 Message::clear(Mode mode)
 {
     impl_->init();

Modified: trunk/src/lib/dns/message.h
==============================================================================
--- trunk/src/lib/dns/message.h (original)
+++ trunk/src/lib/dns/message.h Tue Mar  9 22:52:15 2010
@@ -563,7 +563,6 @@
     //void addRR(const Section& section, const RR& rr);
     //void removeRR(const Section& section, const RR& rr);
 
-    void clear();
     void clear(Mode mode);
 
     // prepare for making a response from a request.  This will clear the

Modified: trunk/src/lib/dns/tests/message_unittest.cc
==============================================================================
--- trunk/src/lib/dns/tests/message_unittest.cc (original)
+++ trunk/src/lib/dns/tests/message_unittest.cc Tue Mar  9 22:52:15 2010
@@ -124,12 +124,12 @@
     EXPECT_FALSE(message_parse.isDNSSECSupported());
 
     // If DO bit is on, DNSSEC is considered to be supported.
-    message_parse.clear();
+    message_parse.clear(Message::PARSE);
     factoryFromFile(message_parse, "testdata/message_fromWire2");
     EXPECT_TRUE(message_parse.isDNSSECSupported());
 
     // If DO bit is off, DNSSEC is considered to be unsupported.
-    message_parse.clear();
+    message_parse.clear(Message::PARSE);
     factoryFromFile(message_parse, "testdata/message_fromWire3");
     EXPECT_FALSE(message_parse.isDNSSECSupported());
 }
@@ -162,12 +162,12 @@
     EXPECT_EQ(Message::DEFAULT_MAX_UDPSIZE, message_parse.getUDPSize());
 
     // If the size specified in EDNS0 > default max, use it.
-    message_parse.clear();
+    message_parse.clear(Message::PARSE);
     factoryFromFile(message_parse, "testdata/message_fromWire2");
     EXPECT_EQ(4096, message_parse.getUDPSize());
 
     // If the size specified in EDNS0 < default max, keep using the default.
-    message_parse.clear();
+    message_parse.clear(Message::PARSE);
     factoryFromFile(message_parse, "testdata/message_fromWire8");
     EXPECT_EQ(Message::DEFAULT_MAX_UDPSIZE, message_parse.getUDPSize());
 }
@@ -203,7 +203,7 @@
     EXPECT_EQ(Rcode::BADVERS(), message_parse.getRcode());
 
     // Maximum extended Rcode
-    message_parse.clear();
+    message_parse.clear(Message::PARSE);
     factoryFromFile(message_parse, "testdata/message_fromWire11");
     EXPECT_EQ(0xfff, message_parse.getRcode().getCode());
 }
@@ -214,21 +214,21 @@
     EXPECT_THROW(factoryFromFile(message_parse, "testdata/message_fromWire4"),
                  DNSMessageFORMERR);
     // multiple OPT RRs (in the additional section)
-    message_parse.clear();
+    message_parse.clear(Message::PARSE);
     EXPECT_THROW(factoryFromFile(message_parse, "testdata/message_fromWire5"),
                  DNSMessageFORMERR);
     // OPT RR of a non root name
-    message_parse.clear();
+    message_parse.clear(Message::PARSE);
     EXPECT_THROW(factoryFromFile(message_parse, "testdata/message_fromWire6"),
                  DNSMessageFORMERR);
     // Compressed owner name of OPT RR points to a root name.
     // Not necessarily bogus, but very unusual and mostly pathological.
     // We accept it, but is it okay?
-    message_parse.clear();
+    message_parse.clear(Message::PARSE);
     EXPECT_NO_THROW(factoryFromFile(message_parse,
                                     "testdata/message_fromWire7"));
     // Unsupported Version
-    message_parse.clear();
+    message_parse.clear(Message::PARSE);
     EXPECT_THROW(factoryFromFile(message_parse, "testdata/message_fromWire9"),
                  DNSMessageBADVERS);
 }




More information about the bind10-changes mailing list