[svn] commit: r3084 - in /branches/trac327/src: bin/auth/ bin/auth/tests/ bin/recurse/ bin/recurse/tests/ lib/asiolink/ lib/asiolink/internal/ lib/asiolink/tests/
BIND 10 source code commits
bind10-changes at lists.isc.org
Thu Sep 30 06:40:50 UTC 2010
Author: each
Date: Thu Sep 30 06:40:49 2010
New Revision: 3084
Log:
Substantial further (but still very incomplete) work on ASIO structure.
DNS lookup calls can now be asynchronous, calling back into the UDPServer
or TCPServer coroutine that originated them via io_service::post().
Modified:
branches/trac327/src/bin/auth/auth_srv.cc
branches/trac327/src/bin/auth/auth_srv.h
branches/trac327/src/bin/auth/main.cc
branches/trac327/src/bin/auth/tests/auth_srv_unittest.cc
branches/trac327/src/bin/recurse/main.cc
branches/trac327/src/bin/recurse/recursor.cc
branches/trac327/src/bin/recurse/recursor.h
branches/trac327/src/bin/recurse/tests/recursor_unittest.cc
branches/trac327/src/lib/asiolink/asiolink.cc
branches/trac327/src/lib/asiolink/asiolink.h
branches/trac327/src/lib/asiolink/internal/coroutine.h
branches/trac327/src/lib/asiolink/internal/tcpdns.h
branches/trac327/src/lib/asiolink/internal/udpdns.h
branches/trac327/src/lib/asiolink/tcpdns.cc
branches/trac327/src/lib/asiolink/tests/asio_link_unittest.cc
branches/trac327/src/lib/asiolink/udpdns.cc
Modified: branches/trac327/src/bin/auth/auth_srv.cc
==============================================================================
--- branches/trac327/src/bin/auth/auth_srv.cc (original)
+++ branches/trac327/src/bin/auth/auth_srv.cc Thu Sep 30 06:40:49 2010
@@ -14,6 +14,8 @@
// $Id$
+#include <config.h>
+
#include <netinet/in.h>
#include <algorithm>
@@ -123,46 +125,78 @@
}
}
-// This is a derived class of \c DNSProvider, to serve as a
+// This is a derived class of \c DNSLookup, to serve as a
// callback in the asiolink module. It calls
// AuthSrv::processMessage() on a single DNS message.
-class MessageProcessor : public DNSProvider {
+class MessageLookup : public DNSLookup {
public:
- MessageProcessor(AuthSrv* srv) : server_(srv) {}
- virtual bool operator()(const IOMessage& io_message,
+ MessageLookup(AuthSrv* srv) : server_(srv) {}
+ virtual void operator()(const IOMessage& io_message,
isc::dns::Message& dns_message,
- isc::dns::MessageRenderer& renderer) const {
- return (server_->processMessage(io_message, dns_message, renderer));
+ isc::dns::MessageRenderer& renderer,
+ BasicServer* server, bool& complete) const
+ {
+ server_->processMessage(io_message, dns_message, renderer,
+ server, complete);
}
private:
AuthSrv* server_;
};
-// This is a derived class of \c CheckinProvider, to serve
-// as a callback in the asiolink module. It checks for queued
-// configuration messages, and executes them if found.
-class ConfigChecker : public CheckinProvider {
+// This is a derived class of \c DNSAnswer, to serve as a
+// callback in the asiolink module. It takes a completed
+// set of answer data from the DNS lookup and assembles it
+// into a wire-format response.
+class MessageAnswer : public DNSAnswer {
public:
- ConfigChecker(AuthSrv* srv) : server_(srv) {}
- virtual void operator()(void) const {
- if (server_->configSession()->hasQueuedMsgs()) {
- server_->configSession()->checkCommand();
- }
- }
+ MessageAnswer(AuthSrv* srv) : server_(srv) {}
+ virtual void operator()(const IOMessage& io_message,
+ isc::dns::Message& message,
+ isc::dns::MessageRenderer& renderer) const
+ {
+ if (io_message.getSocket().getProtocol() == IPPROTO_UDP) {
+ renderer.setLengthLimit(message.getUDPSize());
+ } else {
+ renderer.setLengthLimit(65535);
+ }
+ message.toWire(renderer);
+ if (server_->getVerbose()) {
+ cerr << "[b10-recurse] sending a response (" << renderer.getLength()
+ << " bytes):\n" << message.toText() << endl;
+ }
+ }
+
private:
AuthSrv* server_;
};
+// This is a derived class of \c IOCallback, to serve
+// as a callback in the asiolink module. It checks for queued
+// configuration messages, and executes them if found.
+class ConfigChecker : public IOCallback {
+public:
+ ConfigChecker(AuthSrv* srv) : server_(srv) {}
+ virtual void operator()(const IOMessage& io_message UNUSED_PARAM) const {
+ if (server_->configSession()->hasQueuedMsgs()) {
+ server_->configSession()->checkCommand();
+ }
+ }
+private:
+ AuthSrv* server_;
+};
+
AuthSrv::AuthSrv(const bool use_cache, AbstractXfroutClient& xfrout_client) :
impl_(new AuthSrvImpl(use_cache, xfrout_client)),
checkin_provider_(new ConfigChecker(this)),
- dns_provider_(new MessageProcessor(this))
+ dns_lookup_(new MessageLookup(this)),
+ dns_answer_(new MessageAnswer(this))
{}
AuthSrv::~AuthSrv() {
delete impl_;
delete checkin_provider_;
- delete dns_provider_;
+ delete dns_lookup_;
+ delete dns_answer_;
}
namespace {
@@ -241,9 +275,10 @@
return (impl_->config_session_);
}
-bool
+void
AuthSrv::processMessage(const IOMessage& io_message, Message& message,
- MessageRenderer& response_renderer)
+ MessageRenderer& response_renderer,
+ BasicServer* server, bool& complete)
{
InputBuffer request_buffer(io_message.getData(), io_message.getDataSize());
@@ -258,14 +293,21 @@
cerr << "[b10-auth] received unexpected response, ignoring"
<< endl;
}
- return (false);
+ complete = false;
+ server->resume();
+ return;
}
} catch (const Exception& ex) {
- return (false);
- }
-
- // Parse the message. On failure, return an appropriate error.
+ if (impl_->verbose_mode_) {
+ cerr << "[b10-auth] DNS packet exception: " << ex.what() << endl;
+ }
+ complete = false;
+ server->resume();
+ return;
+ }
+
try {
+ // Parse the message.
message.fromWire(request_buffer);
} catch (const DNSProtocolError& error) {
if (impl_->verbose_mode_) {
@@ -274,14 +316,18 @@
}
makeErrorMessage(message, response_renderer, error.getRcode(),
impl_->verbose_mode_);
- return (true);
+ complete = true;
+ server->resume();
+ return;
} catch (const Exception& ex) {
if (impl_->verbose_mode_) {
cerr << "[b10-auth] returning SERVFAIL: " << ex.what() << endl;
}
makeErrorMessage(message, response_renderer, Rcode::SERVFAIL(),
impl_->verbose_mode_);
- return (true);
+ complete = true;
+ server->resume();
+ return;
} // other exceptions will be handled at a higher layer.
if (impl_->verbose_mode_) {
@@ -291,35 +337,36 @@
// Perform further protocol-level validation.
if (message.getOpcode() == Opcode::NOTIFY()) {
- return (impl_->processNotify(io_message, message, response_renderer));
+ complete = impl_->processNotify(io_message, message,
+ response_renderer);
} else if (message.getOpcode() != Opcode::QUERY()) {
if (impl_->verbose_mode_) {
cerr << "[b10-auth] unsupported opcode" << endl;
}
makeErrorMessage(message, response_renderer, Rcode::NOTIMP(),
impl_->verbose_mode_);
- return (true);
- }
-
- if (message.getRRCount(Section::QUESTION()) != 1) {
+ complete = true;
+ } else if (message.getRRCount(Section::QUESTION()) != 1) {
makeErrorMessage(message, response_renderer, Rcode::FORMERR(),
impl_->verbose_mode_);
- return (true);
- }
-
- ConstQuestionPtr question = *message.beginQuestion();
- const RRType &qtype = question->getType();
- if (qtype == RRType::AXFR()) {
- return (impl_->processAxfrQuery(io_message, message,
- response_renderer));
- } else if (qtype == RRType::IXFR()) {
- makeErrorMessage(message, response_renderer, Rcode::NOTIMP(),
+ complete = true;
+ } else {
+ ConstQuestionPtr question = *message.beginQuestion();
+ const RRType &qtype = question->getType();
+ if (qtype == RRType::AXFR()) {
+ complete = impl_->processAxfrQuery(io_message, message,
+ response_renderer);
+ } else if (qtype == RRType::IXFR()) {
+ makeErrorMessage(message, response_renderer, Rcode::NOTIMP(),
impl_->verbose_mode_);
- return (true);
- } else {
- return (impl_->processNormalQuery(io_message, message,
- response_renderer));
- }
+ complete = true;
+ } else {
+ complete = impl_->processNormalQuery(io_message, message,
+ response_renderer);
+ }
+ }
+
+ server->resume();
}
bool
Modified: branches/trac327/src/bin/auth/auth_srv.h
==============================================================================
--- branches/trac327/src/bin/auth/auth_srv.h (original)
+++ branches/trac327/src/bin/auth/auth_srv.h Thu Sep 30 06:40:49 2010
@@ -66,19 +66,26 @@
//@}
/// \return \c true if the \message contains a response to be returned;
/// otherwise \c false.
- bool processMessage(const asiolink::IOMessage& io_message,
+ void processMessage(const asiolink::IOMessage& io_message,
isc::dns::Message& message,
- isc::dns::MessageRenderer& response_renderer);
+ isc::dns::MessageRenderer& response_renderer,
+ asiolink::BasicServer* server, bool& complete);
void setVerbose(bool on);
bool getVerbose() const;
isc::data::ConstElementPtr updateConfig(isc::data::ConstElementPtr config);
isc::config::ModuleCCSession* configSession() const;
void setConfigSession(isc::config::ModuleCCSession* config_session);
- asiolink::DNSProvider* getDNSProvider() {
- return (dns_provider_);
+ void setIOService(asiolink::IOService& ios) { io_service_ = &ios; }
+ asiolink::IOService& getIOService() const { return (*io_service_); }
+
+ asiolink::DNSLookup* getDNSLookupProvider() const {
+ return (dns_lookup_);
}
- asiolink::CheckinProvider* getCheckinProvider() {
+ asiolink::DNSAnswer* getDNSAnswerProvider() const {
+ return (dns_answer_);
+ }
+ asiolink::IOCallback* getCheckinProvider() const {
return (checkin_provider_);
}
@@ -98,8 +105,10 @@
void setXfrinSession(isc::cc::AbstractSession* xfrin_session);
private:
AuthSrvImpl* impl_;
- asiolink::CheckinProvider* checkin_provider_;
- asiolink::DNSProvider* dns_provider_;
+ asiolink::IOService* io_service_;
+ asiolink::IOCallback* checkin_provider_;
+ asiolink::DNSLookup* dns_lookup_;
+ asiolink::DNSAnswer* dns_answer_;
};
#endif // __AUTH_SRV_H
Modified: branches/trac327/src/bin/auth/main.cc
==============================================================================
--- branches/trac327/src/bin/auth/main.cc (original)
+++ branches/trac327/src/bin/auth/main.cc Thu Sep 30 06:40:49 2010
@@ -177,8 +177,9 @@
auth_server->setVerbose(verbose_mode);
cout << "[b10-auth] Server created." << endl;
- CheckinProvider* checkin = auth_server->getCheckinProvider();
- DNSProvider* process = auth_server->getDNSProvider();
+ IOCallback* checkin = auth_server->getCheckinProvider();
+ DNSLookup* lookup = auth_server->getDNSLookupProvider();
+ DNSAnswer* answer = auth_server->getDNSAnswerProvider();
if (address != NULL) {
// XXX: we can only specify at most one explicit address.
@@ -188,11 +189,12 @@
// is a short term workaround until we support dynamic listening
// port allocation.
io_service = new IOService(*port, *address,
- checkin, process);
+ checkin, lookup, answer);
} else {
io_service = new IOService(*port, use_ipv4, use_ipv6,
- checkin, process);
- }
+ checkin, lookup, answer);
+ }
+ auth_server->setIOService(*io_service);
cout << "[b10-auth] IOService created." << endl;
cc_session = new Session(io_service->get_io_service());
Modified: branches/trac327/src/bin/auth/tests/auth_srv_unittest.cc
==============================================================================
--- branches/trac327/src/bin/auth/tests/auth_srv_unittest.cc (original)
+++ branches/trac327/src/bin/auth/tests/auth_srv_unittest.cc Thu Sep 30 06:40:49 2010
@@ -122,6 +122,13 @@
bool receive_ok_;
};
+ // A nonoperative task object to be used in calls to processMessage()
+ class MockTask : public BasicServer {
+ void operator()(asio::error_code ec UNUSED_PARAM,
+ size_t length UNUSED_PARAM)
+ {}
+ };
+
protected:
AuthSrvTest() : server(true, xfrout),
request_message(Message::RENDER),
@@ -140,6 +147,7 @@
}
MockSession notify_session;
MockXfroutClient xfrout;
+ MockTask noOp;
AuthSrv server;
Message request_message;
Message parse_message;
@@ -273,7 +281,6 @@
const int protocol = IPPROTO_UDP)
{
delete io_message;
- delete io_sock;
data.clear();
delete endpoint;
@@ -359,8 +366,10 @@
data[2] = ((i << 3) & 0xff);
parse_message.clear(Message::PARSE);
- EXPECT_EQ(true, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_TRUE(done);
headerCheck(parse_message, default_qid, Rcode::NOTIMP(), i, QR_FLAG,
0, 0, 0, 0);
}
@@ -378,8 +387,10 @@
// Multiple questions. Should result in FORMERR.
TEST_F(AuthSrvTest, multiQuestion) {
createDataFromFile("multiquestion_fromWire");
- EXPECT_EQ(true, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_TRUE(done);
headerCheck(parse_message, default_qid, Rcode::FORMERR(), opcode.getCode(),
QR_FLAG, 2, 0, 0, 0);
@@ -399,8 +410,10 @@
// dropped.
TEST_F(AuthSrvTest, shortMessage) {
createDataFromFile("shortmessage_fromWire");
- EXPECT_EQ(false, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_FALSE(done);
}
// Response messages. Must be silently dropped, whether it's a valid response
@@ -408,26 +421,32 @@
TEST_F(AuthSrvTest, response) {
// A valid (although unusual) response
createDataFromFile("simpleresponse_fromWire");
- EXPECT_EQ(false, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_FALSE(done);
// A response with a broken question section. must be dropped rather than
// returning FORMERR.
createDataFromFile("shortresponse_fromWire");
- EXPECT_EQ(false, server.processMessage(*io_message, parse_message,
- response_renderer));
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_FALSE(done);
// A response to iquery. must be dropped rather than returning NOTIMP.
createDataFromFile("iqueryresponse_fromWire");
- EXPECT_EQ(false, server.processMessage(*io_message, parse_message,
- response_renderer));
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_FALSE(done);
}
// Query with a broken question
TEST_F(AuthSrvTest, shortQuestion) {
createDataFromFile("shortquestion_fromWire");
- EXPECT_EQ(true, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_TRUE(done);
// Since the query's question is broken, the question section of the
// response should be empty.
headerCheck(parse_message, default_qid, Rcode::FORMERR(), opcode.getCode(),
@@ -437,8 +456,10 @@
// Query with a broken answer section
TEST_F(AuthSrvTest, shortAnswer) {
createDataFromFile("shortanswer_fromWire");
- EXPECT_EQ(true, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_TRUE(done);
// This is a bogus query, but question section is valid. So the response
// should copy the question section.
@@ -456,8 +477,10 @@
// Query with unsupported version of EDNS.
TEST_F(AuthSrvTest, ednsBadVers) {
createDataFromFile("queryBadEDNS_fromWire");
- EXPECT_EQ(true, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_TRUE(done);
// The response must have an EDNS OPT RR in the additional section.
// Note that the DNSSEC DO bit is cleared even if this bit in the query
@@ -472,8 +495,10 @@
// AXFR over UDP is invalid and should result in FORMERR.
createRequestPacket(opcode, Name("example.com"), RRClass::IN(),
RRType::AXFR(), IPPROTO_UDP);
- EXPECT_EQ(true, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_TRUE(done);
headerCheck(parse_message, default_qid, Rcode::FORMERR(), opcode.getCode(),
QR_FLAG, 1, 0, 0, 0);
}
@@ -484,8 +509,10 @@
RRType::AXFR(), IPPROTO_TCP);
// On success, the AXFR query has been passed to a separate process,
// so we shouldn't have to respond.
- EXPECT_EQ(false, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_FALSE(done);
EXPECT_FALSE(xfrout.isConnected());
}
@@ -494,8 +521,10 @@
xfrout.disableConnect();
createRequestPacket(opcode, Name("example.com"), RRClass::IN(),
RRType::AXFR(), IPPROTO_TCP);
- EXPECT_TRUE(server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_TRUE(done);
headerCheck(parse_message, default_qid, Rcode::SERVFAIL(),
opcode.getCode(), QR_FLAG, 1, 0, 0, 0);
// For a shot term workaround with xfrout we currently close the connection
@@ -508,7 +537,9 @@
// open.
createRequestPacket(opcode, Name("example.com"), RRClass::IN(),
RRType::AXFR(), IPPROTO_TCP);
- server.processMessage(*io_message, parse_message, response_renderer);
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
EXPECT_FALSE(xfrout.isConnected()); // see above
xfrout.disableSend();
@@ -516,8 +547,9 @@
response_renderer.clear();
createRequestPacket(opcode, Name("example.com"), RRClass::IN(),
RRType::AXFR(), IPPROTO_TCP);
- EXPECT_TRUE(server.processMessage(*io_message, parse_message,
- response_renderer));
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_TRUE(done);
headerCheck(parse_message, default_qid, Rcode::SERVFAIL(),
opcode.getCode(), QR_FLAG, 1, 0, 0, 0);
@@ -532,8 +564,9 @@
xfrout.disableDisconnect();
createRequestPacket(opcode, Name("example.com"), RRClass::IN(),
RRType::AXFR(), IPPROTO_TCP);
+ bool done;
EXPECT_THROW(server.processMessage(*io_message, parse_message,
- response_renderer),
+ response_renderer, &noOp, done),
XfroutError);
EXPECT_TRUE(xfrout.isConnected());
// XXX: we need to re-enable disconnect. otherwise an exception would be
@@ -546,8 +579,10 @@
RRType::SOA());
request_message.setHeaderFlag(MessageFlag::AA());
createRequestPacket(IPPROTO_UDP);
- EXPECT_EQ(true, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_TRUE(done);
// An internal command message should have been created and sent to an
// external module. Check them.
@@ -578,8 +613,10 @@
RRType::SOA());
request_message.setHeaderFlag(MessageFlag::AA());
createRequestPacket(IPPROTO_UDP);
- EXPECT_EQ(true, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_TRUE(done);
// Other conditions should be the same, so simply confirm the RR class is
// set correctly.
@@ -595,8 +632,10 @@
request_message.setQid(default_qid);
request_message.toWire(request_renderer);
createRequestPacket(IPPROTO_UDP);
- EXPECT_EQ(true, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_TRUE(done);
headerCheck(parse_message, default_qid, Rcode::FORMERR(),
Opcode::NOTIFY().getCode(), QR_FLAG, 0, 0, 0, 0);
}
@@ -609,8 +648,10 @@
RRType::SOA()));
request_message.setHeaderFlag(MessageFlag::AA());
createRequestPacket(IPPROTO_UDP);
- EXPECT_EQ(true, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_TRUE(done);
headerCheck(parse_message, default_qid, Rcode::FORMERR(),
Opcode::NOTIFY().getCode(), QR_FLAG, 2, 0, 0, 0);
}
@@ -620,8 +661,10 @@
RRType::NS());
request_message.setHeaderFlag(MessageFlag::AA());
createRequestPacket(IPPROTO_UDP);
- EXPECT_EQ(true, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_TRUE(done);
headerCheck(parse_message, default_qid, Rcode::FORMERR(),
Opcode::NOTIFY().getCode(), QR_FLAG, 1, 0, 0, 0);
}
@@ -630,8 +673,10 @@
// implicitly leave the AA bit off. our implementation will accept it.
createRequestPacket(Opcode::NOTIFY(), Name("example.com"), RRClass::IN(),
RRType::SOA());
- EXPECT_EQ(true, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_TRUE(done);
headerCheck(parse_message, default_qid, Rcode::NOERROR(),
Opcode::NOTIFY().getCode(), QR_FLAG | AA_FLAG, 1, 0, 0, 0);
}
@@ -642,8 +687,10 @@
request_message.setHeaderFlag(MessageFlag::AA());
request_message.setRcode(Rcode::SERVFAIL());
createRequestPacket(IPPROTO_UDP);
- EXPECT_EQ(true, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_TRUE(done);
headerCheck(parse_message, default_qid, Rcode::NOERROR(),
Opcode::NOTIFY().getCode(), QR_FLAG | AA_FLAG, 1, 0, 0, 0);
}
@@ -658,8 +705,10 @@
// we simply ignore the notify and let it be resent if an internal error
// happens.
- EXPECT_FALSE(server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_FALSE(done);
}
TEST_F(AuthSrvTest, notifySendFail) {
@@ -670,8 +719,10 @@
request_message.setHeaderFlag(MessageFlag::AA());
createRequestPacket(IPPROTO_UDP);
- EXPECT_FALSE(server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_FALSE(done);
}
TEST_F(AuthSrvTest, notifyReceiveFail) {
@@ -681,8 +732,10 @@
RRType::SOA());
request_message.setHeaderFlag(MessageFlag::AA());
createRequestPacket(IPPROTO_UDP);
- EXPECT_FALSE(server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_FALSE(done);
}
TEST_F(AuthSrvTest, notifyWithBogusSessionMessage) {
@@ -692,8 +745,10 @@
RRType::SOA());
request_message.setHeaderFlag(MessageFlag::AA());
createRequestPacket(IPPROTO_UDP);
- EXPECT_FALSE(server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_FALSE(done);
}
TEST_F(AuthSrvTest, notifyWithSessionMessageError) {
@@ -704,8 +759,10 @@
RRType::SOA());
request_message.setHeaderFlag(MessageFlag::AA());
createRequestPacket(IPPROTO_UDP);
- EXPECT_FALSE(server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_FALSE(done);
}
void
@@ -730,8 +787,10 @@
// response should have the AA flag on, and have an RR in each answer
// and authority section.
createDataFromFile("examplequery_fromWire");
- EXPECT_EQ(true, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_TRUE(done);
headerCheck(parse_message, default_qid, Rcode::NOERROR(), opcode.getCode(),
QR_FLAG | AA_FLAG, 1, 1, 1, 0);
}
@@ -744,8 +803,10 @@
// in a SERVFAIL response, and the answer and authority sections should
// be empty.
createDataFromFile("badExampleQuery_fromWire");
- EXPECT_EQ(true, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_TRUE(done);
headerCheck(parse_message, default_qid, Rcode::SERVFAIL(),
opcode.getCode(), QR_FLAG, 1, 0, 0, 0);
}
@@ -759,8 +820,10 @@
// The original data source should still exist.
createDataFromFile("examplequery_fromWire");
- EXPECT_EQ(true, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer,
+ &noOp, done);
+ EXPECT_TRUE(done);
headerCheck(parse_message, default_qid, Rcode::NOERROR(), opcode.getCode(),
QR_FLAG | AA_FLAG, 1, 1, 1, 0);
}
Modified: branches/trac327/src/bin/recurse/main.cc
==============================================================================
--- branches/trac327/src/bin/recurse/main.cc (original)
+++ branches/trac327/src/bin/recurse/main.cc Thu Sep 30 06:40:49 2010
@@ -61,8 +61,8 @@
const string PROGRAM = "Recurse";
const char* DNSPORT = "5300";
+IOService* io_service;
Recursor *recursor;
-IOService* io_service;
ConstElementPtr
my_config_handler(ConstElementPtr new_config) {
@@ -161,12 +161,9 @@
specfile = string(RECURSE_SPECFILE_LOCATION);
}
- recursor = new Recursor();
- recursor ->setVerbose(verbose_mode);
- cout << "[b10-recurse] Server created." << endl;
-
- CheckinProvider* checkin = recursor->getCheckinProvider();
- DNSProvider* process = recursor->getDNSProvider();
+ IOCallback* checkin = recursor->getCheckinProvider();
+ DNSLookup* lookup = recursor->getDNSLookupProvider();
+ DNSAnswer* answer = recursor->getDNSAnswerProvider();
if (address != NULL) {
// XXX: we can only specify at most one explicit address.
@@ -176,12 +173,16 @@
// is a short term workaround until we support dynamic listening
// port allocation.
io_service = new IOService(*port, *address,
- checkin, process);
+ checkin, lookup, answer);
} else {
io_service = new IOService(*port, use_ipv4, use_ipv6,
- checkin, process);
+ checkin, lookup, answer);
}
cout << "[b10-recurse] IOService created." << endl;
+
+ recursor = new Recursor(*io_service);
+ recursor ->setVerbose(verbose_mode);
+ cout << "[b10-recurse] Server created." << endl;
cc_session = new Session(io_service->get_io_service());
cout << "[b10-recurse] Configuration session channel created." << endl;
Modified: branches/trac327/src/bin/recurse/recursor.cc
==============================================================================
--- branches/trac327/src/bin/recurse/recursor.cc (original)
+++ branches/trac327/src/bin/recurse/recursor.cc Thu Sep 30 06:40:49 2010
@@ -61,41 +61,78 @@
RecursorImpl(const RecursorImpl& source);
RecursorImpl& operator=(const RecursorImpl& source);
public:
- RecursorImpl();
- bool processNormalQuery(const IOMessage& io_message, Message& message,
- MessageRenderer& response_renderer);
+ RecursorImpl(asiolink::IOService& io_service);
+ bool processNormalQuery(const IOMessage& io_message,
+ const Question& question, Message& message,
+ MessageRenderer& renderer,
+ BasicServer* server);
ModuleCCSession* config_session_;
bool verbose_mode_;
+
+ /// Object to handle upstream queries
+ IOQuery ioquery_;
/// Currently non-configurable, but will be.
static const uint16_t DEFAULT_LOCAL_UDPSIZE = 4096;
};
-RecursorImpl::RecursorImpl() : config_session_(NULL), verbose_mode_(false) {}
-
-// This is a derived class of \c DNSProvider, to serve as a
+RecursorImpl::RecursorImpl(asiolink::IOService& io_service) :
+ config_session_(NULL), verbose_mode_(false), ioquery_(io_service)
+{}
+
+// This is a derived class of \c DNSLookup, to serve as a
// callback in the asiolink module. It calls
// Recursor::processMessage() on a single DNS message.
-class MessageProcessor : public DNSProvider {
-public:
- MessageProcessor(Recursor* srv) : server_(srv) {}
- virtual bool operator()(const IOMessage& io_message,
+class MessageLookup : public DNSLookup {
+public:
+ MessageLookup(Recursor* srv) : server_(srv) {}
+ virtual void operator()(const IOMessage& io_message,
isc::dns::Message& dns_message,
- isc::dns::MessageRenderer& renderer) const {
- return (server_->processMessage(io_message, dns_message, renderer));
+ isc::dns::MessageRenderer& renderer,
+ BasicServer* server, bool& complete) const
+ {
+ server_->processMessage(io_message, dns_message, renderer,
+ server, complete);
}
private:
Recursor* server_;
};
-// This is a derived class of \c CheckinProvider, to serve
+// This is a derived class of \c DNSAnswer, to serve as a
+// callback in the asiolink module. It takes a completed
+// set of answer data from the DNS lookup and assembles it
+// into a wire-format response.
+class MessageAnswer : public DNSAnswer {
+public:
+ MessageAnswer(Recursor* srv) : server_(srv) {}
+ virtual void operator()(const IOMessage& io_message,
+ isc::dns::Message& message,
+ isc::dns::MessageRenderer& renderer) const
+ {
+ if (io_message.getSocket().getProtocol() == IPPROTO_UDP) {
+ renderer.setLengthLimit(message.getUDPSize());
+ } else {
+ renderer.setLengthLimit(65535);
+ }
+ message.toWire(renderer);
+ if (server_->getVerbose()) {
+ cerr << "[b10-recurse] sending a response (" << renderer.getLength()
+ << " bytes):\n" << message.toText() << endl;
+ }
+ }
+
+private:
+ Recursor* server_;
+};
+
+// This is a derived class of \c IOCallback, to serve
// as a callback in the asiolink module. It checks for queued
// configuration messages, and executes them if found.
-class ConfigChecker : public CheckinProvider {
+class ConfigChecker : public IOCallback {
public:
ConfigChecker(Recursor* srv) : server_(srv) {}
- virtual void operator()(void) const {
+ virtual void operator()(const IOMessage& io_message UNUSED_PARAM) const {
if (server_->configSession()->hasQueuedMsgs()) {
server_->configSession()->checkCommand();
}
@@ -104,16 +141,18 @@
Recursor* server_;
};
-Recursor::Recursor() :
- impl_(new RecursorImpl()),
- checkin_provider_(new ConfigChecker(this)),
- dns_provider_(new MessageProcessor(this))
+Recursor::Recursor(asiolink::IOService& io_service) :
+ impl_(new RecursorImpl(io_service)),
+ checkin_(new ConfigChecker(this)),
+ dns_lookup_(new MessageLookup(this)),
+ dns_answer_(new MessageAnswer(this))
{}
Recursor::~Recursor() {
delete impl_;
- delete checkin_provider_;
- delete dns_provider_;
+ delete checkin_;
+ delete dns_lookup_;
+ delete dns_answer_;
}
namespace {
@@ -187,9 +226,10 @@
return (impl_->config_session_);
}
-bool
+void
Recursor::processMessage(const IOMessage& io_message, Message& message,
- MessageRenderer& response_renderer)
+ MessageRenderer& renderer,
+ BasicServer* server, bool& complete)
{
InputBuffer request_buffer(io_message.getData(), io_message.getDataSize());
@@ -204,10 +244,17 @@
cerr << "[b10-recurse] received unexpected response, ignoring"
<< endl;
}
- return (false);
+ complete = false;
+ server->resume();
+ return;
}
} catch (const Exception& ex) {
- return (false);
+ if (impl_->verbose_mode_) {
+ cerr << "[b10-recurse] DNS packet exception: " << ex.what() << endl;
+ }
+ complete = false;
+ server->resume();
+ return;
}
// Parse the message. On failure, return an appropriate error.
@@ -218,16 +265,20 @@
cerr << "[b10-recurse] returning " << error.getRcode().toText()
<< ": " << error.what() << endl;
}
- makeErrorMessage(message, response_renderer, error.getRcode(),
- impl_->verbose_mode_);
- return (true);
+ makeErrorMessage(message, renderer, error.getRcode(),
+ impl_->verbose_mode_);
+ complete = true;
+ server->resume();
+ return;
} catch (const Exception& ex) {
if (impl_->verbose_mode_) {
cerr << "[b10-recurse] returning SERVFAIL: " << ex.what() << endl;
}
- makeErrorMessage(message, response_renderer, Rcode::SERVFAIL(),
- impl_->verbose_mode_);
- return (true);
+ makeErrorMessage(message, renderer, Rcode::SERVFAIL(),
+ impl_->verbose_mode_);
+ complete = true;
+ server->resume();
+ return;
} // other exceptions will be handled at a higher layer.
if (impl_->verbose_mode_) {
@@ -237,79 +288,58 @@
// Perform further protocol-level validation.
if (message.getOpcode() == Opcode::NOTIFY()) {
- makeErrorMessage(message, response_renderer, Rcode::NOTAUTH(),
- impl_->verbose_mode_);
- return (true);
+ makeErrorMessage(message, renderer, Rcode::NOTAUTH(),
+ impl_->verbose_mode_);
+ complete = true;
} else if (message.getOpcode() != Opcode::QUERY()) {
if (impl_->verbose_mode_) {
cerr << "[b10-recurse] unsupported opcode" << endl;
}
- makeErrorMessage(message, response_renderer, Rcode::NOTIMP(),
- impl_->verbose_mode_);
- return (true);
- }
-
- if (message.getRRCount(Section::QUESTION()) != 1) {
- makeErrorMessage(message, response_renderer, Rcode::FORMERR(),
- impl_->verbose_mode_);
- return (true);
- }
-
- ConstQuestionPtr question = *message.beginQuestion();
- const RRType &qtype = question->getType();
- if (qtype == RRType::AXFR()) {
- if (io_message.getSocket().getProtocol() == IPPROTO_UDP) {
- makeErrorMessage(message, response_renderer, Rcode::FORMERR(),
- impl_->verbose_mode_);
+ makeErrorMessage(message, renderer, Rcode::NOTIMP(),
+ impl_->verbose_mode_);
+ complete = true;
+ } else if (message.getRRCount(Section::QUESTION()) != 1) {
+ makeErrorMessage(message, renderer, Rcode::FORMERR(),
+ impl_->verbose_mode_);
+ complete = true;
+ } else {
+ ConstQuestionPtr question = *message.beginQuestion();
+ const RRType &qtype = question->getType();
+ if (qtype == RRType::AXFR()) {
+ if (io_message.getSocket().getProtocol() == IPPROTO_UDP) {
+ makeErrorMessage(message, renderer, Rcode::FORMERR(),
+ impl_->verbose_mode_);
+ } else {
+ makeErrorMessage(message, renderer, Rcode::NOTIMP(),
+ impl_->verbose_mode_);
+ }
+ complete = true;
+ } else if (qtype == RRType::IXFR()) {
+ makeErrorMessage(message, renderer, Rcode::NOTIMP(),
+ impl_->verbose_mode_);
+ complete = true;
} else {
- makeErrorMessage(message, response_renderer, Rcode::NOTIMP(),
- impl_->verbose_mode_);
- }
- return (true);
- } else if (qtype == RRType::IXFR()) {
- makeErrorMessage(message, response_renderer, Rcode::NOTIMP(),
- impl_->verbose_mode_);
- return (true);
- } else {
- return (impl_->processNormalQuery(io_message, message,
- response_renderer));
- }
+ complete = impl_->processNormalQuery(io_message, *question,
+ message, renderer, server);
+ }
+ }
+
+ server->resume();
}
bool
-RecursorImpl::processNormalQuery(const IOMessage& io_message, Message& message,
- MessageRenderer& response_renderer)
+RecursorImpl::processNormalQuery(const IOMessage& io_message,
+ const Question& question, Message& message,
+ MessageRenderer& renderer,
+ BasicServer* server)
{
const bool dnssec_ok = message.isDNSSECSupported();
- const uint16_t remote_bufsize = message.getUDPSize();
message.makeResponse();
message.setRcode(Rcode::NOERROR());
message.setDNSSECSupported(dnssec_ok);
message.setUDPSize(RecursorImpl::DEFAULT_LOCAL_UDPSIZE);
-
- try {
- // HERE: initiate forward query, construct a reply
- } catch (const Exception& ex) {
- if (verbose_mode_) {
- cerr << "[b10-recurse] Internal error, returning SERVFAIL: " <<
- ex.what() << endl;
- }
- makeErrorMessage(message, response_renderer, Rcode::SERVFAIL(),
- verbose_mode_);
- return (true);
- }
-
- const bool udp_buffer =
- (io_message.getSocket().getProtocol() == IPPROTO_UDP);
- response_renderer.setLengthLimit(udp_buffer ? remote_bufsize : 65535);
- message.toWire(response_renderer);
- if (verbose_mode_) {
- cerr << "[b10-recurse] sending a response ("
- << response_renderer.getLength()
- << " bytes):\n" << message.toText() << endl;
- }
-
+ ioquery_.sendQuery(io_message, question, renderer, server);
return (true);
}
Modified: branches/trac327/src/bin/recurse/recursor.h
==============================================================================
--- branches/trac327/src/bin/recurse/recursor.h (original)
+++ branches/trac327/src/bin/recurse/recursor.h Thu Sep 30 06:40:49 2010
@@ -56,31 +56,36 @@
/// process. It's normally a reference to an xfr::XfroutClient object,
/// but can refer to a local mock object for testing (or other
/// experimental) purposes.
- Recursor();
+ Recursor(asiolink::IOService& io_service);
~Recursor();
//@}
/// \return \c true if the \message contains a response to be returned;
/// otherwise \c false.
- bool processMessage(const asiolink::IOMessage& io_message,
+ void processMessage(const asiolink::IOMessage& io_message,
isc::dns::Message& message,
- isc::dns::MessageRenderer& response_renderer);
+ isc::dns::MessageRenderer& response_renderer,
+ asiolink::BasicServer* server, bool& complete);
void setVerbose(bool on);
bool getVerbose() const;
isc::data::ConstElementPtr updateConfig(isc::data::ConstElementPtr config);
isc::config::ModuleCCSession* configSession() const;
void setConfigSession(isc::config::ModuleCCSession* config_session);
- asiolink::DNSProvider* getDNSProvider() {
- return (dns_provider_);
+ asiolink::DNSLookup* getDNSLookupProvider() {
+ return (dns_lookup_);
}
- asiolink::CheckinProvider* getCheckinProvider() {
- return (checkin_provider_);
+ asiolink::DNSAnswer* getDNSAnswerProvider() {
+ return (dns_answer_);
+ }
+ asiolink::IOCallback* getCheckinProvider() {
+ return (checkin_);
}
private:
RecursorImpl* impl_;
- asiolink::CheckinProvider* checkin_provider_;
- asiolink::DNSProvider* dns_provider_;
+ asiolink::IOCallback* checkin_;
+ asiolink::DNSLookup* dns_lookup_;
+ asiolink::DNSAnswer* dns_answer_;
};
#endif // __RECURSOR_H
Modified: branches/trac327/src/bin/recurse/tests/recursor_unittest.cc
==============================================================================
--- branches/trac327/src/bin/recurse/tests/recursor_unittest.cc (original)
+++ branches/trac327/src/bin/recurse/tests/recursor_unittest.cc Thu Sep 30 06:40:49 2010
@@ -44,6 +44,7 @@
namespace {
const char* const DEFAULT_REMOTE_ADDRESS = "192.0.2.1";
+const char* const TEST_PORT = "53535";
class DummySocket : public IOSocket {
private:
@@ -93,11 +94,20 @@
bool receive_ok_;
};
+ // A nonoperative task object to be used in calls to processMessage()
+ class MockTask : public BasicServer {
+ void operator()(asio::error_code ec UNUSED_PARAM,
+ size_t length UNUSED_PARAM)
+ {}
+ };
+
protected:
- RecursorTest() : server(),
+ RecursorTest() : ios(*TEST_PORT, true, false, NULL, NULL, NULL),
+ server(ios),
request_message(Message::RENDER),
- parse_message(Message::PARSE), default_qid(0x1035),
- opcode(Opcode(Opcode::QUERY())), qname("www.example.com"),
+ parse_message(Message::PARSE),
+ default_qid(0x1035), opcode(Opcode(Opcode::QUERY())),
+ qname("www.example.com"),
qclass(RRClass::IN()), qtype(RRType::A()),
io_message(NULL), endpoint(NULL), request_obuffer(0),
request_renderer(request_obuffer),
@@ -108,6 +118,8 @@
delete endpoint;
}
MockSession notify_session;
+ MockTask noOp;
+ IOService ios;
Recursor server;
Message request_message;
Message parse_message;
@@ -294,8 +306,11 @@
data[2] = ((i << 3) & 0xff);
parse_message.clear(Message::PARSE);
- EXPECT_EQ(true, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message,
+ response_renderer, &noOp,
+ done);
+ EXPECT_TRUE(done);
headerCheck(parse_message, default_qid, Rcode::NOTIMP(), i, QR_FLAG,
0, 0, 0, 0);
}
@@ -313,8 +328,9 @@
// Multiple questions. Should result in FORMERR.
TEST_F(RecursorTest, multiQuestion) {
createDataFromFile("multiquestion_fromWire");
- EXPECT_EQ(true, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer, &noOp, done);
+ EXPECT_TRUE(done);
headerCheck(parse_message, default_qid, Rcode::FORMERR(), opcode.getCode(),
QR_FLAG, 2, 0, 0, 0);
@@ -334,8 +350,9 @@
// dropped.
TEST_F(RecursorTest, shortMessage) {
createDataFromFile("shortmessage_fromWire");
- EXPECT_EQ(false, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer, &noOp, done);
+ EXPECT_FALSE(done);
}
// Response messages. Must be silently dropped, whether it's a valid response
@@ -343,26 +360,28 @@
TEST_F(RecursorTest, response) {
// A valid (although unusual) response
createDataFromFile("simpleresponse_fromWire");
- EXPECT_EQ(false, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer, &noOp, done);
+ EXPECT_FALSE(done);
// A response with a broken question section. must be dropped rather than
// returning FORMERR.
createDataFromFile("shortresponse_fromWire");
- EXPECT_EQ(false, server.processMessage(*io_message, parse_message,
- response_renderer));
+ server.processMessage(*io_message, parse_message, response_renderer, &noOp, done);
+ EXPECT_FALSE(done);
// A response to iquery. must be dropped rather than returning NOTIMP.
createDataFromFile("iqueryresponse_fromWire");
- EXPECT_EQ(false, server.processMessage(*io_message, parse_message,
- response_renderer));
+ server.processMessage(*io_message, parse_message, response_renderer, &noOp, done);
+ EXPECT_FALSE(done);
}
// Query with a broken question
TEST_F(RecursorTest, shortQuestion) {
createDataFromFile("shortquestion_fromWire");
- EXPECT_EQ(true, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer, &noOp, done);
+ EXPECT_TRUE(done);
// Since the query's question is broken, the question section of the
// response should be empty.
headerCheck(parse_message, default_qid, Rcode::FORMERR(), opcode.getCode(),
@@ -372,8 +391,9 @@
// Query with a broken answer section
TEST_F(RecursorTest, shortAnswer) {
createDataFromFile("shortanswer_fromWire");
- EXPECT_EQ(true, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer, &noOp, done);
+ EXPECT_TRUE(done);
// This is a bogus query, but question section is valid. So the response
// should copy the question section.
@@ -391,8 +411,9 @@
// Query with unsupported version of EDNS.
TEST_F(RecursorTest, ednsBadVers) {
createDataFromFile("queryBadEDNS_fromWire");
- EXPECT_EQ(true, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer, &noOp, done);
+ EXPECT_TRUE(done);
// The response must have an EDNS OPT RR in the additional section.
// Note that the DNSSEC DO bit is cleared even if this bit in the query
@@ -407,8 +428,9 @@
// AXFR over UDP is invalid and should result in FORMERR.
createRequestPacket(opcode, Name("example.com"), RRClass::IN(),
RRType::AXFR(), IPPROTO_UDP);
- EXPECT_EQ(true, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer, &noOp, done);
+ EXPECT_TRUE(done);
headerCheck(parse_message, default_qid, Rcode::FORMERR(), opcode.getCode(),
QR_FLAG, 1, 0, 0, 0);
}
@@ -417,8 +439,9 @@
createRequestPacket(opcode, Name("example.com"), RRClass::IN(),
RRType::AXFR(), IPPROTO_TCP);
// AXFR is not implemented and should always send NOTIMP.
- EXPECT_EQ(true, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer, &noOp, done);
+ EXPECT_TRUE(done);
headerCheck(parse_message, default_qid, Rcode::NOTIMP(), opcode.getCode(),
QR_FLAG, 1, 0, 0, 0);
}
@@ -431,8 +454,9 @@
request_message.setQid(default_qid);
request_message.toWire(request_renderer);
createRequestPacket(IPPROTO_UDP);
- EXPECT_EQ(true, server.processMessage(*io_message, parse_message,
- response_renderer));
+ bool done;
+ server.processMessage(*io_message, parse_message, response_renderer, &noOp, done);
+ EXPECT_TRUE(done);
headerCheck(parse_message, default_qid, Rcode::NOTAUTH(),
Opcode::NOTIFY().getCode(), QR_FLAG, 0, 0, 0, 0);
}
Modified: branches/trac327/src/lib/asiolink/asiolink.cc
==============================================================================
--- branches/trac327/src/lib/asiolink/asiolink.cc (original)
+++ branches/trac327/src/lib/asiolink/asiolink.cc Thu Sep 30 06:40:49 2010
@@ -94,11 +94,32 @@
remote_endpoint_(remote_endpoint)
{}
+IOQuery::IOQuery(IOService& io_service) : io_service_(io_service) {}
+
+void
+IOQuery::sendQuery(const IOMessage& io_message,
+ const Question& question, MessageRenderer& renderer,
+ BasicServer* completer)
+{
+ error_code err;
+ // XXX: hard-code the address for now:
+ const ip::address addr = ip::address::from_string("192.168.1.12", err);
+
+ // XXX: eventually we will need to be able to determine whether
+ // the message should be sent via TCP or UDP, or sent initially via
+ // UDP and then fall back to TCP on failure, but for the moment
+ // we're only going to handle UDP.
+ UDPQuery* query = new UDPQuery(io_service_.get_io_service(), io_message,
+ question, addr, renderer, completer);
+ (*query)();
+}
+
class IOServiceImpl {
public:
IOServiceImpl(const char& port,
const ip::address* v4addr, const ip::address* v6addr,
- CheckinProvider* checkin, DNSProvider* process);
+ IOCallback* checkin, DNSLookup* lookup,
+ DNSAnswer* answer);
asio::io_service io_service_;
typedef boost::shared_ptr<UDPServer> UDPServerPtr;
@@ -112,12 +133,13 @@
IOServiceImpl::IOServiceImpl(const char& port,
const ip::address* const v4addr,
const ip::address* const v6addr,
- CheckinProvider* checkin, DNSProvider* process) :
+ IOCallback* checkin,
+ DNSLookup* lookup,
+ DNSAnswer* answer) :
udp4_server_(UDPServerPtr()), udp6_server_(UDPServerPtr()),
tcp4_server_(TCPServerPtr()), tcp6_server_(TCPServerPtr())
{
uint16_t portnum;
-
try {
// XXX: SunStudio with stlport4 doesn't reject some invalid
// representation such as "-1" by lexical_cast<uint16_t>, so
@@ -137,21 +159,21 @@
if (v4addr != NULL) {
udp4_server_ = UDPServerPtr(new UDPServer(io_service_,
*v4addr, portnum,
- checkin, process));
+ checkin, lookup, answer));
(*udp4_server_)();
tcp4_server_ = TCPServerPtr(new TCPServer(io_service_,
*v4addr, portnum,
- checkin, process));
+ checkin, lookup, answer));
(*tcp4_server_)();
}
if (v6addr != NULL) {
udp6_server_ = UDPServerPtr(new UDPServer(io_service_,
*v6addr, portnum,
- checkin, process));
+ checkin, lookup, answer));
(*udp6_server_)();
tcp6_server_ = TCPServerPtr(new TCPServer(io_service_,
*v6addr, portnum,
- checkin, process));
+ checkin, lookup, answer));
(*tcp6_server_)();
}
} catch (const asio::system_error& err) {
@@ -164,7 +186,9 @@
}
IOService::IOService(const char& port, const char& address,
- CheckinProvider* checkin, DNSProvider* process) :
+ IOCallback* checkin,
+ DNSLookup* lookup,
+ DNSAnswer* answer) :
impl_(NULL)
{
error_code err;
@@ -177,20 +201,21 @@
impl_ = new IOServiceImpl(port,
addr.is_v4() ? &addr : NULL,
addr.is_v6() ? &addr : NULL,
- checkin, process);
+ checkin, lookup, answer);
}
IOService::IOService(const char& port,
const bool use_ipv4, const bool use_ipv6,
- CheckinProvider* checkin, DNSProvider* process) :
+ IOCallback* checkin,
+ DNSLookup* lookup,
+ DNSAnswer* answer) :
impl_(NULL)
{
const ip::address v4addr_any = ip::address(ip::address_v4::any());
const ip::address* const v4addrp = use_ipv4 ? &v4addr_any : NULL;
const ip::address v6addr_any = ip::address(ip::address_v6::any());
const ip::address* const v6addrp = use_ipv6 ? &v6addr_any : NULL;
- impl_ = new IOServiceImpl(port, v4addrp, v6addrp,
- checkin, process);
+ impl_ = new IOServiceImpl(port, v4addrp, v6addrp, checkin, lookup, answer);
}
IOService::~IOService() {
Modified: branches/trac327/src/lib/asiolink/asiolink.h
==============================================================================
--- branches/trac327/src/lib/asiolink/asiolink.h (original)
+++ branches/trac327/src/lib/asiolink/asiolink.h Thu Sep 30 06:40:49 2010
@@ -30,6 +30,7 @@
#include <dns/message.h>
#include <dns/messagerenderer.h>
+#include <dns/question.h>
#include <exceptions/exceptions.h>
@@ -386,7 +387,34 @@
const IOEndpoint& remote_endpoint_;
};
-/// \brief The \c DNSProvider class is an abstract base class for a DNS
+/// XXX: need to add doc
+class BasicServer {
+public:
+ BasicServer() : self(this) {}
+ virtual void operator()(asio::error_code ec = asio::error_code(),
+ size_t length = 0)
+ {
+ (*self)(ec, length);
+ }
+
+ virtual void doLookup() {}
+ virtual void resume() {}
+private:
+ BasicServer* self;
+};
+
+template <typename T>
+class LookupHandler {
+public:
+ LookupHandler(T caller) : caller_(caller) {}
+ void operator()() {
+ caller_.doLookup();
+ }
+private:
+ T caller_;
+};
+
+/// \brief The \c DNSLookup class is an abstract base class for a DNS
/// provider function.
///
/// Specific derived class implementations are hidden within the
@@ -394,7 +422,7 @@
/// as functions via the operator() interface. Pointers to these
/// instances can then be provided to the \c IOService class
/// via its constructor.
-class DNSProvider {
+class DNSLookup {
///
/// \name Constructors and Destructor
///
@@ -402,32 +430,44 @@
/// intentionally defined as private, making this class non-copyable.
//@{
private:
- DNSProvider(const DNSProvider& source);
- DNSProvider& operator=(const DNSProvider& source);
+ DNSLookup(const DNSLookup& source);
+ DNSLookup& operator=(const DNSLookup& source);
protected:
/// \brief The default constructor.
///
/// This is intentionally defined as \c protected as this base class
/// should never be instantiated (except as part of a derived class).
- DNSProvider() {}
+ DNSLookup() : self(this) {}
public:
/// \brief The destructor
- virtual ~DNSProvider() {}
- //@}
- virtual bool operator()(const IOMessage& io_message,
+ virtual ~DNSLookup() {}
+ /// \brief The function operator
+ ///
+ /// This makes its call indirectly via the "self" pointer, ensuring
+ /// that the function ultimately invoked will be the one in the derived
+ /// class.
+ virtual void operator()(const IOMessage& io_message,
isc::dns::Message& dns_message,
- isc::dns::MessageRenderer& renderer) const = 0;
-};
-
-/// \brief The \c CheckinProvider class is an abstract base class for a
-/// checkin function.
+ isc::dns::MessageRenderer& renderer,
+ BasicServer* server, bool& success)
+ const
+ {
+ (*self)(io_message, dns_message, renderer, server, success);
+ }
+ //@}
+private:
+ DNSLookup* self;
+};
+
+/// \brief The \c DNSAnswer class is an abstract base class for a DNS
+/// provider function.
///
/// Specific derived class implementations are hidden within the
/// implementation. Instances of the derived classes can be called
/// as functions via the operator() interface. Pointers to these
/// instances can then be provided to the \c IOService class
/// via its constructor.
-class CheckinProvider {
+class DNSAnswer {
///
/// \name Constructors and Destructor
///
@@ -435,19 +475,62 @@
/// intentionally defined as private, making this class non-copyable.
//@{
private:
- CheckinProvider(const CheckinProvider& source);
- CheckinProvider& operator=(const CheckinProvider& source);
+ DNSAnswer(const DNSAnswer& source);
+ DNSAnswer& operator=(const DNSAnswer& source);
protected:
/// \brief The default constructor.
///
/// This is intentionally defined as \c protected as this base class
/// should never be instantiated (except as part of a derived class).
- CheckinProvider() {}
+ DNSAnswer() {}
public:
/// \brief The destructor
- virtual ~CheckinProvider() {}
- //@}
- virtual void operator()(void) const = 0;
+ virtual ~DNSAnswer() {}
+ /// \brief The function operator
+ virtual void operator()(const IOMessage& io_message,
+ isc::dns::Message& dns_message,
+ isc::dns::MessageRenderer& renderer) const = 0;
+ //@}
+};
+
+/// \brief The \c IOCallback class is an abstract base class for a
+/// simple callback function with the signature:
+///
+/// Specific derived class implementations are hidden within the
+/// implementation. Instances of the derived classes can be called
+/// as functions via the operator() interface. Pointers to these
+/// instances can then be provided to the \c IOService class
+/// via its constructor.
+class IOCallback {
+ ///
+ /// \name Constructors and Destructor
+ ///
+ /// Note: The copy constructor and the assignment operator are
+ /// intentionally defined as private, making this class non-copyable.
+ //@{
+private:
+ IOCallback(const IOCallback& source);
+ IOCallback& operator=(const IOCallback& source);
+protected:
+ /// \brief The default constructor.
+ ///
+ /// This is intentionally defined as \c protected as this base class
+ /// should never be instantiated (except as part of a derived class).
+ IOCallback() : self(this) {}
+public:
+ /// \brief The destructor
+ virtual ~IOCallback() {}
+ /// \brief The function operator
+ ///
+ /// This makes its call indirectly via the "self" pointer, ensuring
+ /// that the function ultimately invoked will be the one in the derived
+ /// class.
+ virtual void operator()(const IOMessage& io_message) const {
+ (*self)(io_message);
+ }
+ //@}
+private:
+ IOCallback* self;
};
/// \brief The \c IOService class is a wrapper for the ASIO \c io_service
@@ -474,7 +557,9 @@
/// \brief The constructor with a specific IP address and port on which
/// the services listen on.
IOService(const char& port, const char& address,
- CheckinProvider* checkin, DNSProvider* process);
+ IOCallback* checkin,
+ DNSLookup* lookup,
+ DNSAnswer* answer);
/// \brief The constructor with a specific port on which the services
/// listen on.
///
@@ -482,7 +567,9 @@
/// IPv4/IPv6 services will be available if and only if \c use_ipv4
/// or \c use_ipv6 is \c true, respectively.
IOService(const char& port, const bool use_ipv4, const bool use_ipv6,
- CheckinProvider* checkin, DNSProvider* process);
+ IOCallback* checkin,
+ DNSLookup* lookup,
+ DNSAnswer* answer);
/// \brief The destructor.
~IOService();
//@}
@@ -509,6 +596,19 @@
IOServiceImpl* impl_;
};
+/// \brief The \c IOQuery class provides a layer of abstraction around
+/// the ASIO code that carries out upstream queries.
+class IOQuery {
+public:
+ IOQuery(IOService& io_service);
+ void sendQuery(const IOMessage& io_message,
+ const isc::dns::Question& question,
+ isc::dns::MessageRenderer& renderer,
+ BasicServer* caller);
+private:
+ IOService& io_service_;
+};
+
} // asiolink
#endif // __ASIOLINK_H
Modified: branches/trac327/src/lib/asiolink/internal/coroutine.h
==============================================================================
--- branches/trac327/src/lib/asiolink/internal/coroutine.h (original)
+++ branches/trac327/src/lib/asiolink/internal/coroutine.h Thu Sep 30 06:40:49 2010
@@ -18,6 +18,7 @@
bool is_child() const { return value_ < 0; }
bool is_parent() const { return !is_child(); }
bool is_complete() const { return value_ == -1; }
+ int get_value() const { return value_; }
private:
friend class coroutine_ref;
int value_;
Modified: branches/trac327/src/lib/asiolink/internal/tcpdns.h
==============================================================================
--- branches/trac327/src/lib/asiolink/internal/tcpdns.h (original)
+++ branches/trac327/src/lib/asiolink/internal/tcpdns.h Thu Sep 30 06:40:49 2010
@@ -73,19 +73,25 @@
//
// Asynchronous TCP server coroutine
//
-class TCPServer : public coroutine {
+class TCPServer : public virtual BasicServer, public virtual coroutine {
public:
explicit TCPServer(asio::io_service& io_service,
const asio::ip::address& addr, const uint16_t port,
- CheckinProvider* checkin = NULL,
- DNSProvider* process = NULL);
+ const IOCallback* checkin = NULL,
+ const DNSLookup* lookup = NULL,
+ const DNSAnswer* answer = NULL);
void operator()(asio::error_code ec = asio::error_code(),
size_t length = 0);
+ void doLookup();
+ void resume();
+
private:
enum { MAX_LENGTH = 65535 };
static const size_t TCP_MESSAGE_LENGTHSIZE = 2;
+
+ asio::io_service& io_;
// Class member variables which are dynamic, and changes to which
// need to accessible from both sides of a coroutine fork or from
@@ -95,16 +101,22 @@
boost::shared_ptr<asio::ip::tcp::acceptor> acceptor_;
boost::shared_ptr<asio::ip::tcp::socket> socket_;
boost::shared_ptr<isc::dns::MessageRenderer> renderer_;
+ boost::shared_ptr<isc::dns::OutputBuffer> lenbuf_;
+ boost::shared_ptr<isc::dns::OutputBuffer> respbuf_;
+ boost::shared_ptr<asiolink::IOEndpoint> peer_;
+ boost::shared_ptr<asiolink::IOSocket> iosock_;
+ boost::shared_ptr<asiolink::IOMessage> io_message_;
boost::shared_ptr<char> data_;
// State information that is entirely internal to a given instance
// of the coroutine can be declared here.
- isc::dns::OutputBuffer respbuf_;
- isc::dns::OutputBuffer lenbuf_;
+ size_t bytes_;
+ bool done_;
// Callbacks
- const CheckinProvider* checkin_callback_;
- const DNSProvider* dns_callback_;
+ const IOCallback* checkin_callback_;
+ const DNSLookup* lookup_callback_;
+ const DNSAnswer* answer_callback_;
};
}
Modified: branches/trac327/src/lib/asiolink/internal/udpdns.h
==============================================================================
--- branches/trac327/src/lib/asiolink/internal/udpdns.h (original)
+++ branches/trac327/src/lib/asiolink/internal/udpdns.h Thu Sep 30 06:40:49 2010
@@ -57,7 +57,6 @@
const asio::ip::udp::endpoint& asio_endpoint_;
};
-class UDPBuffers;
class UDPSocket : public IOSocket {
private:
UDPSocket(const UDPSocket& source);
@@ -69,20 +68,30 @@
private:
asio::ip::udp::socket& socket_;
};
+
//
// Asynchronous UDP server coroutine
//
-class UDPServer : public coroutine {
+class UDPServer : public virtual BasicServer, public virtual coroutine {
public:
explicit UDPServer(asio::io_service& io_service,
const asio::ip::address& addr, const uint16_t port,
- CheckinProvider* checkin = NULL,
- DNSProvider* process = NULL);
+ IOCallback* checkin = NULL,
+ DNSLookup* lookup = NULL,
+ DNSAnswer* answer = NULL);
+
void operator()(asio::error_code ec = asio::error_code(),
size_t length = 0);
+ enum { MAX_LENGTH = 4096 };
+ char answer[MAX_LENGTH];
+ asio::ip::udp::endpoint peer;
+
+ void doLookup();
+ void resume();
+
private:
- enum { MAX_LENGTH = 4096 };
+ asio::io_service& io_;
// Class member variables which are dynamic, and changes to which
// need to accessible from both sides of a coroutine fork or from
@@ -93,17 +102,45 @@
boost::shared_ptr<char> data_;
boost::shared_ptr<asio::ip::udp::endpoint> sender_;
boost::shared_ptr<isc::dns::MessageRenderer> renderer_;
+ boost::shared_ptr<isc::dns::Message> message_;
+ boost::shared_ptr<asiolink::IOEndpoint> peer_;
+ boost::shared_ptr<asiolink::IOSocket> iosock_;
+ boost::shared_ptr<asiolink::IOMessage> io_message_;
// State information that is entirely internal to a given instance
// of the coroutine can be declared here.
isc::dns::OutputBuffer respbuf_;
size_t bytes_;
+ bool done_;
// Callbacks
- const CheckinProvider* checkin_callback_;
- const DNSProvider* dns_callback_;
+ const IOCallback* checkin_callback_;
+ const DNSLookup* lookup_callback_;
+ const DNSAnswer* answer_callback_;
};
+//
+// Asynchronous UDP coroutine for upstream queries
+//
+class UDPQuery : public coroutine {
+public:
+ explicit UDPQuery(asio::io_service& io_service,
+ const IOMessage& io_message,
+ const isc::dns::Question& q,
+ const asio::ip::address& addr,
+ isc::dns::MessageRenderer& renderer,
+ BasicServer* caller);
+ void operator()(asio::error_code ec = asio::error_code(),
+ size_t length = 0);
+private:
+ boost::shared_ptr<asio::ip::udp::socket> socket_;
+ asio::ip::udp::endpoint server_;
+ isc::dns::Question question_;
+ char* data_;
+ size_t datalen_;
+ isc::dns::OutputBuffer msgbuf_;
+ BasicServer* caller_;
+};
}
#endif // __UDPDNS_H
Modified: branches/trac327/src/lib/asiolink/tcpdns.cc
==============================================================================
--- branches/trac327/src/lib/asiolink/tcpdns.cc (original)
+++ branches/trac327/src/lib/asiolink/tcpdns.cc Thu Sep 30 06:40:49 2010
@@ -76,9 +76,12 @@
TCPServer::TCPServer(io_service& io_service,
const ip::address& addr, const uint16_t port,
- CheckinProvider* checkin, DNSProvider* process) :
- respbuf_(0), lenbuf_(TCP_MESSAGE_LENGTHSIZE),
- checkin_callback_(checkin), dns_callback_(process)
+ const IOCallback* checkin,
+ const DNSLookup* lookup,
+ const DNSAnswer* answer) :
+ io_(io_service), done_(false),
+ checkin_callback_(checkin), lookup_callback_(lookup),
+ answer_callback_(answer)
{
tcp::endpoint endpoint(addr, port);
acceptor_.reset(new tcp::acceptor(io_service));
@@ -91,6 +94,8 @@
acceptor_->set_option(tcp::acceptor::reuse_address(true));
acceptor_->bind(endpoint);
acceptor_->listen();
+ lenbuf_.reset(new OutputBuffer(TCP_MESSAGE_LENGTHSIZE));
+ respbuf_.reset(new OutputBuffer(0));
}
void
@@ -99,71 +104,83 @@
return;
}
- bool done = false;
+ boost::array<const_buffer,2> bufs;
CORO_REENTER (this) {
do {
socket_.reset(new tcp::socket(acceptor_->get_io_service()));
CORO_YIELD acceptor_->async_accept(*socket_, *this);
- CORO_FORK TCPServer(*this)();
- } while (is_child());
+ CORO_FORK io_.post(TCPServer(*this));
+ } while (is_parent());
- // Perform any necessary operations prior to processing an incoming
+ // Instantiate the data buffer that will be used by the
+ // asynchronous read call.
+ data_ = boost::shared_ptr<char>(new char[MAX_LENGTH]);
+
+ // Read the message length.
+ CORO_YIELD async_read(*socket_, asio::buffer(data_.get(),
+ TCP_MESSAGE_LENGTHSIZE), *this);
+
+ // Now read the message itself. (This is done in a different scope
+ // because CORO_REENTER is implemented as a switch statement; the
+ // inline variable declaration of "msglen" and "dnsbuffer" are
+ // therefore not permitted in this scope.)
+ CORO_YIELD {
+ InputBuffer dnsbuffer((const void *) data_.get(), length);
+ uint16_t msglen = dnsbuffer.readUint16();
+ async_read(*socket_, asio::buffer(data_.get(), msglen), *this);
+ }
+
+ // Store the io_message data.
+ peer_.reset(new TCPEndpoint(socket_->remote_endpoint()));
+ iosock_.reset(new TCPSocket(*socket_));
+ io_message_.reset(new IOMessage(data_.get(), length, *iosock_, *peer_));
+
+ // Perform any necessary operations prior to processing the incoming
// packet (e.g., checking for queued configuration messages).
//
// (XXX: it may be a performance issue to have this called for
// every single incoming packet; we may wish to throttle it somehow
// in the future.)
if (checkin_callback_ != NULL) {
- (*checkin_callback_)();
+ (*checkin_callback_)(*io_message_);
}
- // Instantiate the data buffer that will be used by the
- // asynchronous read call.
- data_ = boost::shared_ptr<char>(new char[MAX_LENGTH]);
- CORO_YIELD async_read(*socket_, asio::buffer(data_.get(),
- TCP_MESSAGE_LENGTHSIZE),
- *this);
-
- CORO_YIELD {
- InputBuffer dnsbuffer((const void *) data_.get(), length);
- uint16_t msglen = dnsbuffer.readUint16();
- async_read(*socket_, asio::buffer(data_.get(), msglen), *this);
- }
-
- // Stop here if we don't have a DNS callback function
- if (dns_callback_ == NULL) {
+ // Just stop here if we don't have a DNS callback function.
+ if (lookup_callback_ == NULL) {
CORO_YIELD return;
}
- // Instantiate the objects that will be needed by the
- // DNS callback and the asynchronous write calls.
- respbuf_.clear();
- renderer_.reset(new MessageRenderer(respbuf_));
+ // Reset or instantiate objects that will be needed by the
+ // DNS lookup and the write call.
+ respbuf_->clear();
+ renderer_.reset(new MessageRenderer(*respbuf_));
- // Process the DNS message. (Must be done in a separate scope
- // because CORO_REENTER is implemented with a switch statement
- // and inline variable declaration isn't allowed.)
- {
- TCPEndpoint peer(socket_->remote_endpoint());
- TCPSocket iosock(*socket_);
- IOMessage io_message(data_.get(), length, iosock, peer);
- Message message(Message::PARSE);
- done = (*dns_callback_)(io_message, message, *renderer_);
- }
+ // Process the DNS message.
+ bytes_ = length;
+ CORO_YIELD io_.post(LookupHandler<TCPServer>(*this));
- if (!done) {
+ if (!done_) {
CORO_YIELD return;
}
- CORO_YIELD {
- lenbuf_.clear();
- lenbuf_.writeUint16(respbuf_.getLength());
- boost::array<const_buffer,2> bufs;
- bufs[0] = buffer(lenbuf_.getData(), lenbuf_.getLength());
- bufs[1] = buffer(respbuf_.getData(), respbuf_.getLength());
- async_write(*socket_, bufs, *this);
- }
+ // Send the response.
+ lenbuf_->clear();
+ lenbuf_->writeUint16(respbuf_->getLength());
+ bufs[0] = buffer(lenbuf_->getData(), lenbuf_->getLength());
+ bufs[1] = buffer(respbuf_->getData(), respbuf_->getLength());
+ CORO_YIELD async_write(*socket_, bufs, *this);
}
}
+void
+TCPServer::doLookup() {
+ Message message(Message::PARSE);
+ (*lookup_callback_)(*io_message_, message, *renderer_, this, done_);
}
+
+void
+TCPServer::resume() {
+ io_.post(*this);
+}
+
+}
Modified: branches/trac327/src/lib/asiolink/tests/asio_link_unittest.cc
==============================================================================
--- branches/trac327/src/lib/asiolink/tests/asio_link_unittest.cc (original)
+++ branches/trac327/src/lib/asiolink/tests/asio_link_unittest.cc Thu Sep 30 06:40:49 2010
@@ -120,52 +120,52 @@
}
TEST(IOServiceTest, badPort) {
- EXPECT_THROW(IOService(*"65536", true, false, NULL, NULL), IOError);
- EXPECT_THROW(IOService(*"5300.0", true, false, NULL, NULL), IOError);
- EXPECT_THROW(IOService(*"-1", true, false, NULL, NULL), IOError);
- EXPECT_THROW(IOService(*"domain", true, false, NULL, NULL), IOError);
+ EXPECT_THROW(IOService(*"65536", true, false, NULL, NULL, NULL), IOError);
+ EXPECT_THROW(IOService(*"5300.0", true, false, NULL, NULL, NULL), IOError);
+ EXPECT_THROW(IOService(*"-1", true, false, NULL, NULL, NULL), IOError);
+ EXPECT_THROW(IOService(*"domain", true, false, NULL, NULL, NULL), IOError);
}
TEST(IOServiceTest, badAddress) {
- EXPECT_THROW(IOService(*TEST_PORT, *"192.0.2.1.1", NULL, NULL), IOError);
- EXPECT_THROW(IOService(*TEST_PORT, *"2001:db8:::1", NULL, NULL), IOError);
- EXPECT_THROW(IOService(*TEST_PORT, *"localhost", NULL, NULL), IOError);
+ EXPECT_THROW(IOService(*TEST_PORT, *"192.0.2.1.1", NULL, NULL, NULL), IOError);
+ EXPECT_THROW(IOService(*TEST_PORT, *"2001:db8:::1", NULL, NULL, NULL), IOError);
+ EXPECT_THROW(IOService(*TEST_PORT, *"localhost", NULL, NULL, NULL), IOError);
}
TEST(IOServiceTest, unavailableAddress) {
// These addresses should generally be unavailable as a valid local
// address, although there's no guarantee in theory.
- EXPECT_THROW(IOService(*TEST_PORT, *"255.255.0.0", NULL, NULL), IOError);
+ EXPECT_THROW(IOService(*TEST_PORT, *"255.255.0.0", NULL, NULL, NULL), IOError);
// Some OSes would simply reject binding attempt for an AF_INET6 socket
// to an IPv4-mapped IPv6 address. Even if those that allow it, since
// the corresponding IPv4 address is the same as the one used in the
// AF_INET socket case above, it should at least show the same result
// as the previous one.
- EXPECT_THROW(IOService(*TEST_PORT, *"::ffff:255.255.0.0", NULL, NULL), IOError);
+ EXPECT_THROW(IOService(*TEST_PORT, *"::ffff:255.255.0.0", NULL, NULL, NULL), IOError);
}
TEST(IOServiceTest, duplicateBind) {
// In each sub test case, second attempt should fail due to duplicate bind
// IPv6, "any" address
- IOService* io_service = new IOService(*TEST_PORT, false, true, NULL, NULL);
- EXPECT_THROW(IOService(*TEST_PORT, false, true, NULL, NULL), IOError);
+ IOService* io_service = new IOService(*TEST_PORT, false, true, NULL, NULL, NULL);
+ EXPECT_THROW(IOService(*TEST_PORT, false, true, NULL, NULL, NULL), IOError);
delete io_service;
// IPv6, specific address
- io_service = new IOService(*TEST_PORT, *TEST_IPV6_ADDR, NULL, NULL);
- EXPECT_THROW(IOService(*TEST_PORT, *TEST_IPV6_ADDR, NULL, NULL), IOError);
+ io_service = new IOService(*TEST_PORT, *TEST_IPV6_ADDR, NULL, NULL, NULL);
+ EXPECT_THROW(IOService(*TEST_PORT, *TEST_IPV6_ADDR, NULL, NULL, NULL), IOError);
delete io_service;
// IPv4, "any" address
- io_service = new IOService(*TEST_PORT, true, false, NULL, NULL);
- EXPECT_THROW(IOService(*TEST_PORT, true, false, NULL, NULL), IOError);
+ io_service = new IOService(*TEST_PORT, true, false, NULL, NULL, NULL);
+ EXPECT_THROW(IOService(*TEST_PORT, true, false, NULL, NULL, NULL), IOError);
delete io_service;
// IPv4, specific address
- io_service = new IOService(*TEST_PORT, *TEST_IPV4_ADDR, NULL, NULL);
- EXPECT_THROW(IOService(*TEST_PORT, *TEST_IPV4_ADDR, NULL, NULL), IOError);
+ io_service = new IOService(*TEST_PORT, *TEST_IPV4_ADDR, NULL, NULL, NULL);
+ EXPECT_THROW(IOService(*TEST_PORT, *TEST_IPV4_ADDR, NULL, NULL, NULL), IOError);
delete io_service;
}
@@ -211,7 +211,12 @@
if (sock_ != -1) {
close(sock_);
}
- delete io_service_;
+ if (io_service_ != NULL) {
+ delete io_service_;
+ }
+ if (callback_ != NULL) {
+ delete callback_;
+ }
}
void sendUDP(const int family) {
res_ = resolveAddress(family, SOCK_DGRAM, IPPROTO_UDP);
@@ -246,14 +251,15 @@
void setIOService(const char& address) {
delete io_service_;
io_service_ = NULL;
- ASIOCallBack* cb = new ASIOCallBack(this);
- io_service_ = new IOService(*TEST_PORT, address, NULL, cb);
+ callback_ = new ASIOCallBack(this);
+ io_service_ = new IOService(*TEST_PORT, address, callback_, NULL, NULL);
}
void setIOService(const bool use_ipv4, const bool use_ipv6) {
delete io_service_;
io_service_ = NULL;
- ASIOCallBack* cb = new ASIOCallBack(this);
- io_service_ = new IOService(*TEST_PORT, use_ipv4, use_ipv6, NULL, cb);
+ callback_ = new ASIOCallBack(this);
+ io_service_ = new IOService(*TEST_PORT, use_ipv4, use_ipv6, callback_,
+ NULL, NULL);
}
void doTest(const int family, const int protocol) {
if (protocol == IPPROTO_UDP) {
@@ -280,15 +286,11 @@
expected_data, expected_datasize);
}
private:
- class ASIOCallBack : public DNSProvider {
+ class ASIOCallBack : public IOCallback {
public:
ASIOCallBack(ASIOLinkTest* test_obj) : test_obj_(test_obj) {}
- bool operator()(const IOMessage& io_message,
- isc::dns::Message& dns_message UNUSED_PARAM,
- isc::dns::MessageRenderer& renderer UNUSED_PARAM) const
- {
+ void operator()(const IOMessage& io_message) const {
test_obj_->callBack(io_message);
- return (true);
}
private:
ASIOLinkTest* test_obj_;
@@ -306,6 +308,7 @@
}
protected:
IOService* io_service_;
+ ASIOCallBack* callback_;
int callback_protocol_;
int callback_native_;
string callback_address_;
@@ -316,7 +319,7 @@
};
ASIOLinkTest::ASIOLinkTest() :
- io_service_(NULL), sock_(-1), res_(NULL)
+ io_service_(NULL), callback_(NULL), sock_(-1), res_(NULL)
{
setIOService(true, true);
}
Modified: branches/trac327/src/lib/asiolink/udpdns.cc
==============================================================================
--- branches/trac327/src/lib/asiolink/udpdns.cc (original)
+++ branches/trac327/src/lib/asiolink/udpdns.cc Thu Sep 30 06:40:49 2010
@@ -22,6 +22,8 @@
#include <sys/socket.h>
#include <netinet/in.h>
+#include <boost/bind.hpp>
+
#include <asio.hpp>
#include <boost/lexical_cast.hpp>
@@ -75,8 +77,12 @@
UDPServer::UDPServer(io_service& io_service,
const ip::address& addr, const uint16_t port,
- CheckinProvider* checkin, DNSProvider* process) :
- respbuf_(0), checkin_callback_(checkin), dns_callback_(process)
+ IOCallback* checkin,
+ DNSLookup* lookup,
+ DNSAnswer* answer) :
+ io_(io_service), respbuf_(0), done_(false),
+ checkin_callback_(checkin), lookup_callback_(lookup),
+ answer_callback_(answer)
{
// Wwe use a different instantiation for v4,
// otherwise asio will bind to both v4 and v6
@@ -91,7 +97,6 @@
void
UDPServer::operator()(error_code ec, size_t length) {
- bool done = false;
CORO_REENTER (this) {
do {
// Instantiate the data buffer and endpoint that will
@@ -105,8 +110,13 @@
} while (ec || length == 0);
bytes_ = length;
- CORO_FORK UDPServer(*this)();
- } while (is_child());
+ CORO_FORK io_.post(UDPServer(*this));
+ } while (is_parent());
+
+ // Store the io_message data.
+ peer_.reset(new UDPEndpoint(*sender_));
+ iosock_.reset(new UDPSocket(*socket_));
+ io_message_.reset(new IOMessage(data_.get(), bytes_, *iosock_, *peer_));
// Perform any necessary operations prior to processing an incoming
// packet (e.g., checking for queued configuration messages).
@@ -115,11 +125,11 @@
// every single incoming packet; we may wish to throttle it somehow
// in the future.)
if (checkin_callback_ != NULL) {
- (*checkin_callback_)();
+ (*checkin_callback_)(*io_message_);
}
// Stop here if we don't have a DNS callback function
- if (dns_callback_ == NULL) {
+ if (lookup_callback_ == NULL) {
CORO_YIELD return;
}
@@ -127,26 +137,70 @@
// asynchronous send call.
respbuf_.clear();
renderer_.reset(new MessageRenderer(respbuf_));
-
- // Process the DNS message. (Must be done in a separate scope
- // because CORO_REENTER is implemented with a switch statement,
- // and thus normal inline variable declaration isn't allowed.)
- {
- UDPEndpoint peer(*sender_);
- UDPSocket iosock(*socket_);
- IOMessage io_message(data_.get(), bytes_, iosock, peer);
- Message message(Message::PARSE);
- done = (*dns_callback_)(io_message, message, *renderer_);
- }
-
- if (!done) {
+ message_.reset(new Message(Message::PARSE));
+
+ CORO_YIELD io_.post(LookupHandler<UDPServer>(*this));
+
+ if (!done_) {
CORO_YIELD return;
}
+ (*answer_callback_)(*io_message_, *message_, *renderer_);
CORO_YIELD socket_->async_send_to(buffer(respbuf_.getData(),
respbuf_.getLength()),
*sender_, *this);
}
}
-}
+void
+UDPServer::doLookup() {
+ (*lookup_callback_)(*io_message_, *message_, *renderer_, this, done_);
+}
+
+void
+UDPServer::resume() {
+ io_.post(*this);
+}
+
+UDPQuery::UDPQuery(io_service& io_service, const IOMessage& io_message,
+ const Question& q, const ip::address& addr,
+ MessageRenderer& renderer, BasicServer* caller) :
+ question_(q),
+ data_((char*) renderer.getData()), datalen_(renderer.getLength()),
+ msgbuf_(512), caller_(caller)
+{
+ udp proto = addr.is_v4() ? udp::v4() : udp::v6();
+ socket_.reset(new udp::socket(io_service, proto));
+ server_ = udp::endpoint(addr, 53);
+}
+
+void
+UDPQuery::operator()(error_code ec, size_t length) {
+ if (ec) {
+ return;
+ }
+
+ CORO_REENTER (this) {
+ {
+ Message msg(Message::RENDER);
+ msg.setQid(0);
+ msg.setOpcode(Opcode::QUERY());
+ msg.setRcode(Rcode::NOERROR());
+ msg.setHeaderFlag(MessageFlag::RD());
+ msg.addQuestion(question_);
+ MessageRenderer renderer(msgbuf_);
+ msg.toWire(renderer);
+ }
+
+ CORO_YIELD socket_->async_send_to(buffer(msgbuf_.getData(),
+ msgbuf_.getLength()),
+ server_, *this);
+
+ CORO_YIELD socket_->async_receive_from(buffer(data_, datalen_),
+ server_, *this);
+ }
+
+ caller_->resume();
+}
+
+}
More information about the bind10-changes
mailing list